From fc4cc8b028a6248f98fa09d28f9489a88949d45e Mon Sep 17 00:00:00 2001
From: Lars Wirzenius
Date: Sun, 8 Aug 2021 13:53:01 +0300
Subject: refactor: call NascentGeneration::insert from ::insert_iter

This is a step towards getting rid of insert_iter entirely, which
would make it easier to make `obnam backup` use async.

I originally split insert_iter so I could use a single transaction for
inserting many rows, but it seems not to be needed for speed after
all. I've benchmarked backing up a large file with and without this
change, and there's no real difference. I've not benchmarked with a
large number of files.

Even if there's a performance hit from using multiple transactions, my
hope is that being able to use more CPUs/threads for backing up will
outweigh that by far.

Sponsored-by: author
---
 src/generation.rs | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/src/generation.rs b/src/generation.rs
index 5412ae7..45b8afa 100644
--- a/src/generation.rs
+++ b/src/generation.rs
@@ -94,7 +94,6 @@ impl NascentGeneration {
         &mut self,
         entries: impl Iterator<Item = Result<FsEntryBackupOutcome, BackupError>>,
     ) -> Result<Vec<BackupError>, NascentError> {
-        let t = self.conn.transaction().map_err(NascentError::Transaction)?;
         let mut warnings = vec![];
         for r in entries {
             match r {
@@ -108,12 +107,10 @@ impl NascentGeneration {
                     reason,
                     is_cachedir_tag,
                 }) => {
-                    self.fileno += 1;
-                    sql::insert_one(&t, entry, self.fileno, &ids[..], reason, is_cachedir_tag)?;
+                    self.insert(entry, &ids, reason, is_cachedir_tag)?;
                 }
             }
         }
-        t.commit().map_err(NascentError::Commit)?;
         Ok(warnings)
     }
 }
-- 
cgit v1.2.1

From 0faa5f4953c81bbb81e875a87b8322ecfdd97cf2 Mon Sep 17 00:00:00 2001
From: Lars Wirzenius
Date: Sun, 8 Aug 2021 14:27:16 +0300
Subject: refactor: use for loop over an iterator instead of .insert_iter

This makes the code more explicit, which is good for now, and is a
step towards making it all use async. There will be a need to refactor
this further with better abstractions, once async works.

Sponsored-by: author
---
 src/backup_run.rs | 119 ++++++++++++++++++++++++++++++------------------------
 src/generation.rs |   3 ---
 2 files changed, 66 insertions(+), 56 deletions(-)

diff --git a/src/backup_run.rs b/src/backup_run.rs
index 7b24e14..cd9c10f 100644
--- a/src/backup_run.rs
+++ b/src/backup_run.rs
@@ -10,7 +10,7 @@ use crate::generation::{
     GenId, LocalGeneration, LocalGenerationError, NascentError, NascentGeneration,
 };
 use crate::policy::BackupPolicy;
-use log::{info, warn};
+use log::{debug, info, warn};
 use std::path::{Path, PathBuf};
 
 pub struct BackupRun<'a> {
@@ -28,6 +28,9 @@ pub enum BackupError {
     #[error(transparent)]
     FsIterError(#[from] FsIterError),
 
+    #[error(transparent)]
+    NascentError(#[from] NascentError),
+
     #[error(transparent)]
     LocalGenerationError(#[from] LocalGenerationError),
 }
@@ -120,23 +123,43 @@ impl<'a> BackupRun<'a> {
         old: &LocalGeneration,
         newpath: &Path,
     ) -> Result<RootsBackupOutcome, NascentError> {
-        let mut warnings = vec![];
+        let mut warnings: Vec<BackupError> = vec![];
         let mut new_cachedir_tags = vec![];
         let files_count = {
             let mut new = NascentGeneration::create(newpath)?;
             for root in &config.roots {
                 let iter = FsIterator::new(root, config.exclude_cache_tag_directories);
-                let entries = iter.map(|entry| {
-                    if let Ok(ref entry) = entry {
-                        let path = entry.inner.pathbuf();
-                        if entry.is_cachedir_tag && !old.is_cachedir_tag(&path)? {
-                            new_cachedir_tags.push(path);
+                for entry in iter {
+                    match entry {
+                        Err(err) => {
+                            debug!("ignoring backup error {}", err);
+                            warnings.push(err.into());
+                            self.found_problem();
+                        }
+                        Ok(entry) => {
+                            let path = entry.inner.pathbuf();
+                            if entry.is_cachedir_tag && !old.is_cachedir_tag(&path)? {
+                                new_cachedir_tags.push(path);
+                            }
+                            match self.backup(entry, old) {
+                                Err(err) => {
+                                    debug!("ignoring backup error {}", err);
+                                    warnings.push(err);
+                                    self.found_problem();
+                                }
+                                Ok(o) => {
+                                    if let Err(err) =
+                                        new.insert(o.entry, &o.ids, o.reason, o.is_cachedir_tag)
+                                    {
+                                        debug!("ignoring backup error {}", err);
+                                        warnings.push(err.into());
+                                        self.found_problem();
+                                    }
+                                }
+                            }
+                        }
                     }
-                };
-                self.backup(entry, old)
-            });
-                let mut new_warnings = new.insert_iter(entries)?;
-                warnings.append(&mut new_warnings);
+                }
             }
             new.file_count()
         };
@@ -148,52 +171,42 @@ impl<'a> BackupRun<'a> {
         })
     }
 
-    pub fn backup(
+    fn backup(
         &self,
-        entry: Result<AnnotatedFsEntry, FsIterError>,
+        entry: AnnotatedFsEntry,
         old: &LocalGeneration,
     ) -> Result<FsEntryBackupOutcome, BackupError> {
-        match entry {
-            Err(err) => {
-                warn!("backup: {}", err);
-                self.found_problem();
-                Err(BackupError::FsIterError(err))
+        let path = &entry.inner.pathbuf();
+        info!("backup: {}", path.display());
+        self.found_live_file(path);
+        let reason = self.policy.needs_backup(old, &entry.inner);
+        match reason {
+            Reason::IsNew | Reason::Changed | Reason::GenerationLookupError | Reason::Unknown => {
+                Ok(backup_file(
+                    self.client,
+                    &entry,
+                    path,
+                    self.buffer_size,
+                    reason,
+                ))
             }
-            Ok(entry) => {
-                let path = &entry.inner.pathbuf();
-                info!("backup: {}", path.display());
-                self.found_live_file(path);
-                let reason = self.policy.needs_backup(old, &entry.inner);
-                match reason {
-                    Reason::IsNew
-                    | Reason::Changed
-                    | Reason::GenerationLookupError
-                    | Reason::Unknown => Ok(backup_file(
-                        self.client,
-                        &entry,
-                        path,
-                        self.buffer_size,
-                        reason,
-                    )),
-                    Reason::Unchanged | Reason::Skipped | Reason::FileError => {
-                        let fileno = old.get_fileno(&entry.inner.pathbuf())?;
-                        let ids = if let Some(fileno) = fileno {
-                            let mut ids = vec![];
-                            for id in old.chunkids(fileno)?.iter()? {
-                                ids.push(id?);
-                            }
-                            ids
-                        } else {
-                            vec![]
-                        };
-                        Ok(FsEntryBackupOutcome {
-                            entry: entry.inner,
-                            ids,
-                            reason,
-                            is_cachedir_tag: entry.is_cachedir_tag,
-                        })
+            Reason::Unchanged | Reason::Skipped | Reason::FileError => {
+                let fileno = old.get_fileno(&entry.inner.pathbuf())?;
+                let ids = if let Some(fileno) = fileno {
+                    let mut ids = vec![];
+                    for id in old.chunkids(fileno)?.iter()? {
+                        ids.push(id?);
                     }
-                }
+                    ids
+                } else {
+                    vec![]
+                };
+                Ok(FsEntryBackupOutcome {
+                    entry: entry.inner,
+                    ids,
+                    reason,
+                    is_cachedir_tag: entry.is_cachedir_tag,
+                })
             }
         }
     }
diff --git a/src/generation.rs b/src/generation.rs
index 45b8afa..49e42f5 100644
--- a/src/generation.rs
+++ b/src/generation.rs
@@ -50,9 +50,6 @@ pub enum NascentError {
     #[error(transparent)]
     LocalGenerationError(#[from] LocalGenerationError),
 
-    #[error(transparent)]
-    BackupError(#[from] BackupError),
-
     #[error("SQL transaction error: {0}")]
     Transaction(rusqlite::Error),
-- 
cgit v1.2.1

From 30c20dcf3d81e9147e2bd4617bd055a5a8a17896 Mon Sep 17 00:00:00 2001
From: Lars Wirzenius
Date: Sun, 8 Aug 2021 14:35:58 +0300
Subject: refactor: drop NascentGeneration::insert_iter

It was only used by a test function, which is now changed not to use
it. Add a comment to the test function noting that it's too
complicated and needs refactoring. However, that probably needs to
wait for new abstractions.

Sponsored-by: author
---
 src/generation.rs | 43 ++++++++++++-------------------------------
 1 file changed, 12 insertions(+), 31 deletions(-)

diff --git a/src/generation.rs b/src/generation.rs
index 49e42f5..bd36a19 100644
--- a/src/generation.rs
+++ b/src/generation.rs
@@ -1,8 +1,6 @@
 use crate::backup_reason::Reason;
-use crate::backup_run::{BackupError, FsEntryBackupOutcome};
 use crate::chunkid::ChunkId;
 use crate::fsentry::FilesystemEntry;
-use log::debug;
 use rusqlite::Connection;
 use std::fmt;
 use std::path::{Path, PathBuf};
@@ -86,30 +84,6 @@ impl NascentGeneration {
         t.commit().map_err(NascentError::Commit)?;
         Ok(())
     }
-
-    pub fn insert_iter(
-        &mut self,
-        entries: impl Iterator<Item = Result<FsEntryBackupOutcome, BackupError>>,
-    ) -> Result<Vec<BackupError>, NascentError> {
-        let mut warnings = vec![];
-        for r in entries {
-            match r {
-                Err(err) => {
-                    debug!("ignoring backup error {}", err);
-                    warnings.push(err);
-                }
-                Ok(FsEntryBackupOutcome {
-                    entry,
-                    ids,
-                    reason,
-                    is_cachedir_tag,
-                }) => {
-                    self.insert(entry, &ids, reason, is_cachedir_tag)?;
-                }
-            }
-        }
-        Ok(warnings)
-    }
 }
 
 /// A finished generation.
@@ -473,6 +447,9 @@ mod test {
         assert!(filename.exists());
     }
 
+    // FIXME: This is way too complicated a test function. It should
+    // be simplified, possibly by re-thinking the abstractions of the
+    // code it calls.
     #[test]
     fn remembers_cachedir_tags() {
         use crate::{
@@ -510,20 +487,24 @@ mod test {
             .unwrap();
 
         let entries = vec![
-            Ok(FsEntryBackupOutcome {
+            FsEntryBackupOutcome {
                 entry: FilesystemEntry::from_metadata(nontag_path2, &metadata).unwrap(),
                 ids: vec![],
                 reason: Reason::IsNew,
                 is_cachedir_tag: false,
-            }),
+            },
-            Ok(FsEntryBackupOutcome {
+            FsEntryBackupOutcome {
                 entry: FilesystemEntry::from_metadata(tag_path2, &metadata).unwrap(),
                 ids: vec![],
                 reason: Reason::IsNew,
                 is_cachedir_tag: true,
-            }),
+            },
         ];
-        gen.insert_iter(entries.into_iter()).unwrap();
+
+        for o in entries {
+            gen.insert(o.entry, &o.ids, o.reason, o.is_cachedir_tag)
+                .unwrap();
+        }
 
         drop(gen);
-- 
cgit v1.2.1

From 6e9a1f5f45a5608a05931fd4d579c8ab52316f55 Mon Sep 17 00:00:00 2001
From: Lars Wirzenius
Date: Sun, 8 Aug 2021 15:03:31 +0300
Subject: refactor: use async for "obnam backup"

This changes things so that "obnam backup" uses async for everything.
The old non-async BackupClient and ChunkClient are dropped.

This does NOT move the chunking and checksum computation to their own
function. This will happen later.

Sponsored-by: author
---
 src/backup_run.rs |  41 +++++----
 src/client.rs     | 247 +++++++++++-------------------------------------------
 src/cmd/backup.rs |  30 ++++---
 3 files changed, 89 insertions(+), 229 deletions(-)

diff --git a/src/backup_run.rs b/src/backup_run.rs
index cd9c10f..39ef217 100644
--- a/src/backup_run.rs
+++ b/src/backup_run.rs
@@ -1,7 +1,7 @@
 use crate::backup_progress::BackupProgress;
 use crate::backup_reason::Reason;
 use crate::chunkid::ChunkId;
-use crate::client::{BackupClient, ClientError};
+use crate::client::{AsyncBackupClient, ClientError};
 use crate::config::ClientConfig;
 use crate::error::ObnamError;
 use crate::fsentry::FilesystemEntry;
@@ -14,7 +14,7 @@ use log::{debug, info, warn};
 use std::path::{Path, PathBuf};
 
 pub struct BackupRun<'a> {
-    client: &'a BackupClient,
+    client: &'a AsyncBackupClient,
     policy: BackupPolicy,
     buffer_size: usize,
     progress: Option<BackupProgress>,
@@ -54,7 +54,10 @@ pub struct RootsBackupOutcome {
 }
 
 impl<'a> BackupRun<'a> {
-    pub fn initial(config: &ClientConfig, client: &'a BackupClient) -> Result<Self, BackupError> {
+    pub fn initial(
+        config: &ClientConfig,
+        client: &'a AsyncBackupClient,
+    ) -> Result<Self, BackupError> {
         Ok(Self {
             client,
             policy: BackupPolicy::default(),
@@ -65,7 +68,7 @@ impl<'a> BackupRun<'a> {
 
     pub fn incremental(
         config: &ClientConfig,
-        client: &'a BackupClient,
+        client: &'a AsyncBackupClient,
     ) -> Result<Self, BackupError> {
         Ok(Self {
             client,
@@ -75,7 +78,7 @@ impl<'a> BackupRun<'a> {
         })
     }
 
-    pub fn start(
+    pub async fn start(
         &mut self,
         genid: Option<&GenId>,
         oldname: &Path,
@@ -89,7 +92,7 @@ impl<'a> BackupRun<'a> {
                 Ok(LocalGeneration::open(oldname)?)
             }
             Some(genid) => {
-                let old = self.fetch_previous_generation(genid, oldname)?;
+                let old = self.fetch_previous_generation(genid, oldname).await?;
 
                 let progress = BackupProgress::incremental();
                 progress.files_in_previous_generation(old.file_count()? as u64);
@@ -100,13 +103,13 @@ impl<'a> BackupRun<'a> {
         }
     }
 
-    fn fetch_previous_generation(
+    async fn fetch_previous_generation(
         &self,
         genid: &GenId,
         oldname: &Path,
     ) -> Result<LocalGeneration, ObnamError> {
         let progress = BackupProgress::download_generation(genid);
-        let old = self.client.fetch_generation(genid, oldname)?;
+        let old = self.client.fetch_generation(genid, oldname).await?;
         progress.finish();
         Ok(old)
     }
@@ -117,7 +120,7 @@ impl<'a> BackupRun<'a> {
         }
     }
 
-    pub fn backup_roots(
+    pub async fn backup_roots(
         &self,
         config: &ClientConfig,
         old: &LocalGeneration,
@@ -141,7 +144,7 @@ impl<'a> BackupRun<'a> {
                             if entry.is_cachedir_tag && !old.is_cachedir_tag(&path)? {
                                 new_cachedir_tags.push(path);
                             }
-                            match self.backup(entry, old) {
+                            match self.backup(entry, old).await {
                                 Err(err) => {
                                     debug!("ignoring backup error {}", err);
                                     warnings.push(err);
@@ -171,7 +174,7 @@ impl<'a> BackupRun<'a> {
         })
     }
 
-    fn backup(
+    async fn backup(
         &self,
         entry: AnnotatedFsEntry,
         old: &LocalGeneration,
@@ -182,13 +185,7 @@ impl<'a> BackupRun<'a> {
         let reason = self.policy.needs_backup(old, &entry.inner);
         match reason {
             Reason::IsNew | Reason::Changed | Reason::GenerationLookupError | Reason::Unknown => {
-                Ok(backup_file(
-                    self.client,
-                    &entry,
-                    path,
-                    self.buffer_size,
-                    reason,
-                ))
+                Ok(backup_file(self.client, &entry, path, self.buffer_size, reason).await)
             }
             Reason::Unchanged | Reason::Skipped | Reason::FileError => {
                 let fileno = old.get_fileno(&entry.inner.pathbuf())?;
@@ -224,14 +221,16 @@ impl<'a> BackupRun<'a> {
     }
 }
 
-fn backup_file(
-    client: &BackupClient,
+async fn backup_file(
+    client: &AsyncBackupClient,
     entry: &AnnotatedFsEntry,
     path: &Path,
     chunk_size: usize,
     reason: Reason,
 ) -> FsEntryBackupOutcome {
-    let ids = client.upload_filesystem_entry(&entry.inner, chunk_size);
+    let ids = client
+        .upload_filesystem_entry(&entry.inner, chunk_size)
+        .await;
     match ids {
         Err(err) => {
             warn!("error backing up {}, skipping it: {}", path.display(), err);
diff --git a/src/client.rs b/src/client.rs
index c655bb2..9fddc18 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -11,7 +11,6 @@ use crate::genlist::GenerationList;
 
 use chrono::{DateTime, Local};
 use log::{debug, error, info};
-use reqwest::blocking::Client;
 use reqwest::header::HeaderMap;
 use std::collections::HashMap;
 use std::fs::File;
@@ -93,166 +92,7 @@ impl AsyncBackupClient {
         })
     }
 
-    pub async fn list_generations(&self) -> Result<GenerationList, ClientError> {
-        self.chunk_client.list_generations().await
-    }
-
-    pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
-        self.chunk_client.fetch_chunk(chunk_id).await
-    }
-
-    async fn fetch_generation_chunk(&self, gen_id: &GenId) -> Result<GenerationChunk, ClientError> {
-        let chunk = self.fetch_chunk(gen_id.as_chunk_id()).await?;
-        let gen = GenerationChunk::from_data_chunk(&chunk)?;
-        Ok(gen)
-    }
-
-    pub async fn fetch_generation(
-        &self,
-        gen_id: &GenId,
-        dbname: &Path,
-    ) -> Result<LocalGeneration, ClientError> {
-        let gen = self.fetch_generation_chunk(gen_id).await?;
-
-        // Fetch the SQLite file, storing it in the named file.
-        let mut dbfile = File::create(&dbname)
-            .map_err(|err| ClientError::FileCreate(dbname.to_path_buf(), err))?;
-        for id in gen.chunk_ids() {
-            let chunk = self.fetch_chunk(id).await?;
-            dbfile
-                .write_all(chunk.data())
-                .map_err(|err| ClientError::FileWrite(dbname.to_path_buf(), err))?;
-        }
-        info!("downloaded generation to {}", dbname.display());
-
-        let gen = LocalGeneration::open(dbname)?;
-        Ok(gen)
-    }
-}
-
-pub struct AsyncChunkClient {
-    client: reqwest::Client,
-    base_url: String,
-    cipher: CipherEngine,
-}
-
-impl AsyncChunkClient {
-    pub fn new(config: &ClientConfig) -> Result<Self, ClientError> {
-        let pass = config.passwords()?;
-
-        let client = reqwest::Client::builder()
-            .danger_accept_invalid_certs(!config.verify_tls_cert)
-            .build()
-            .map_err(ClientError::ReqwestError)?;
-        Ok(Self {
-            client,
-            base_url: config.server_url.to_string(),
-            cipher: CipherEngine::new(&pass),
-        })
-    }
-
-    fn base_url(&self) -> &str {
-        &self.base_url
-    }
-
-    fn chunks_url(&self) -> String {
-        format!("{}/chunks", self.base_url())
-    }
-
-    pub async fn list_generations(&self) -> Result<GenerationList, ClientError> {
-        let (_, body) = self.get("", &[("generation", "true")]).await?;
-
-        let map: HashMap<String, ChunkMeta> =
-            serde_yaml::from_slice(&body).map_err(ClientError::YamlParse)?;
-        debug!("list_generations: map={:?}", map);
-        let finished = map
-            .iter()
-            .map(|(id, meta)| FinishedGeneration::new(id, meta.ended().map_or("", |s| s)))
-            .collect();
-        Ok(GenerationList::new(finished))
-    }
-
-    pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
-        let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?;
-        let meta = self.get_chunk_meta_header(chunk_id, &headers)?;
-
-        let meta_bytes = meta.to_json_vec();
-        let chunk = self.cipher.decrypt_chunk(&body, &meta_bytes)?;
-
-        Ok(chunk)
-    }
-
-    async fn get(
-        &self,
-        path: &str,
-        query: &[(&str, &str)],
-    ) -> Result<(HeaderMap, Vec<u8>), ClientError> {
-        let url = format!("{}{}", &self.chunks_url(), path);
-        info!("GET {}", url);
-
-        // Build HTTP request structure.
-        let req = self
-            .client
-            .get(&url)
-            .query(query)
-            .build()
-            .map_err(ClientError::ReqwestError)?;
-
-        // Make HTTP request.
-        let res = self
-            .client
-            .execute(req)
-            .await
-            .map_err(ClientError::ReqwestError)?;
-
-        // Did it work?
-        if res.status() != 200 {
-            return Err(ClientError::NotFound(path.to_string()));
-        }
-
-        // Return headers and body.
-        let headers = res.headers().clone();
-        let body = res.bytes().await.map_err(ClientError::ReqwestError)?;
-        let body = body.to_vec();
-        Ok((headers, body))
-    }
-
-    fn get_chunk_meta_header(
-        &self,
-        chunk_id: &ChunkId,
-        headers: &HeaderMap,
-    ) -> Result<ChunkMeta, ClientError> {
-        let meta = headers.get("chunk-meta");
-
-        if meta.is_none() {
-            let err = ClientError::NoChunkMeta(chunk_id.clone());
-            error!("fetching chunk {} failed: {}", chunk_id, err);
-            return Err(err);
-        }
-
-        let meta = meta
-            .unwrap()
-            .to_str()
-            .map_err(ClientError::MetaHeaderToString)?;
-        let meta: ChunkMeta = serde_json::from_str(meta).map_err(ClientError::JsonParse)?;
-
-        Ok(meta)
-    }
-}
-
-pub struct BackupClient {
-    chunk_client: ChunkClient,
-}
-
-impl BackupClient {
-    pub fn new(config: &ClientConfig) -> Result<Self, ClientError> {
-        info!("creating backup client with config: {:#?}", config);
-        Ok(Self {
-            chunk_client: ChunkClient::new(config)?,
-        })
-    }
-
-    pub fn upload_filesystem_entry(
+    pub async fn upload_filesystem_entry(
         &self,
         e: &FilesystemEntry,
         size: usize,
@@ -260,7 +100,7 @@ impl BackupClient {
         let path = e.pathbuf();
         info!("uploading {:?}", path);
         let ids = match e.kind() {
-            FilesystemKind::Regular => self.read_file(&path, size)?,
+            FilesystemKind::Regular => self.read_file(&path, size).await?,
             FilesystemKind::Directory => vec![],
             FilesystemKind::Symlink => vec![],
             FilesystemKind::Socket => vec![],
@@ -270,42 +110,49 @@ impl BackupClient {
         Ok(ids)
     }
 
-    pub fn upload_generation(&self, filename: &Path, size: usize) -> Result<ChunkId, ClientError> {
+    pub async fn upload_generation(
+        &self,
+        filename: &Path,
+        size: usize,
+    ) -> Result<ChunkId, ClientError> {
         info!("upload SQLite {}", filename.display());
-        let ids = self.read_file(filename, size)?;
+        let ids = self.read_file(filename, size).await?;
         let gen = GenerationChunk::new(ids);
         let data = gen.to_data_chunk(&current_timestamp())?;
-        let gen_id = self.upload_chunk(data)?;
+        let gen_id = self.upload_chunk(data).await?;
         info!("uploaded generation {}", gen_id);
         Ok(gen_id)
     }
 
-    fn read_file(&self, filename: &Path, size: usize) -> Result<Vec<ChunkId>, ClientError> {
+    async fn read_file(&self, filename: &Path, size: usize) -> Result<Vec<ChunkId>, ClientError> {
         info!("upload file {}", filename.display());
         let file = std::fs::File::open(filename)
             .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?;
         let chunker = Chunker::new(size, file, filename);
-        let chunk_ids = self.upload_new_file_chunks(chunker)?;
+        let chunk_ids = self.upload_new_file_chunks(chunker).await?;
         Ok(chunk_ids)
     }
 
-    pub fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> {
-        self.chunk_client.has_chunk(meta)
+    pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> {
+        self.chunk_client.has_chunk(meta).await
     }
 
-    pub fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> {
-        self.chunk_client.upload_chunk(chunk)
+    pub async fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> {
+        self.chunk_client.upload_chunk(chunk).await
     }
 
-    pub fn upload_new_file_chunks(&self, chunker: Chunker) -> Result<Vec<ChunkId>, ClientError> {
+    pub async fn upload_new_file_chunks(
+        &self,
+        chunker: Chunker,
+    ) -> Result<Vec<ChunkId>, ClientError> {
         let mut chunk_ids = vec![];
         for item in chunker {
             let chunk = item?;
-            if let Some(chunk_id) = self.has_chunk(chunk.meta())? {
+            if let Some(chunk_id) = self.has_chunk(chunk.meta()).await? {
                 chunk_ids.push(chunk_id.clone());
                 info!("reusing existing chunk {}", chunk_id);
             } else {
-                let chunk_id = self.upload_chunk(chunk)?;
+                let chunk_id = self.upload_chunk(chunk).await?;
                 chunk_ids.push(chunk_id.clone());
                 info!("created new chunk {}", chunk_id);
             }
@@ -314,32 +161,32 @@ impl BackupClient {
         Ok(chunk_ids)
     }
 
-    pub fn list_generations(&self) -> Result<GenerationList, ClientError> {
-        self.chunk_client.list_generations()
+    pub async fn list_generations(&self) -> Result<GenerationList, ClientError> {
+        self.chunk_client.list_generations().await
    }
 
-    pub fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
-        self.chunk_client.fetch_chunk(chunk_id)
+    pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
+        self.chunk_client.fetch_chunk(chunk_id).await
     }
 
-    fn fetch_generation_chunk(&self, gen_id: &GenId) -> Result<GenerationChunk, ClientError> {
-        let chunk = self.fetch_chunk(gen_id.as_chunk_id())?;
+    async fn fetch_generation_chunk(&self, gen_id: &GenId) -> Result<GenerationChunk, ClientError> {
+        let chunk = self.fetch_chunk(gen_id.as_chunk_id()).await?;
         let gen = GenerationChunk::from_data_chunk(&chunk)?;
         Ok(gen)
     }
 
-    pub fn fetch_generation(
+    pub async fn fetch_generation(
         &self,
         gen_id: &GenId,
         dbname: &Path,
     ) -> Result<LocalGeneration, ClientError> {
-        let gen = self.fetch_generation_chunk(gen_id)?;
+        let gen = self.fetch_generation_chunk(gen_id).await?;
 
         // Fetch the SQLite file, storing it in the named file.
         let mut dbfile = File::create(&dbname)
             .map_err(|err| ClientError::FileCreate(dbname.to_path_buf(), err))?;
         for id in gen.chunk_ids() {
-            let chunk = self.fetch_chunk(id)?;
+            let chunk = self.fetch_chunk(id).await?;
             dbfile
                 .write_all(chunk.data())
                 .map_err(|err| ClientError::FileWrite(dbname.to_path_buf(), err))?;
@@ -351,17 +198,17 @@ impl BackupClient {
     }
 }
 
-pub struct ChunkClient {
-    client: Client,
+pub struct AsyncChunkClient {
+    client: reqwest::Client,
     base_url: String,
     cipher: CipherEngine,
 }
 
-impl ChunkClient {
+impl AsyncChunkClient {
     pub fn new(config: &ClientConfig) -> Result<Self, ClientError> {
         let pass = config.passwords()?;
 
-        let client = Client::builder()
+        let client = reqwest::Client::builder()
             .danger_accept_invalid_certs(!config.verify_tls_cert)
             .build()
             .map_err(ClientError::ReqwestError)?;
@@ -380,8 +227,8 @@ impl ChunkClient {
         format!("{}/chunks", self.base_url())
     }
 
-    pub fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> {
-        let body = match self.get("", &[("sha256", meta.sha256())]) {
+    pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> {
+        let body = match self.get("", &[("sha256", meta.sha256())]).await {
             Ok((_, body)) => body,
             Err(err) => return Err(err),
         };
@@ -398,7 +245,7 @@ impl ChunkClient {
         Ok(has)
     }
 
-    pub fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> {
+    pub async fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> {
         let enc = self.cipher.encrypt_chunk(&chunk)?;
         let res = self
             .client
@@ -406,9 +253,10 @@ impl ChunkClient {
             .header("chunk-meta", chunk.meta().to_json())
             .body(enc.ciphertext().to_vec())
             .send()
+            .await
             .map_err(ClientError::ReqwestError)?;
         debug!("upload_chunk: res={:?}", res);
-        let res: HashMap<String, String> = res.json().map_err(ClientError::ReqwestError)?;
+        let res: HashMap<String, String> = res.json().await.map_err(ClientError::ReqwestError)?;
         let chunk_id = if let Some(chunk_id) = res.get("chunk_id") {
             debug!("upload_chunk: id={}", chunk_id);
             chunk_id.parse().unwrap()
@@ -419,8 +267,8 @@ impl ChunkClient {
         Ok(chunk_id)
     }
 
-    pub fn list_generations(&self) -> Result<GenerationList, ClientError> {
-        let (_, body) = self.get("", &[("generation", "true")])?;
+    pub async fn list_generations(&self) -> Result<GenerationList, ClientError> {
+        let (_, body) = self.get("", &[("generation", "true")]).await?;
 
         let map: HashMap<String, ChunkMeta> =
             serde_yaml::from_slice(&body).map_err(ClientError::YamlParse)?;
         debug!("list_generations: map={:?}", map);
         let finished = map
             .iter()
             .map(|(id, meta)| FinishedGeneration::new(id, meta.ended().map_or("", |s| s)))
             .collect();
         Ok(GenerationList::new(finished))
     }
 
-    pub fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
-        let (headers, body) = self.get(&format!("/{}", chunk_id), &[])?;
+    pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
+        let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?;
         let meta = self.get_chunk_meta_header(chunk_id, &headers)?;
 
         let meta_bytes = meta.to_json_vec();
@@ -442,7 +290,11 @@ impl ChunkClient {
         Ok(chunk)
     }
 
-    fn get(&self, path: &str, query: &[(&str, &str)]) -> Result<(HeaderMap, Vec<u8>), ClientError> {
+    async fn get(
+        &self,
+        path: &str,
+        query: &[(&str, &str)],
+    ) -> Result<(HeaderMap, Vec<u8>), ClientError> {
         let url = format!("{}{}", &self.chunks_url(), path);
         info!("GET {}", url);
 
@@ -458,6 +310,7 @@ impl ChunkClient {
         let res = self
             .client
             .execute(req)
+            .await
             .map_err(ClientError::ReqwestError)?;
 
         // Did it work?
@@ -467,7 +320,7 @@ impl ChunkClient {
 
         // Return headers and body.
         let headers = res.headers().clone();
-        let body = res.bytes().map_err(ClientError::ReqwestError)?;
+        let body = res.bytes().await.map_err(ClientError::ReqwestError)?;
         let body = body.to_vec();
         Ok((headers, body))
     }
diff --git a/src/cmd/backup.rs b/src/cmd/backup.rs
index 04dfb05..dc03ddf 100644
--- a/src/cmd/backup.rs
+++ b/src/cmd/backup.rs
@@ -1,7 +1,7 @@
 use crate::backup_progress::BackupProgress;
 use crate::backup_run::BackupRun;
 use crate::chunkid::ChunkId;
-use crate::client::BackupClient;
+use crate::client::AsyncBackupClient;
 use crate::config::ClientConfig;
 use crate::error::ObnamError;
 use bytesize::MIB;
@@ -10,6 +10,7 @@ use std::path::Path;
 use std::time::SystemTime;
 use structopt::StructOpt;
 use tempfile::NamedTempFile;
+use tokio::runtime::Runtime;
 
 const SQLITE_CHUNK_SIZE: usize = MIB as usize;
 
@@ -18,10 +19,15 @@ pub struct Backup {}
 
 impl Backup {
     pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+        let rt = Runtime::new()?;
+        rt.block_on(self.run_async(config))
+    }
+
+    async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
         let runtime = SystemTime::now();
 
-        let client = BackupClient::new(config)?;
-        let genlist = client.list_generations()?;
+        let client = AsyncBackupClient::new(config)?;
+        let genlist = client.list_generations().await?;
 
         let oldtemp = NamedTempFile::new()?;
         let newtemp = NamedTempFile::new()?;
@@ -30,18 +36,18 @@ impl Backup {
             Err(_) => {
                 info!("fresh backup without a previous generation");
                 let mut run = BackupRun::initial(config, &client)?;
-                let old = run.start(None, oldtemp.path())?;
-                (false, run.backup_roots(config, &old, newtemp.path())?)
+                let old = run.start(None, oldtemp.path()).await?;
+                (false, run.backup_roots(config, &old, newtemp.path()).await?)
             }
             Ok(old_id) => {
                 info!("incremental backup based on {}", old_id);
                 let mut run = BackupRun::incremental(config, &client)?;
-                let old = run.start(Some(&old_id), oldtemp.path())?;
-                (true, run.backup_roots(config, &old, newtemp.path())?)
+                let old = run.start(Some(&old_id), oldtemp.path()).await?;
+                (true, run.backup_roots(config, &old, newtemp.path()).await?)
             }
         };
 
-        let gen_id = upload_nascent_generation(&client, newtemp.path())?;
+        let gen_id = upload_nascent_generation(&client, newtemp.path()).await?;
 
         for w in outcome.warnings.iter() {
             println!("warning: {}", w);
@@ -84,12 +90,14 @@ fn report_stats(
     Ok(())
 }
 
-fn upload_nascent_generation(
-    client: &BackupClient,
+async fn upload_nascent_generation(
+    client: &AsyncBackupClient,
     filename: &Path,
 ) -> Result<ChunkId, ObnamError> {
     let progress = BackupProgress::upload_generation();
-    let gen_id = client.upload_generation(filename, SQLITE_CHUNK_SIZE)?;
+    let gen_id = client
+        .upload_generation(filename, SQLITE_CHUNK_SIZE)
+        .await?;
     progress.finish();
     Ok(gen_id)
 }
-- 
cgit v1.2.1

From 06755358f8e8192d75be8dc250fe49066a8d75ac Mon Sep 17 00:00:00 2001
From: Lars Wirzenius
Date: Mon, 9 Aug 2021 10:33:39 +0300
Subject: refactor: split long func into two

Sponsored-by: author
---
 src/backup_progress.rs |  4 ++++
 src/backup_run.rs      | 93 +++++++++++++++++++++++++++++++++++---------------
 2 files changed, 69 insertions(+), 28 deletions(-)

diff --git a/src/backup_progress.rs b/src/backup_progress.rs
index 30b6228..741ae7c 100644
--- a/src/backup_progress.rs
+++ b/src/backup_progress.rs
@@ -80,6 +80,10 @@ impl BackupProgress {
         self.progress.inc(1);
     }
 
+    pub fn found_problems(&self, n: u64) {
+        self.progress.inc(n);
+    }
+
     pub fn found_live_file(&self, filename: &Path) {
         self.progress.inc(1);
         if self.progress.length() < self.progress.position() {
diff --git a/src/backup_run.rs b/src/backup_run.rs
index 39ef217..e3bf4b2 100644
--- a/src/backup_run.rs
+++ b/src/backup_run.rs
@@ -131,44 +131,75 @@ impl<'a> BackupRun<'a> {
         let files_count = {
             let mut new = NascentGeneration::create(newpath)?;
             for root in &config.roots {
-                let iter = FsIterator::new(root, config.exclude_cache_tag_directories);
-                for entry in iter {
-                    match entry {
+                match self.backup_one_root(config, old, &mut new, root).await {
+                    Ok(mut o) => {
+                        new_cachedir_tags.append(&mut o.new_cachedir_tags);
+                        if !o.warnings.is_empty() {
+                            warnings.append(&mut o.warnings);
+                            self.found_problems(o.warnings.len() as u64);
+                        }
+                    }
+                    Err(err) => {
+                        debug!("ignoring backup error {}", err);
+                        warnings.push(err.into());
+                        self.found_problem();
+                    }
+                }
+            }
             new.file_count()
         };
         self.finish();
         Ok(RootsBackupOutcome {
             files_count,
             warnings,
             new_cachedir_tags,
         })
     }
 
+    async fn backup_one_root(
+        &self,
+        config: &ClientConfig,
+        old: &LocalGeneration,
+        new: &mut NascentGeneration,
+        root: &Path,
+    ) -> Result<RootsBackupOutcome, NascentError> {
+        let mut warnings: Vec<BackupError> = vec![];
+        let mut new_cachedir_tags = vec![];
+        let iter = FsIterator::new(root, config.exclude_cache_tag_directories);
+        for entry in iter {
+            match entry {
                 Err(err) => {
                     debug!("ignoring backup error {}", err);
                     warnings.push(err.into());
                     self.found_problem();
                 }
                 Ok(entry) => {
                     let path = entry.inner.pathbuf();
                     if entry.is_cachedir_tag && !old.is_cachedir_tag(&path)? {
                         new_cachedir_tags.push(path);
                     }
                     match self.backup(entry, old).await {
                         Err(err) => {
                             debug!("ignoring backup error {}", err);
                             warnings.push(err);
                             self.found_problem();
                         }
                         Ok(o) => {
                             if let Err(err) =
                                 new.insert(o.entry, &o.ids, o.reason, o.is_cachedir_tag)
                             {
                                 debug!("ignoring backup error {}", err);
                                 warnings.push(err.into());
                                 self.found_problem();
                             }
                         }
                     }
                 }
             }
         }
 
+        Ok(RootsBackupOutcome {
+            files_count: 0, // Caller will get file count from new.
+            warnings,
+            new_cachedir_tags,
+        })
+    }
@@ -219,6 +250,12 @@ impl<'a> BackupRun<'a> {
             progress.found_problem();
         }
     }
+
+    fn found_problems(&self, n: u64) {
+        if let Some(progress) = &self.progress {
+            progress.found_problems(n);
+        }
+    }
 }
 
 async fn backup_file(
-- 
cgit v1.2.1

From 048c548b9b6395034f75e5f095964ea6c860e953 Mon Sep 17 00:00:00 2001
From: Lars Wirzenius
Date: Mon, 9 Aug 2021 10:37:47 +0300
Subject: refactor: for simplicity

Sponsored-by: author
---
 src/backup_progress.rs |  4 ----
 src/backup_run.rs      | 17 ++++-------------
 2 files changed, 4 insertions(+), 17 deletions(-)

diff --git a/src/backup_progress.rs b/src/backup_progress.rs
index 741ae7c..30b6228 100644
--- a/src/backup_progress.rs
+++ b/src/backup_progress.rs
@@ -80,10 +80,6 @@ impl BackupProgress {
         self.progress.inc(1);
     }
 
-    pub fn found_problems(&self, n: u64) {
-        self.progress.inc(n);
-    }
-
     pub fn found_live_file(&self, filename: &Path) {
         self.progress.inc(1);
         if self.progress.length() < self.progress.position() {
diff --git a/src/backup_run.rs b/src/backup_run.rs
index e3bf4b2..7e41173 100644
--- a/src/backup_run.rs
+++ b/src/backup_run.rs
@@ -135,8 +135,11 @@ impl<'a> BackupRun<'a> {
                     Ok(mut o) => {
                         new_cachedir_tags.append(&mut o.new_cachedir_tags);
                         if !o.warnings.is_empty() {
+                            for err in o.warnings.iter() {
+                                debug!("ignoring backup error {}", err);
+                                self.found_problem();
+                            }
                             warnings.append(&mut o.warnings);
-                            self.found_problems(o.warnings.len() as u64);
                         }
                     }
@@ -169,9 +172,7 @@ impl<'a> BackupRun<'a> {
         for entry in iter {
             match entry {
                 Err(err) => {
-                    debug!("ignoring backup error {}", err);
                     warnings.push(err.into());
-                    self.found_problem();
                 }
                 Ok(entry) => {
                     let path = entry.inner.pathbuf();
@@ -180,17 +181,13 @@ impl<'a> BackupRun<'a> {
                     }
                     match self.backup(entry, old).await {
                         Err(err) => {
-                            debug!("ignoring backup error {}", err);
                             warnings.push(err);
-                            self.found_problem();
                         }
                         Ok(o) => {
                             if let Err(err) =
                                 new.insert(o.entry, &o.ids, o.reason, o.is_cachedir_tag)
                             {
-                                debug!("ignoring backup error {}", err);
                                 warnings.push(err.into());
-                                self.found_problem();
                             }
                         }
                     }
@@ -250,12 +247,6 @@ impl<'a> BackupRun<'a> {
             progress.found_problem();
         }
     }
-
-    fn found_problems(&self, n: u64) {
-        if let Some(progress) = &self.progress {
-            progress.found_problems(n);
-        }
-    }
 }
-- 
cgit v1.2.1

From c2e9cd7865439ab5f0ec0b9761be6027bee880bb Mon Sep 17 00:00:00 2001
From: Lars Wirzenius
Date: Mon, 9 Aug 2021 10:45:04 +0300
Subject: refactor: for clarity

Sponsored-by: author
---
 src/backup_run.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/backup_run.rs b/src/backup_run.rs
index 7e41173..f78a83b 100644
--- a/src/backup_run.rs
+++ b/src/backup_run.rs
@@ -179,7 +179,7 @@ impl<'a> BackupRun<'a> {
                     if entry.is_cachedir_tag && !old.is_cachedir_tag(&path)? {
                         new_cachedir_tags.push(path);
                     }
-                    match self.backup(entry, old).await {
+                    match self.backup_if_changed(entry, old).await {
                         Err(err) => {
                             warnings.push(err);
                         }
@@ -202,7 +202,7 @@ impl<'a> BackupRun<'a> {
         })
     }
 
-    async fn backup(
+    async fn backup_if_changed(
        &self,
         entry: AnnotatedFsEntry,
         old: &LocalGeneration,
@@ -213,7 +213,7 @@ impl<'a> BackupRun<'a> {
         let reason = self.policy.needs_backup(old, &entry.inner);
         match reason {
             Reason::IsNew | Reason::Changed | Reason::GenerationLookupError | Reason::Unknown => {
-                Ok(backup_file(self.client, &entry, path, self.buffer_size, reason).await)
+                Ok(backup_one_entry(self.client, &entry, path, self.buffer_size, reason).await)
             }
             Reason::Unchanged | Reason::Skipped | Reason::FileError => {
                 let fileno = old.get_fileno(&entry.inner.pathbuf())?;
@@ -249,7 +249,7 @@ impl<'a> BackupRun<'a> {
     }
 }
 
-async fn backup_file(
+async fn backup_one_entry(
     client: &AsyncBackupClient,
     entry: &AnnotatedFsEntry,
     path: &Path,
-- 
cgit v1.2.1

From 45a3c7109a03ef8466b8e94ef4aefb21de16bd0c Mon Sep 17 00:00:00 2001
From: Lars Wirzenius
Date: Mon, 9 Aug 2021 10:51:53 +0300
Subject: refactor for clarity

Sponsored-by: author
---
 src/backup_run.rs | 60 +++++++++++++++++++++++++++----------------------------
 1 file changed, 30 insertions(+), 30 deletions(-)

diff --git a/src/backup_run.rs b/src/backup_run.rs
index f78a83b..5aa9e58 100644
--- a/src/backup_run.rs
+++ b/src/backup_run.rs
@@ -213,7 +213,7 @@ impl<'a> BackupRun<'a> {
         let reason = self.policy.needs_backup(old, &entry.inner);
         match reason {
             Reason::IsNew | Reason::Changed | Reason::GenerationLookupError | Reason::Unknown => {
-                Ok(backup_one_entry(self.client, &entry, path, self.buffer_size, reason).await)
+                Ok(self.backup_one_entry(&entry, path, reason).await)
             }
             Reason::Unchanged | Reason::Skipped | Reason::FileError => {
                 let fileno = old.get_fileno(&entry.inner.pathbuf())?;
@@ -236,6 +236,35 @@ impl<'a> BackupRun<'a> {
         }
     }
 
+    async fn backup_one_entry(
+        &self,
+        entry: &AnnotatedFsEntry,
+        path: &Path,
+        reason: Reason,
+    ) -> FsEntryBackupOutcome {
+        let ids = self
+            .client
+            .upload_filesystem_entry(&entry.inner, self.buffer_size)
+            .await;
+        match ids {
+            Err(err) => {
+                warn!("error backing up {}, skipping it: {}", path.display(), err);
+                FsEntryBackupOutcome {
+                    entry: entry.inner.clone(),
+                    ids: vec![],
+                    reason: Reason::FileError,
+                    is_cachedir_tag: entry.is_cachedir_tag,
+                }
+            }
+            Ok(ids) => FsEntryBackupOutcome {
+                entry: entry.inner.clone(),
+                ids,
+                reason,
+                is_cachedir_tag: entry.is_cachedir_tag,
+            },
+        }
+    }
+
     fn found_live_file(&self, path: &Path) {
         if let Some(progress) = &self.progress {
             progress.found_live_file(path);
@@ -248,32 +277,3 @@ impl<'a> BackupRun<'a> {
         }
     }
 }
-
-async fn backup_one_entry(
-    client: &AsyncBackupClient,
-    entry: &AnnotatedFsEntry,
-    path: &Path,
-    chunk_size: usize,
-    reason: Reason,
-) -> FsEntryBackupOutcome {
-    let ids = client
-        .upload_filesystem_entry(&entry.inner, chunk_size)
-        .await;
-    match ids {
-        Err(err) => {
-            warn!("error backing up {}, skipping it: {}", path.display(), err);
-            FsEntryBackupOutcome {
-                entry: entry.inner.clone(),
-                ids: vec![],
-                reason: Reason::FileError,
-                is_cachedir_tag: entry.is_cachedir_tag,
-            }
-        }
-        Ok(ids) => FsEntryBackupOutcome {
-            entry: entry.inner.clone(),
-            ids,
-            reason,
-            is_cachedir_tag: entry.is_cachedir_tag,
-        },
-    }
-}
-- 
cgit v1.2.1

From 67757758f0226c31b667a329be6ab25008ef372a Mon Sep 17 00:00:00 2001
From: Lars Wirzenius
Date: Mon, 9 Aug 2021 10:52:29 +0300
Subject: refactor: rename function for clarity

Sponsored-by: author
---
 src/backup_run.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/backup_run.rs b/src/backup_run.rs
index 5aa9e58..02d20ee 100644
--- a/src/backup_run.rs
+++ b/src/backup_run.rs
@@ -179,7 +179,7 @@ impl<'a> BackupRun<'a> {
                     if entry.is_cachedir_tag && !old.is_cachedir_tag(&path)? {
                         new_cachedir_tags.push(path);
                     }
-                    match self.backup_if_changed(entry, old).await {
+                    match self.backup_if_needed(entry, old).await {
                         Err(err) => {
                             warnings.push(err);
                         }
@@ -202,7 +202,7 @@ impl<'a> BackupRun<'a> {
         })
     }
 
-    async fn backup_if_changed(
+    async fn backup_if_needed(
         &self,
         entry: AnnotatedFsEntry,
         old: &LocalGeneration,
-- 
cgit v1.2.1

From 3adfd67831f0e197b8b2e23c79393056584f398e Mon Sep 17 00:00:00 2001
From: Lars Wirzenius
Date: Tue, 10 Aug 2021 10:42:32 +0300
Subject: refactor: move file reading, etc, for backups to backup_run

Move the code that reads a file as chunks during a backup, and uploads
any new chunks to the chunk server, into `src/backup_run.rs`.
Previously it was mostly in `src/client.rs`, which is meant to provide
an abstraction for using the chunk server API.

Sponsored-by: author
---
 src/backup_run.rs | 109 +++++++++++++++++++++++++++++++++++++++++++++---
 src/client.rs     |  76 +-----------------------------------
 src/cmd/backup.rs |  26 ++-----------
 3 files changed, 108 insertions(+), 103 deletions(-)

diff --git a/src/backup_run.rs b/src/backup_run.rs
index 02d20ee..1bb0bc0 100644
--- a/src/backup_run.rs
+++ b/src/backup_run.rs
@@ -1,18 +1,25 @@
 use crate::backup_progress::BackupProgress;
 use crate::backup_reason::Reason;
+use crate::chunk::{GenerationChunk, GenerationChunkError};
+use crate::chunker::{Chunker, ChunkerError};
 use crate::chunkid::ChunkId;
 use crate::client::{AsyncBackupClient, ClientError};
 use crate::config::ClientConfig;
 use crate::error::ObnamError;
-use crate::fsentry::FilesystemEntry;
+use crate::fsentry::{FilesystemEntry, FilesystemKind};
 use crate::fsiter::{AnnotatedFsEntry, FsIterError, FsIterator};
 use crate::generation::{
     GenId, LocalGeneration, LocalGenerationError, NascentError, NascentGeneration,
 };
 use crate::policy::BackupPolicy;
-use log::{debug, info, warn};
+
+use bytesize::MIB;
+use chrono::{DateTime, Local};
+use log::{debug, error, info, warn};
 use std::path::{Path, PathBuf};
 
+const SQLITE_CHUNK_SIZE: usize = MIB as usize;
+
 pub struct BackupRun<'a> {
     client: &'a AsyncBackupClient,
     policy: BackupPolicy,
@@ -33,6 +40,12 @@ pub enum BackupError {
 
     #[error(transparent)]
     LocalGenerationError(#[from] LocalGenerationError),
+
+    #[error(transparent)]
+    ChunkerError(#[from] ChunkerError),
+
+    #[error(transparent)]
+    GenerationChunkError(#[from] GenerationChunkError),
 }
 
 #[derive(Debug)]
@@ -43,6 +56,13 @@ pub struct FsEntryBackupOutcome {
     pub is_cachedir_tag: bool,
 }
 
+#[derive(Debug)]
+struct OneRootBackupOutcome {
+    pub files_count: i64,
+    pub warnings: Vec<BackupError>,
+    pub new_cachedir_tags: Vec<PathBuf>,
+}
+
 #[derive(Debug)]
 pub struct RootsBackupOutcome {
     /// The number of backed up files.
@@ -51,6 +71,8 @@ pub struct RootsBackupOutcome {
     pub warnings: Vec<BackupError>,
     /// CACHEDIR.TAG files that aren't present in in a previous generation.
     pub new_cachedir_tags: Vec<PathBuf>,
+    /// Id of new generation.
+    pub gen_id: GenId,
 }
 
 impl<'a> BackupRun<'a> {
@@ -125,7 +147,7 @@ impl<'a> BackupRun<'a> {
         config: &ClientConfig,
         old: &LocalGeneration,
         newpath: &Path,
-    ) -> Result<RootsBackupOutcome, NascentError> {
+    ) -> Result<RootsBackupOutcome, ObnamError> {
         let mut warnings: Vec<BackupError> = vec![];
         let mut new_cachedir_tags = vec![];
         let files_count = {
@@ -152,10 +174,13 @@ impl<'a> BackupRun<'a> {
             new.file_count()
         };
         self.finish();
+        let gen_id = self.upload_nascent_generation(newpath).await?;
+        let gen_id = GenId::from_chunk_id(gen_id);
         Ok(RootsBackupOutcome {
             files_count,
             warnings,
             new_cachedir_tags,
+            gen_id,
         })
     }
 
@@ -165,7 +190,7 @@ impl<'a> BackupRun<'a> {
         old: &LocalGeneration,
         new: &mut NascentGeneration,
         root: &Path,
-    ) -> Result<RootsBackupOutcome, NascentError> {
+    ) -> Result<OneRootBackupOutcome, NascentError> {
         let mut warnings: Vec<BackupError> = vec![];
         let mut new_cachedir_tags = vec![];
         let iter = FsIterator::new(root, config.exclude_cache_tag_directories);
@@ -195,7 +220,7 @@ impl<'a> BackupRun<'a> {
             }
         }
 
-        Ok(RootsBackupOutcome {
+        Ok(OneRootBackupOutcome {
             files_count: 0, // Caller will get file count from new.
             warnings,
             new_cachedir_tags,
         })
     }
@@ -243,7 +268,6 @@ impl<'a> BackupRun<'a> {
         reason: Reason,
     ) -> FsEntryBackupOutcome {
         let ids = self
-            .client
             .upload_filesystem_entry(&entry.inner, self.buffer_size)
             .await;
         match ids {
@@ -268,6 +292,74 @@ impl<'a> BackupRun<'a> {
         }
     }
 
+    pub async fn upload_filesystem_entry(
+        &self,
+        e: &FilesystemEntry,
+        size: usize,
+    ) -> Result<Vec<ChunkId>, BackupError> {
+        let path = e.pathbuf();
+        info!("uploading {:?}", path);
+        let ids = match e.kind() {
+            FilesystemKind::Regular => self.read_file(&path, size).await?,
+            FilesystemKind::Directory => vec![],
+            FilesystemKind::Symlink => vec![],
+            FilesystemKind::Socket => vec![],
+            FilesystemKind::Fifo => vec![],
+        };
+        info!("upload OK for {:?}", path);
+        Ok(ids)
+    }
+
+    pub async fn upload_generation(
+        &self,
+        filename: &Path,
+        size: usize,
+    ) -> Result<ChunkId, BackupError> {
+        info!("upload SQLite {}", filename.display());
+        let ids = self.read_file(filename, size).await?;
+        let gen = GenerationChunk::new(ids);
+        let data = gen.to_data_chunk(&current_timestamp())?;
+        let gen_id = self.client.upload_chunk(data).await?;
+        info!("uploaded generation {}", gen_id);
+        Ok(gen_id)
+    }
+
+    async fn read_file(&self, filename: &Path, size: usize) -> Result<Vec<ChunkId>, BackupError> {
+        info!("upload file {}", filename.display());
+        let file = std::fs::File::open(filename)
+            .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?;
+        let chunker = Chunker::new(size, file, filename);
+        let chunk_ids = self.upload_new_file_chunks(chunker).await?;
+        Ok(chunk_ids)
+    }
+
+    pub async fn upload_new_file_chunks(
+        &self,
+        chunker: Chunker,
+    ) -> Result<Vec<ChunkId>, BackupError> {
+        let mut chunk_ids = vec![];
+        for item in chunker {
+            let chunk = item?;
+            if let Some(chunk_id) = self.client.has_chunk(chunk.meta()).await? {
+                chunk_ids.push(chunk_id.clone());
+                info!("reusing existing chunk {}", chunk_id);
+            } else {
+                let chunk_id = self.client.upload_chunk(chunk).await?;
+                chunk_ids.push(chunk_id.clone());
+                info!("created new chunk {}", chunk_id);
+            }
+        }
+
+        Ok(chunk_ids)
+    }
+
+    async fn upload_nascent_generation(&self, filename: &Path) -> Result<ChunkId, ObnamError> {
+        let progress = BackupProgress::upload_generation();
+        let gen_id = self.upload_generation(filename, SQLITE_CHUNK_SIZE).await?;
+        progress.finish();
+        Ok(gen_id)
+    }
+
     fn found_live_file(&self, path: &Path) {
         if let Some(progress) = &self.progress {
             progress.found_live_file(path);
@@ -277,3 +369,8 @@ impl<'a> BackupRun<'a> {
         }
     }
 }
+
+fn current_timestamp() -> String {
+    let now: DateTime<Local> = Local::now();
+    format!("{}", now.format("%Y-%m-%d %H:%M:%S.%f %z"))
+}
diff --git a/src/client.rs b/src/client.rs
index 9fddc18..5451dfb 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,15 +1,11 @@
-use crate::chunk::DataChunk;
-use crate::chunk::{GenerationChunk, GenerationChunkError};
-use crate::chunker::{Chunker, ChunkerError};
+use crate::chunk::{DataChunk, GenerationChunk, GenerationChunkError};
 use crate::chunkid::ChunkId;
 use crate::chunkmeta::ChunkMeta;
 use crate::cipher::{CipherEngine, CipherError};
 use crate::config::{ClientConfig, ClientConfigError};
-use crate::fsentry::{FilesystemEntry, FilesystemKind};
 use crate::generation::{FinishedGeneration, GenId, LocalGeneration, LocalGenerationError};
 use crate::genlist::GenerationList;
 
-use chrono::{DateTime, Local};
 use log::{debug, error, info};
 use reqwest::header::HeaderMap;
 use std::collections::HashMap;
@@ -49,9 +45,6 @@ pub enum ClientError {
     #[error(transparent)]
     LocalGenerationError(#[from] LocalGenerationError),
 
-    #[error(transparent)]
-    ChunkerError(#[from] ChunkerError),
-
     #[error("couldn't convert response chunk-meta header to string: {0}")]
     MetaHeaderToString(reqwest::header::ToStrError),
 
@@ -91,48 +84,6 @@ impl AsyncBackupClient {
             chunk_client: AsyncChunkClient::new(config)?,
         })
     }
-
-    pub async fn upload_filesystem_entry(
-        &self,
-        e: &FilesystemEntry,
-        size: usize,
-    ) -> Result<Vec<ChunkId>, ClientError> {
-        let path = e.pathbuf();
-        info!("uploading {:?}", path);
-        let ids = match e.kind() {
-            FilesystemKind::Regular => self.read_file(&path, size).await?,
-            FilesystemKind::Directory => vec![],
-            FilesystemKind::Symlink => vec![],
-            FilesystemKind::Socket => vec![],
-            FilesystemKind::Fifo => vec![],
-        };
-        info!("upload OK for {:?}", path);
-        Ok(ids)
-    }
-
-    pub async fn upload_generation(
-        &self,
-        filename: &Path,
-        size: usize,
-    ) -> Result<ChunkId, ClientError> {
-        info!("upload SQLite {}", filename.display());
-        let ids = self.read_file(filename, size).await?;
-        let gen = GenerationChunk::new(ids);
-        let data = gen.to_data_chunk(&current_timestamp())?;
-        let gen_id = self.upload_chunk(data).await?;
-        info!("uploaded generation {}", gen_id);
-        Ok(gen_id)
-    }
-
-    async fn read_file(&self, filename: &Path, size: usize) -> Result<Vec<ChunkId>, ClientError> {
-        info!("upload file {}", filename.display());
-        let file = std::fs::File::open(filename)
-            .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?;
-        let chunker = Chunker::new(size, file, filename);
-        let chunk_ids = self.upload_new_file_chunks(chunker).await?;
-        Ok(chunk_ids)
-    }
-
     pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> {
         self.chunk_client.has_chunk(meta).await
     }
@@ -141,26 +92,6 @@ impl AsyncBackupClient {
         self.chunk_client.upload_chunk(chunk).await
     }
 
-    pub async fn upload_new_file_chunks(
-        &self,
-        chunker: Chunker,
-    ) -> Result<Vec<ChunkId>, ClientError> {
-        let mut chunk_ids = vec![];
-        for item in chunker {
-            let chunk = item?;
-            if let Some(chunk_id) = self.has_chunk(chunk.meta()).await? {
-                chunk_ids.push(chunk_id.clone());
-                info!("reusing existing chunk {}", chunk_id);
-            } else {
-                let chunk_id = self.upload_chunk(chunk).await?;
-                chunk_ids.push(chunk_id.clone());
-                info!("created new chunk {}", chunk_id);
-            }
-        }
-
-        Ok(chunk_ids)
-    }
-
     pub async fn list_generations(&self) -> Result<GenerationList, ClientError> {
         self.chunk_client.list_generations().await
     }
@@ -347,8 +278,3 @@ impl AsyncChunkClient {
         Ok(meta)
     }
 }
-
-fn current_timestamp() -> String {
-    let now: DateTime<Local> = Local::now();
-    format!("{}", now.format("%Y-%m-%d %H:%M:%S.%f %z"))
-}
diff --git a/src/cmd/backup.rs b/src/cmd/backup.rs
index dc03ddf..8f3d6d5 100644
--- a/src/cmd/backup.rs
+++ b/src/cmd/backup.rs
@@ -1,19 +1,15 @@
-use crate::backup_progress::BackupProgress;
 use crate::backup_run::BackupRun;
-use crate::chunkid::ChunkId;
 use crate::client::AsyncBackupClient;
 use crate::config::ClientConfig;
 use crate::error::ObnamError;
-use bytesize::MIB;
+use crate::generation::GenId;
+
 use log::info;
-use std::path::Path;
 use std::time::SystemTime;
 use structopt::StructOpt;
 use tempfile::NamedTempFile;
 use tokio::runtime::Runtime;
 
-const SQLITE_CHUNK_SIZE: usize = MIB as usize;
-
 #[derive(Debug, StructOpt)]
 pub struct Backup {}
 
@@ -47,8 +43,6 @@ impl Backup {
             }
         };
 
-        let gen_id = upload_nascent_generation(&client, newtemp.path()).await?;
-
         for w in outcome.warnings.iter() {
             println!("warning: {}", w);
         }
@@ -64,7 +58,7 @@ impl Backup {
         report_stats(
             &runtime,
             outcome.files_count,
-            &gen_id,
+            &outcome.gen_id,
             outcome.warnings.len(),
         )?;
 
@@ -79,7 +73,7 @@ impl Backup {
 fn report_stats(
     runtime: &SystemTime,
     file_count: i64,
-    gen_id: &ChunkId,
+    gen_id: &GenId,
     num_warnings: usize,
 ) -> Result<(), ObnamError> {
     println!("status: OK");
@@ -89,15 +83,3 @@ fn report_stats(
     println!("generation-id: {}", gen_id);
     Ok(())
 }
-
-async fn upload_nascent_generation(
-    client: &AsyncBackupClient,
-    filename: &Path,
-) -> Result<ChunkId, ObnamError> {
-    let progress = BackupProgress::upload_generation();
-    let gen_id = client
-        .upload_generation(filename, SQLITE_CHUNK_SIZE)
-        .await?;
-    progress.finish();
-    Ok(gen_id)
-}
-- 
cgit v1.2.1

From 6bc136082f5fc02f4f2ebdcc0facbf666ee7b01c Mon Sep 17 00:00:00 2001
From: Lars Wirzenius
Date: Tue, 10 Aug 2021 10:53:11 +0300
Subject: refactor: rename function to have a clearer name

Rename `read_file` to `upload_regular_file` to better describe the
purpose of the function.

Sponsored-by: author
---
 src/backup_run.rs | 21 ++++++++-------------
 1 file changed, 8 insertions(+), 13 deletions(-)

diff --git a/src/backup_run.rs b/src/backup_run.rs
index 1bb0bc0..b536f6e 100644
--- a/src/backup_run.rs
+++ b/src/backup_run.rs
@@ -297,7 +297,7 @@ impl<'a> BackupRun<'a> {
         let path = e.pathbuf();
         info!("uploading {:?}", path);
         let ids = match e.kind() {
-            FilesystemKind::Regular => self.read_file(&path, size).await?,
+            FilesystemKind::Regular => self.upload_regular_file(&path, size).await?,
             FilesystemKind::Directory => vec![],
             FilesystemKind::Symlink => vec![],
             FilesystemKind::Socket => vec![],
@@ -313,7 +313,7 @@ impl<'a> BackupRun<'a> {
         size: usize,
     ) -> Result<ChunkId, BackupError> {
         info!("upload SQLite {}", filename.display());
-        let ids = self.read_file(filename, size).await?;
+        let ids = self.upload_regular_file(filename, size).await?;
         let gen = GenerationChunk::new(ids);
         let data = gen.to_data_chunk(&current_timestamp())?;
         let gen_id = self.client.upload_chunk(data).await?;
@@ -321,20 +321,16 @@ impl<'a> BackupRun<'a> {
         Ok(gen_id)
     }
 
-    async fn read_file(&self, filename: &Path, size: usize) -> Result<Vec<ChunkId>, BackupError> {
+    async fn upload_regular_file(
+        &self,
+        filename: &Path,
+        size: usize,
+    ) -> Result<Vec<ChunkId>, BackupError> {
         info!("upload file {}", filename.display());
+        let mut chunk_ids = vec![];
         let file = std::fs::File::open(filename)
             .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?;
         let chunker = Chunker::new(size, file, filename);
-        let chunk_ids = self.upload_new_file_chunks(chunker).await?;
-        Ok(chunk_ids)
-    }
-
-    pub async fn upload_new_file_chunks(
-        &self,
-        chunker: Chunker,
-    ) -> Result<Vec<ChunkId>, BackupError> {
-        let mut chunk_ids = vec![];
         for item in chunker {
             let chunk = item?;
             if let Some(chunk_id) = self.client.has_chunk(chunk.meta()).await? {
@@ -346,7 +342,6 @@ impl<'a> BackupRun<'a> {
                 info!("created new chunk {}", chunk_id);
             }
         }
-
         Ok(chunk_ids)
     }
 
-- 
cgit v1.2.1
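
A note on the transaction tradeoff discussed in the first patch of this
series: the cost difference between one transaction per insert and a
single batched transaction can be demonstrated with plain rusqlite,
outside Obnam. The sketch below is illustrative only; the table name,
schema, and values are hypothetical and not Obnam's. It shows the
batched pattern that insert_iter originally existed to preserve, and
the per-row pattern the series switches to.

use rusqlite::{Connection, Result};

// One implicit transaction per INSERT: each statement commits on its
// own. Simple, and what NascentGeneration::insert does after these
// patches, but it pays the per-commit cost for every row.
fn insert_per_row(conn: &Connection, values: &[i64]) -> Result<()> {
    for v in values {
        conn.execute("INSERT INTO t (n) VALUES (?1)", [v])?;
    }
    Ok(())
}

// One explicit transaction around all INSERTs: a single commit at the
// end. This is the pattern the old insert_iter used.
fn insert_batched(conn: &mut Connection, values: &[i64]) -> Result<()> {
    let tx = conn.transaction()?;
    for v in values {
        tx.execute("INSERT INTO t (n) VALUES (?1)", [v])?;
    }
    tx.commit()
}

fn main() -> Result<()> {
    let mut conn = Connection::open_in_memory()?;
    conn.execute("CREATE TABLE t (n INTEGER)", [])?;
    insert_per_row(&conn, &[1, 2, 3])?;
    insert_batched(&mut conn, &[4, 5, 6])?;
    Ok(())
}

How much the per-commit cost matters depends on SQLite's journaling and
sync settings and on the workload; the benchmark mentioned in the first
commit message suggests that for Obnam's backup workload the difference
was not measurable.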