diff options
author | Lars Wirzenius <liw@liw.fi> | 2021-08-18 16:27:43 +0000 |
---|---|---|
committer | Lars Wirzenius <liw@liw.fi> | 2021-08-18 16:27:43 +0000 |
commit | 78d79ca1ca0e4f3b9e9f95f0d9e2cd9891ef95f3 (patch) | |
tree | e71c58b56b3e9a04029d95c3ef5fc398dc6f464e /src | |
parent | 22a4e91448b40477a8247adc7834fa62f02725c6 (diff) | |
parent | 6bc136082f5fc02f4f2ebdcc0facbf666ee7b01c (diff) | |
download | obnam2-78d79ca1ca0e4f3b9e9f95f0d9e2cd9891ef95f3.tar.gz |
Merge branch 'refactor_upload_chunkify' into 'main'
change "obnam backup" to be all async
Closes #113
See merge request obnam/obnam!174
Diffstat (limited to 'src')
-rw-r--r-- | src/backup_run.rs | 310 | ||||
-rw-r--r-- | src/client.rs | 269 | ||||
-rw-r--r-- | src/cmd/backup.rs | 44 | ||||
-rw-r--r-- | src/generation.rs | 49 |
4 files changed, 274 insertions, 398 deletions
diff --git a/src/backup_run.rs b/src/backup_run.rs index 7b24e14..b536f6e 100644 --- a/src/backup_run.rs +++ b/src/backup_run.rs @@ -1,20 +1,27 @@ use crate::backup_progress::BackupProgress; use crate::backup_reason::Reason; +use crate::chunk::{GenerationChunk, GenerationChunkError}; +use crate::chunker::{Chunker, ChunkerError}; use crate::chunkid::ChunkId; -use crate::client::{BackupClient, ClientError}; +use crate::client::{AsyncBackupClient, ClientError}; use crate::config::ClientConfig; use crate::error::ObnamError; -use crate::fsentry::FilesystemEntry; +use crate::fsentry::{FilesystemEntry, FilesystemKind}; use crate::fsiter::{AnnotatedFsEntry, FsIterError, FsIterator}; use crate::generation::{ GenId, LocalGeneration, LocalGenerationError, NascentError, NascentGeneration, }; use crate::policy::BackupPolicy; -use log::{info, warn}; + +use bytesize::MIB; +use chrono::{DateTime, Local}; +use log::{debug, error, info, warn}; use std::path::{Path, PathBuf}; +const SQLITE_CHUNK_SIZE: usize = MIB as usize; + pub struct BackupRun<'a> { - client: &'a BackupClient, + client: &'a AsyncBackupClient, policy: BackupPolicy, buffer_size: usize, progress: Option<BackupProgress>, @@ -29,7 +36,16 @@ pub enum BackupError { FsIterError(#[from] FsIterError), #[error(transparent)] + NascentError(#[from] NascentError), + + #[error(transparent)] LocalGenerationError(#[from] LocalGenerationError), + + #[error(transparent)] + ChunkerError(#[from] ChunkerError), + + #[error(transparent)] + GenerationChunkError(#[from] GenerationChunkError), } #[derive(Debug)] @@ -41,6 +57,13 @@ pub struct FsEntryBackupOutcome { } #[derive(Debug)] +struct OneRootBackupOutcome { + pub files_count: i64, + pub warnings: Vec<BackupError>, + pub new_cachedir_tags: Vec<PathBuf>, +} + +#[derive(Debug)] pub struct RootsBackupOutcome { /// The number of backed up files. pub files_count: i64, @@ -48,10 +71,15 @@ pub struct RootsBackupOutcome { pub warnings: Vec<BackupError>, /// CACHEDIR.TAG files that aren't present in in a previous generation. pub new_cachedir_tags: Vec<PathBuf>, + /// Id of new generation. + pub gen_id: GenId, } impl<'a> BackupRun<'a> { - pub fn initial(config: &ClientConfig, client: &'a BackupClient) -> Result<Self, BackupError> { + pub fn initial( + config: &ClientConfig, + client: &'a AsyncBackupClient, + ) -> Result<Self, BackupError> { Ok(Self { client, policy: BackupPolicy::default(), @@ -62,7 +90,7 @@ impl<'a> BackupRun<'a> { pub fn incremental( config: &ClientConfig, - client: &'a BackupClient, + client: &'a AsyncBackupClient, ) -> Result<Self, BackupError> { Ok(Self { client, @@ -72,7 +100,7 @@ impl<'a> BackupRun<'a> { }) } - pub fn start( + pub async fn start( &mut self, genid: Option<&GenId>, oldname: &Path, @@ -86,7 +114,7 @@ impl<'a> BackupRun<'a> { Ok(LocalGeneration::open(oldname)?) } Some(genid) => { - let old = self.fetch_previous_generation(genid, oldname)?; + let old = self.fetch_previous_generation(genid, oldname).await?; let progress = BackupProgress::incremental(); progress.files_in_previous_generation(old.file_count()? as u64); @@ -97,13 +125,13 @@ impl<'a> BackupRun<'a> { } } - fn fetch_previous_generation( + async fn fetch_previous_generation( &self, genid: &GenId, oldname: &Path, ) -> Result<LocalGeneration, ObnamError> { let progress = BackupProgress::download_generation(genid); - let old = self.client.fetch_generation(genid, oldname)?; + let old = self.client.fetch_generation(genid, oldname).await?; progress.finish(); Ok(old) } @@ -114,88 +142,214 @@ impl<'a> BackupRun<'a> { } } - pub fn backup_roots( + pub async fn backup_roots( &self, config: &ClientConfig, old: &LocalGeneration, newpath: &Path, - ) -> Result<RootsBackupOutcome, NascentError> { - let mut warnings = vec![]; + ) -> Result<RootsBackupOutcome, ObnamError> { + let mut warnings: Vec<BackupError> = vec![]; let mut new_cachedir_tags = vec![]; let files_count = { let mut new = NascentGeneration::create(newpath)?; for root in &config.roots { - let iter = FsIterator::new(root, config.exclude_cache_tag_directories); - let entries = iter.map(|entry| { - if let Ok(ref entry) = entry { - let path = entry.inner.pathbuf(); - if entry.is_cachedir_tag && !old.is_cachedir_tag(&path)? { - new_cachedir_tags.push(path); + match self.backup_one_root(config, old, &mut new, root).await { + Ok(mut o) => { + new_cachedir_tags.append(&mut o.new_cachedir_tags); + if !o.warnings.is_empty() { + for err in o.warnings.iter() { + debug!("ignoring backup error {}", err); + self.found_problem(); + } + warnings.append(&mut o.warnings); } - }; - self.backup(entry, old) - }); - let mut new_warnings = new.insert_iter(entries)?; - warnings.append(&mut new_warnings); + } + Err(err) => { + debug!("ignoring backup error {}", err); + warnings.push(err.into()); + self.found_problem(); + } + } } new.file_count() }; self.finish(); + let gen_id = self.upload_nascent_generation(newpath).await?; + let gen_id = GenId::from_chunk_id(gen_id); Ok(RootsBackupOutcome { files_count, warnings, new_cachedir_tags, + gen_id, }) } - pub fn backup( + async fn backup_one_root( &self, - entry: Result<AnnotatedFsEntry, FsIterError>, + config: &ClientConfig, + old: &LocalGeneration, + new: &mut NascentGeneration, + root: &Path, + ) -> Result<OneRootBackupOutcome, NascentError> { + let mut warnings: Vec<BackupError> = vec![]; + let mut new_cachedir_tags = vec![]; + let iter = FsIterator::new(root, config.exclude_cache_tag_directories); + for entry in iter { + match entry { + Err(err) => { + warnings.push(err.into()); + } + Ok(entry) => { + let path = entry.inner.pathbuf(); + if entry.is_cachedir_tag && !old.is_cachedir_tag(&path)? { + new_cachedir_tags.push(path); + } + match self.backup_if_needed(entry, old).await { + Err(err) => { + warnings.push(err); + } + Ok(o) => { + if let Err(err) = + new.insert(o.entry, &o.ids, o.reason, o.is_cachedir_tag) + { + warnings.push(err.into()); + } + } + } + } + } + } + + Ok(OneRootBackupOutcome { + files_count: 0, // Caller will get file count from new. + warnings, + new_cachedir_tags, + }) + } + + async fn backup_if_needed( + &self, + entry: AnnotatedFsEntry, old: &LocalGeneration, ) -> Result<FsEntryBackupOutcome, BackupError> { - match entry { - Err(err) => { - warn!("backup: {}", err); - self.found_problem(); - Err(BackupError::FsIterError(err)) + let path = &entry.inner.pathbuf(); + info!("backup: {}", path.display()); + self.found_live_file(path); + let reason = self.policy.needs_backup(old, &entry.inner); + match reason { + Reason::IsNew | Reason::Changed | Reason::GenerationLookupError | Reason::Unknown => { + Ok(self.backup_one_entry(&entry, path, reason).await) } - Ok(entry) => { - let path = &entry.inner.pathbuf(); - info!("backup: {}", path.display()); - self.found_live_file(path); - let reason = self.policy.needs_backup(old, &entry.inner); - match reason { - Reason::IsNew - | Reason::Changed - | Reason::GenerationLookupError - | Reason::Unknown => Ok(backup_file( - self.client, - &entry, - path, - self.buffer_size, - reason, - )), - Reason::Unchanged | Reason::Skipped | Reason::FileError => { - let fileno = old.get_fileno(&entry.inner.pathbuf())?; - let ids = if let Some(fileno) = fileno { - let mut ids = vec![]; - for id in old.chunkids(fileno)?.iter()? { - ids.push(id?); - } - ids - } else { - vec![] - }; - Ok(FsEntryBackupOutcome { - entry: entry.inner, - ids, - reason, - is_cachedir_tag: entry.is_cachedir_tag, - }) + Reason::Unchanged | Reason::Skipped | Reason::FileError => { + let fileno = old.get_fileno(&entry.inner.pathbuf())?; + let ids = if let Some(fileno) = fileno { + let mut ids = vec![]; + for id in old.chunkids(fileno)?.iter()? { + ids.push(id?); } + ids + } else { + vec![] + }; + Ok(FsEntryBackupOutcome { + entry: entry.inner, + ids, + reason, + is_cachedir_tag: entry.is_cachedir_tag, + }) + } + } + } + + async fn backup_one_entry( + &self, + entry: &AnnotatedFsEntry, + path: &Path, + reason: Reason, + ) -> FsEntryBackupOutcome { + let ids = self + .upload_filesystem_entry(&entry.inner, self.buffer_size) + .await; + match ids { + Err(err) => { + warn!("error backing up {}, skipping it: {}", path.display(), err); + FsEntryBackupOutcome { + entry: entry.inner.clone(), + ids: vec![], + reason: Reason::FileError, + is_cachedir_tag: entry.is_cachedir_tag, } } + Ok(ids) => FsEntryBackupOutcome { + entry: entry.inner.clone(), + ids, + reason, + is_cachedir_tag: entry.is_cachedir_tag, + }, + } + } + + pub async fn upload_filesystem_entry( + &self, + e: &FilesystemEntry, + size: usize, + ) -> Result<Vec<ChunkId>, BackupError> { + let path = e.pathbuf(); + info!("uploading {:?}", path); + let ids = match e.kind() { + FilesystemKind::Regular => self.upload_regular_file(&path, size).await?, + FilesystemKind::Directory => vec![], + FilesystemKind::Symlink => vec![], + FilesystemKind::Socket => vec![], + FilesystemKind::Fifo => vec![], + }; + info!("upload OK for {:?}", path); + Ok(ids) + } + + pub async fn upload_generation( + &self, + filename: &Path, + size: usize, + ) -> Result<ChunkId, BackupError> { + info!("upload SQLite {}", filename.display()); + let ids = self.upload_regular_file(filename, size).await?; + let gen = GenerationChunk::new(ids); + let data = gen.to_data_chunk(¤t_timestamp())?; + let gen_id = self.client.upload_chunk(data).await?; + info!("uploaded generation {}", gen_id); + Ok(gen_id) + } + + async fn upload_regular_file( + &self, + filename: &Path, + size: usize, + ) -> Result<Vec<ChunkId>, BackupError> { + info!("upload file {}", filename.display()); + let mut chunk_ids = vec![]; + let file = std::fs::File::open(filename) + .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?; + let chunker = Chunker::new(size, file, filename); + for item in chunker { + let chunk = item?; + if let Some(chunk_id) = self.client.has_chunk(chunk.meta()).await? { + chunk_ids.push(chunk_id.clone()); + info!("reusing existing chunk {}", chunk_id); + } else { + let chunk_id = self.client.upload_chunk(chunk).await?; + chunk_ids.push(chunk_id.clone()); + info!("created new chunk {}", chunk_id); + } } + Ok(chunk_ids) + } + + async fn upload_nascent_generation(&self, filename: &Path) -> Result<ChunkId, ObnamError> { + let progress = BackupProgress::upload_generation(); + let gen_id = self.upload_generation(filename, SQLITE_CHUNK_SIZE).await?; + progress.finish(); + Ok(gen_id) } fn found_live_file(&self, path: &Path) { @@ -211,29 +365,7 @@ impl<'a> BackupRun<'a> { } } -fn backup_file( - client: &BackupClient, - entry: &AnnotatedFsEntry, - path: &Path, - chunk_size: usize, - reason: Reason, -) -> FsEntryBackupOutcome { - let ids = client.upload_filesystem_entry(&entry.inner, chunk_size); - match ids { - Err(err) => { - warn!("error backing up {}, skipping it: {}", path.display(), err); - FsEntryBackupOutcome { - entry: entry.inner.clone(), - ids: vec![], - reason: Reason::FileError, - is_cachedir_tag: entry.is_cachedir_tag, - } - } - Ok(ids) => FsEntryBackupOutcome { - entry: entry.inner.clone(), - ids, - reason, - is_cachedir_tag: entry.is_cachedir_tag, - }, - } +fn current_timestamp() -> String { + let now: DateTime<Local> = Local::now(); + format!("{}", now.format("%Y-%m-%d %H:%M:%S.%f %z")) } diff --git a/src/client.rs b/src/client.rs index c655bb2..5451dfb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,17 +1,12 @@ -use crate::chunk::DataChunk; -use crate::chunk::{GenerationChunk, GenerationChunkError}; -use crate::chunker::{Chunker, ChunkerError}; +use crate::chunk::{DataChunk, GenerationChunk, GenerationChunkError}; use crate::chunkid::ChunkId; use crate::chunkmeta::ChunkMeta; use crate::cipher::{CipherEngine, CipherError}; use crate::config::{ClientConfig, ClientConfigError}; -use crate::fsentry::{FilesystemEntry, FilesystemKind}; use crate::generation::{FinishedGeneration, GenId, LocalGeneration, LocalGenerationError}; use crate::genlist::GenerationList; -use chrono::{DateTime, Local}; use log::{debug, error, info}; -use reqwest::blocking::Client; use reqwest::header::HeaderMap; use std::collections::HashMap; use std::fs::File; @@ -50,9 +45,6 @@ pub enum ClientError { #[error(transparent)] LocalGenerationError(#[from] LocalGenerationError), - #[error(transparent)] - ChunkerError(#[from] ChunkerError), - #[error("couldn't convert response chunk-meta header to string: {0}")] MetaHeaderToString(reqwest::header::ToStrError), @@ -92,6 +84,13 @@ impl AsyncBackupClient { chunk_client: AsyncChunkClient::new(config)?, }) } + pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> { + self.chunk_client.has_chunk(meta).await + } + + pub async fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> { + self.chunk_client.upload_chunk(chunk).await + } pub async fn list_generations(&self) -> Result<GenerationList, ClientError> { self.chunk_client.list_generations().await @@ -159,229 +158,8 @@ impl AsyncChunkClient { format!("{}/chunks", self.base_url()) } - pub async fn list_generations(&self) -> Result<GenerationList, ClientError> { - let (_, body) = self.get("", &[("generation", "true")]).await?; - - let map: HashMap<String, ChunkMeta> = - serde_yaml::from_slice(&body).map_err(ClientError::YamlParse)?; - debug!("list_generations: map={:?}", map); - let finished = map - .iter() - .map(|(id, meta)| FinishedGeneration::new(id, meta.ended().map_or("", |s| s))) - .collect(); - Ok(GenerationList::new(finished)) - } - - pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> { - let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?; - let meta = self.get_chunk_meta_header(chunk_id, &headers)?; - - let meta_bytes = meta.to_json_vec(); - let chunk = self.cipher.decrypt_chunk(&body, &meta_bytes)?; - - Ok(chunk) - } - - async fn get( - &self, - path: &str, - query: &[(&str, &str)], - ) -> Result<(HeaderMap, Vec<u8>), ClientError> { - let url = format!("{}{}", &self.chunks_url(), path); - info!("GET {}", url); - - // Build HTTP request structure. - let req = self - .client - .get(&url) - .query(query) - .build() - .map_err(ClientError::ReqwestError)?; - - // Make HTTP request. - let res = self - .client - .execute(req) - .await - .map_err(ClientError::ReqwestError)?; - - // Did it work? - if res.status() != 200 { - return Err(ClientError::NotFound(path.to_string())); - } - - // Return headers and body. - let headers = res.headers().clone(); - let body = res.bytes().await.map_err(ClientError::ReqwestError)?; - let body = body.to_vec(); - Ok((headers, body)) - } - - fn get_chunk_meta_header( - &self, - chunk_id: &ChunkId, - headers: &HeaderMap, - ) -> Result<ChunkMeta, ClientError> { - let meta = headers.get("chunk-meta"); - - if meta.is_none() { - let err = ClientError::NoChunkMeta(chunk_id.clone()); - error!("fetching chunk {} failed: {}", chunk_id, err); - return Err(err); - } - - let meta = meta - .unwrap() - .to_str() - .map_err(ClientError::MetaHeaderToString)?; - let meta: ChunkMeta = serde_json::from_str(meta).map_err(ClientError::JsonParse)?; - - Ok(meta) - } -} - -pub struct BackupClient { - chunk_client: ChunkClient, -} - -impl BackupClient { - pub fn new(config: &ClientConfig) -> Result<Self, ClientError> { - info!("creating backup client with config: {:#?}", config); - Ok(Self { - chunk_client: ChunkClient::new(config)?, - }) - } - - pub fn upload_filesystem_entry( - &self, - e: &FilesystemEntry, - size: usize, - ) -> Result<Vec<ChunkId>, ClientError> { - let path = e.pathbuf(); - info!("uploading {:?}", path); - let ids = match e.kind() { - FilesystemKind::Regular => self.read_file(&path, size)?, - FilesystemKind::Directory => vec![], - FilesystemKind::Symlink => vec![], - FilesystemKind::Socket => vec![], - FilesystemKind::Fifo => vec![], - }; - info!("upload OK for {:?}", path); - Ok(ids) - } - - pub fn upload_generation(&self, filename: &Path, size: usize) -> Result<ChunkId, ClientError> { - info!("upload SQLite {}", filename.display()); - let ids = self.read_file(filename, size)?; - let gen = GenerationChunk::new(ids); - let data = gen.to_data_chunk(¤t_timestamp())?; - let gen_id = self.upload_chunk(data)?; - info!("uploaded generation {}", gen_id); - Ok(gen_id) - } - - fn read_file(&self, filename: &Path, size: usize) -> Result<Vec<ChunkId>, ClientError> { - info!("upload file {}", filename.display()); - let file = std::fs::File::open(filename) - .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?; - let chunker = Chunker::new(size, file, filename); - let chunk_ids = self.upload_new_file_chunks(chunker)?; - Ok(chunk_ids) - } - - pub fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> { - self.chunk_client.has_chunk(meta) - } - - pub fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> { - self.chunk_client.upload_chunk(chunk) - } - - pub fn upload_new_file_chunks(&self, chunker: Chunker) -> Result<Vec<ChunkId>, ClientError> { - let mut chunk_ids = vec![]; - for item in chunker { - let chunk = item?; - if let Some(chunk_id) = self.has_chunk(chunk.meta())? { - chunk_ids.push(chunk_id.clone()); - info!("reusing existing chunk {}", chunk_id); - } else { - let chunk_id = self.upload_chunk(chunk)?; - chunk_ids.push(chunk_id.clone()); - info!("created new chunk {}", chunk_id); - } - } - - Ok(chunk_ids) - } - - pub fn list_generations(&self) -> Result<GenerationList, ClientError> { - self.chunk_client.list_generations() - } - - pub fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> { - self.chunk_client.fetch_chunk(chunk_id) - } - - fn fetch_generation_chunk(&self, gen_id: &GenId) -> Result<GenerationChunk, ClientError> { - let chunk = self.fetch_chunk(gen_id.as_chunk_id())?; - let gen = GenerationChunk::from_data_chunk(&chunk)?; - Ok(gen) - } - - pub fn fetch_generation( - &self, - gen_id: &GenId, - dbname: &Path, - ) -> Result<LocalGeneration, ClientError> { - let gen = self.fetch_generation_chunk(gen_id)?; - - // Fetch the SQLite file, storing it in the named file. - let mut dbfile = File::create(&dbname) - .map_err(|err| ClientError::FileCreate(dbname.to_path_buf(), err))?; - for id in gen.chunk_ids() { - let chunk = self.fetch_chunk(id)?; - dbfile - .write_all(chunk.data()) - .map_err(|err| ClientError::FileWrite(dbname.to_path_buf(), err))?; - } - info!("downloaded generation to {}", dbname.display()); - - let gen = LocalGeneration::open(dbname)?; - Ok(gen) - } -} - -pub struct ChunkClient { - client: Client, - base_url: String, - cipher: CipherEngine, -} - -impl ChunkClient { - pub fn new(config: &ClientConfig) -> Result<Self, ClientError> { - let pass = config.passwords()?; - - let client = Client::builder() - .danger_accept_invalid_certs(!config.verify_tls_cert) - .build() - .map_err(ClientError::ReqwestError)?; - Ok(Self { - client, - base_url: config.server_url.to_string(), - cipher: CipherEngine::new(&pass), - }) - } - - fn base_url(&self) -> &str { - &self.base_url - } - - fn chunks_url(&self) -> String { - format!("{}/chunks", self.base_url()) - } - - pub fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> { - let body = match self.get("", &[("sha256", meta.sha256())]) { + pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> { + let body = match self.get("", &[("sha256", meta.sha256())]).await { Ok((_, body)) => body, Err(err) => return Err(err), }; @@ -398,7 +176,7 @@ impl ChunkClient { Ok(has) } - pub fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> { + pub async fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> { let enc = self.cipher.encrypt_chunk(&chunk)?; let res = self .client @@ -406,9 +184,10 @@ impl ChunkClient { .header("chunk-meta", chunk.meta().to_json()) .body(enc.ciphertext().to_vec()) .send() + .await .map_err(ClientError::ReqwestError)?; debug!("upload_chunk: res={:?}", res); - let res: HashMap<String, String> = res.json().map_err(ClientError::ReqwestError)?; + let res: HashMap<String, String> = res.json().await.map_err(ClientError::ReqwestError)?; let chunk_id = if let Some(chunk_id) = res.get("chunk_id") { debug!("upload_chunk: id={}", chunk_id); chunk_id.parse().unwrap() @@ -419,8 +198,8 @@ impl ChunkClient { Ok(chunk_id) } - pub fn list_generations(&self) -> Result<GenerationList, ClientError> { - let (_, body) = self.get("", &[("generation", "true")])?; + pub async fn list_generations(&self) -> Result<GenerationList, ClientError> { + let (_, body) = self.get("", &[("generation", "true")]).await?; let map: HashMap<String, ChunkMeta> = serde_yaml::from_slice(&body).map_err(ClientError::YamlParse)?; @@ -432,8 +211,8 @@ impl ChunkClient { Ok(GenerationList::new(finished)) } - pub fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> { - let (headers, body) = self.get(&format!("/{}", chunk_id), &[])?; + pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> { + let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?; let meta = self.get_chunk_meta_header(chunk_id, &headers)?; let meta_bytes = meta.to_json_vec(); @@ -442,7 +221,11 @@ impl ChunkClient { Ok(chunk) } - fn get(&self, path: &str, query: &[(&str, &str)]) -> Result<(HeaderMap, Vec<u8>), ClientError> { + async fn get( + &self, + path: &str, + query: &[(&str, &str)], + ) -> Result<(HeaderMap, Vec<u8>), ClientError> { let url = format!("{}{}", &self.chunks_url(), path); info!("GET {}", url); @@ -458,6 +241,7 @@ impl ChunkClient { let res = self .client .execute(req) + .await .map_err(ClientError::ReqwestError)?; // Did it work? @@ -467,7 +251,7 @@ impl ChunkClient { // Return headers and body. let headers = res.headers().clone(); - let body = res.bytes().map_err(ClientError::ReqwestError)?; + let body = res.bytes().await.map_err(ClientError::ReqwestError)?; let body = body.to_vec(); Ok((headers, body)) } @@ -494,8 +278,3 @@ impl ChunkClient { Ok(meta) } } - -fn current_timestamp() -> String { - let now: DateTime<Local> = Local::now(); - format!("{}", now.format("%Y-%m-%d %H:%M:%S.%f %z")) -} diff --git a/src/cmd/backup.rs b/src/cmd/backup.rs index 04dfb05..8f3d6d5 100644 --- a/src/cmd/backup.rs +++ b/src/cmd/backup.rs @@ -1,27 +1,29 @@ -use crate::backup_progress::BackupProgress; use crate::backup_run::BackupRun; -use crate::chunkid::ChunkId; -use crate::client::BackupClient; +use crate::client::AsyncBackupClient; use crate::config::ClientConfig; use crate::error::ObnamError; -use bytesize::MIB; +use crate::generation::GenId; + use log::info; -use std::path::Path; use std::time::SystemTime; use structopt::StructOpt; use tempfile::NamedTempFile; - -const SQLITE_CHUNK_SIZE: usize = MIB as usize; +use tokio::runtime::Runtime; #[derive(Debug, StructOpt)] pub struct Backup {} impl Backup { pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let rt = Runtime::new()?; + rt.block_on(self.run_async(config)) + } + + async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> { let runtime = SystemTime::now(); - let client = BackupClient::new(config)?; - let genlist = client.list_generations()?; + let client = AsyncBackupClient::new(config)?; + let genlist = client.list_generations().await?; let oldtemp = NamedTempFile::new()?; let newtemp = NamedTempFile::new()?; @@ -30,19 +32,17 @@ impl Backup { Err(_) => { info!("fresh backup without a previous generation"); let mut run = BackupRun::initial(config, &client)?; - let old = run.start(None, oldtemp.path())?; - (false, run.backup_roots(config, &old, newtemp.path())?) + let old = run.start(None, oldtemp.path()).await?; + (false, run.backup_roots(config, &old, newtemp.path()).await?) } Ok(old_id) => { info!("incremental backup based on {}", old_id); let mut run = BackupRun::incremental(config, &client)?; - let old = run.start(Some(&old_id), oldtemp.path())?; - (true, run.backup_roots(config, &old, newtemp.path())?) + let old = run.start(Some(&old_id), oldtemp.path()).await?; + (true, run.backup_roots(config, &old, newtemp.path()).await?) } }; - let gen_id = upload_nascent_generation(&client, newtemp.path())?; - for w in outcome.warnings.iter() { println!("warning: {}", w); } @@ -58,7 +58,7 @@ impl Backup { report_stats( &runtime, outcome.files_count, - &gen_id, + &outcome.gen_id, outcome.warnings.len(), )?; @@ -73,7 +73,7 @@ impl Backup { fn report_stats( runtime: &SystemTime, file_count: i64, - gen_id: &ChunkId, + gen_id: &GenId, num_warnings: usize, ) -> Result<(), ObnamError> { println!("status: OK"); @@ -83,13 +83,3 @@ fn report_stats( println!("generation-id: {}", gen_id); Ok(()) } - -fn upload_nascent_generation( - client: &BackupClient, - filename: &Path, -) -> Result<ChunkId, ObnamError> { - let progress = BackupProgress::upload_generation(); - let gen_id = client.upload_generation(filename, SQLITE_CHUNK_SIZE)?; - progress.finish(); - Ok(gen_id) -} diff --git a/src/generation.rs b/src/generation.rs index 5412ae7..bd36a19 100644 --- a/src/generation.rs +++ b/src/generation.rs @@ -1,8 +1,6 @@ use crate::backup_reason::Reason; -use crate::backup_run::{BackupError, FsEntryBackupOutcome}; use crate::chunkid::ChunkId; use crate::fsentry::FilesystemEntry; -use log::debug; use rusqlite::Connection; use std::fmt; use std::path::{Path, PathBuf}; @@ -50,9 +48,6 @@ pub enum NascentError { #[error(transparent)] LocalGenerationError(#[from] LocalGenerationError), - #[error(transparent)] - BackupError(#[from] BackupError), - #[error("SQL transaction error: {0}")] Transaction(rusqlite::Error), @@ -89,33 +84,6 @@ impl NascentGeneration { t.commit().map_err(NascentError::Commit)?; Ok(()) } - - pub fn insert_iter( - &mut self, - entries: impl Iterator<Item = Result<FsEntryBackupOutcome, BackupError>>, - ) -> Result<Vec<BackupError>, NascentError> { - let t = self.conn.transaction().map_err(NascentError::Transaction)?; - let mut warnings = vec![]; - for r in entries { - match r { - Err(err) => { - debug!("ignoring backup error {}", err); - warnings.push(err); - } - Ok(FsEntryBackupOutcome { - entry, - ids, - reason, - is_cachedir_tag, - }) => { - self.fileno += 1; - sql::insert_one(&t, entry, self.fileno, &ids[..], reason, is_cachedir_tag)?; - } - } - } - t.commit().map_err(NascentError::Commit)?; - Ok(warnings) - } } /// A finished generation. @@ -479,6 +447,9 @@ mod test { assert!(filename.exists()); } + // FIXME: This is way too complicated a test function. It should + // be simplified, possibly by re-thinking the abstractions of the + // code it calls. #[test] fn remembers_cachedir_tags() { use crate::{ @@ -516,20 +487,24 @@ mod test { .unwrap(); let entries = vec![ - Ok(FsEntryBackupOutcome { + FsEntryBackupOutcome { entry: FilesystemEntry::from_metadata(nontag_path2, &metadata).unwrap(), ids: vec![], reason: Reason::IsNew, is_cachedir_tag: false, - }), - Ok(FsEntryBackupOutcome { + }, + FsEntryBackupOutcome { entry: FilesystemEntry::from_metadata(tag_path2, &metadata).unwrap(), ids: vec![], reason: Reason::IsNew, is_cachedir_tag: true, - }), + }, ]; - gen.insert_iter(entries.into_iter()).unwrap(); + + for o in entries { + gen.insert(o.entry, &o.ids, o.reason, o.is_cachedir_tag) + .unwrap(); + } drop(gen); |