From 6e9a1f5f45a5608a05931fd4d579c8ab52316f55 Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Sun, 8 Aug 2021 15:03:31 +0300 Subject: refactor: use async for "obnam backup" This changes things so that "obnam backup" uses async for everything. The old non-async BackupClient and ChunkClient are dropped. This does NOT move the chunking, and checksum computation, to its own function. This will happen later. Sponsored-by: author --- src/backup_run.rs | 41 +++++---- src/client.rs | 247 +++++++++++------------------------------------------- src/cmd/backup.rs | 30 ++++--- 3 files changed, 89 insertions(+), 229 deletions(-) (limited to 'src') diff --git a/src/backup_run.rs b/src/backup_run.rs index cd9c10f..39ef217 100644 --- a/src/backup_run.rs +++ b/src/backup_run.rs @@ -1,7 +1,7 @@ use crate::backup_progress::BackupProgress; use crate::backup_reason::Reason; use crate::chunkid::ChunkId; -use crate::client::{BackupClient, ClientError}; +use crate::client::{AsyncBackupClient, ClientError}; use crate::config::ClientConfig; use crate::error::ObnamError; use crate::fsentry::FilesystemEntry; @@ -14,7 +14,7 @@ use log::{debug, info, warn}; use std::path::{Path, PathBuf}; pub struct BackupRun<'a> { - client: &'a BackupClient, + client: &'a AsyncBackupClient, policy: BackupPolicy, buffer_size: usize, progress: Option, @@ -54,7 +54,10 @@ pub struct RootsBackupOutcome { } impl<'a> BackupRun<'a> { - pub fn initial(config: &ClientConfig, client: &'a BackupClient) -> Result { + pub fn initial( + config: &ClientConfig, + client: &'a AsyncBackupClient, + ) -> Result { Ok(Self { client, policy: BackupPolicy::default(), @@ -65,7 +68,7 @@ impl<'a> BackupRun<'a> { pub fn incremental( config: &ClientConfig, - client: &'a BackupClient, + client: &'a AsyncBackupClient, ) -> Result { Ok(Self { client, @@ -75,7 +78,7 @@ impl<'a> BackupRun<'a> { }) } - pub fn start( + pub async fn start( &mut self, genid: Option<&GenId>, oldname: &Path, @@ -89,7 +92,7 @@ impl<'a> BackupRun<'a> { Ok(LocalGeneration::open(oldname)?) } Some(genid) => { - let old = self.fetch_previous_generation(genid, oldname)?; + let old = self.fetch_previous_generation(genid, oldname).await?; let progress = BackupProgress::incremental(); progress.files_in_previous_generation(old.file_count()? as u64); @@ -100,13 +103,13 @@ impl<'a> BackupRun<'a> { } } - fn fetch_previous_generation( + async fn fetch_previous_generation( &self, genid: &GenId, oldname: &Path, ) -> Result { let progress = BackupProgress::download_generation(genid); - let old = self.client.fetch_generation(genid, oldname)?; + let old = self.client.fetch_generation(genid, oldname).await?; progress.finish(); Ok(old) } @@ -117,7 +120,7 @@ impl<'a> BackupRun<'a> { } } - pub fn backup_roots( + pub async fn backup_roots( &self, config: &ClientConfig, old: &LocalGeneration, @@ -141,7 +144,7 @@ impl<'a> BackupRun<'a> { if entry.is_cachedir_tag && !old.is_cachedir_tag(&path)? { new_cachedir_tags.push(path); } - match self.backup(entry, old) { + match self.backup(entry, old).await { Err(err) => { debug!("ignoring backup error {}", err); warnings.push(err); @@ -171,7 +174,7 @@ impl<'a> BackupRun<'a> { }) } - fn backup( + async fn backup( &self, entry: AnnotatedFsEntry, old: &LocalGeneration, @@ -182,13 +185,7 @@ impl<'a> BackupRun<'a> { let reason = self.policy.needs_backup(old, &entry.inner); match reason { Reason::IsNew | Reason::Changed | Reason::GenerationLookupError | Reason::Unknown => { - Ok(backup_file( - self.client, - &entry, - path, - self.buffer_size, - reason, - )) + Ok(backup_file(self.client, &entry, path, self.buffer_size, reason).await) } Reason::Unchanged | Reason::Skipped | Reason::FileError => { let fileno = old.get_fileno(&entry.inner.pathbuf())?; @@ -224,14 +221,16 @@ impl<'a> BackupRun<'a> { } } -fn backup_file( - client: &BackupClient, +async fn backup_file( + client: &AsyncBackupClient, entry: &AnnotatedFsEntry, path: &Path, chunk_size: usize, reason: Reason, ) -> FsEntryBackupOutcome { - let ids = client.upload_filesystem_entry(&entry.inner, chunk_size); + let ids = client + .upload_filesystem_entry(&entry.inner, chunk_size) + .await; match ids { Err(err) => { warn!("error backing up {}, skipping it: {}", path.display(), err); diff --git a/src/client.rs b/src/client.rs index c655bb2..9fddc18 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,7 +11,6 @@ use crate::genlist::GenerationList; use chrono::{DateTime, Local}; use log::{debug, error, info}; -use reqwest::blocking::Client; use reqwest::header::HeaderMap; use std::collections::HashMap; use std::fs::File; @@ -93,166 +92,7 @@ impl AsyncBackupClient { }) } - pub async fn list_generations(&self) -> Result { - self.chunk_client.list_generations().await - } - - pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result { - self.chunk_client.fetch_chunk(chunk_id).await - } - - async fn fetch_generation_chunk(&self, gen_id: &GenId) -> Result { - let chunk = self.fetch_chunk(gen_id.as_chunk_id()).await?; - let gen = GenerationChunk::from_data_chunk(&chunk)?; - Ok(gen) - } - - pub async fn fetch_generation( - &self, - gen_id: &GenId, - dbname: &Path, - ) -> Result { - let gen = self.fetch_generation_chunk(gen_id).await?; - - // Fetch the SQLite file, storing it in the named file. - let mut dbfile = File::create(&dbname) - .map_err(|err| ClientError::FileCreate(dbname.to_path_buf(), err))?; - for id in gen.chunk_ids() { - let chunk = self.fetch_chunk(id).await?; - dbfile - .write_all(chunk.data()) - .map_err(|err| ClientError::FileWrite(dbname.to_path_buf(), err))?; - } - info!("downloaded generation to {}", dbname.display()); - - let gen = LocalGeneration::open(dbname)?; - Ok(gen) - } -} - -pub struct AsyncChunkClient { - client: reqwest::Client, - base_url: String, - cipher: CipherEngine, -} - -impl AsyncChunkClient { - pub fn new(config: &ClientConfig) -> Result { - let pass = config.passwords()?; - - let client = reqwest::Client::builder() - .danger_accept_invalid_certs(!config.verify_tls_cert) - .build() - .map_err(ClientError::ReqwestError)?; - Ok(Self { - client, - base_url: config.server_url.to_string(), - cipher: CipherEngine::new(&pass), - }) - } - - fn base_url(&self) -> &str { - &self.base_url - } - - fn chunks_url(&self) -> String { - format!("{}/chunks", self.base_url()) - } - - pub async fn list_generations(&self) -> Result { - let (_, body) = self.get("", &[("generation", "true")]).await?; - - let map: HashMap = - serde_yaml::from_slice(&body).map_err(ClientError::YamlParse)?; - debug!("list_generations: map={:?}", map); - let finished = map - .iter() - .map(|(id, meta)| FinishedGeneration::new(id, meta.ended().map_or("", |s| s))) - .collect(); - Ok(GenerationList::new(finished)) - } - - pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result { - let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?; - let meta = self.get_chunk_meta_header(chunk_id, &headers)?; - - let meta_bytes = meta.to_json_vec(); - let chunk = self.cipher.decrypt_chunk(&body, &meta_bytes)?; - - Ok(chunk) - } - - async fn get( - &self, - path: &str, - query: &[(&str, &str)], - ) -> Result<(HeaderMap, Vec), ClientError> { - let url = format!("{}{}", &self.chunks_url(), path); - info!("GET {}", url); - - // Build HTTP request structure. - let req = self - .client - .get(&url) - .query(query) - .build() - .map_err(ClientError::ReqwestError)?; - - // Make HTTP request. - let res = self - .client - .execute(req) - .await - .map_err(ClientError::ReqwestError)?; - - // Did it work? - if res.status() != 200 { - return Err(ClientError::NotFound(path.to_string())); - } - - // Return headers and body. - let headers = res.headers().clone(); - let body = res.bytes().await.map_err(ClientError::ReqwestError)?; - let body = body.to_vec(); - Ok((headers, body)) - } - - fn get_chunk_meta_header( - &self, - chunk_id: &ChunkId, - headers: &HeaderMap, - ) -> Result { - let meta = headers.get("chunk-meta"); - - if meta.is_none() { - let err = ClientError::NoChunkMeta(chunk_id.clone()); - error!("fetching chunk {} failed: {}", chunk_id, err); - return Err(err); - } - - let meta = meta - .unwrap() - .to_str() - .map_err(ClientError::MetaHeaderToString)?; - let meta: ChunkMeta = serde_json::from_str(meta).map_err(ClientError::JsonParse)?; - - Ok(meta) - } -} - -pub struct BackupClient { - chunk_client: ChunkClient, -} - -impl BackupClient { - pub fn new(config: &ClientConfig) -> Result { - info!("creating backup client with config: {:#?}", config); - Ok(Self { - chunk_client: ChunkClient::new(config)?, - }) - } - - pub fn upload_filesystem_entry( + pub async fn upload_filesystem_entry( &self, e: &FilesystemEntry, size: usize, @@ -260,7 +100,7 @@ impl BackupClient { let path = e.pathbuf(); info!("uploading {:?}", path); let ids = match e.kind() { - FilesystemKind::Regular => self.read_file(&path, size)?, + FilesystemKind::Regular => self.read_file(&path, size).await?, FilesystemKind::Directory => vec![], FilesystemKind::Symlink => vec![], FilesystemKind::Socket => vec![], @@ -270,42 +110,49 @@ impl BackupClient { Ok(ids) } - pub fn upload_generation(&self, filename: &Path, size: usize) -> Result { + pub async fn upload_generation( + &self, + filename: &Path, + size: usize, + ) -> Result { info!("upload SQLite {}", filename.display()); - let ids = self.read_file(filename, size)?; + let ids = self.read_file(filename, size).await?; let gen = GenerationChunk::new(ids); let data = gen.to_data_chunk(¤t_timestamp())?; - let gen_id = self.upload_chunk(data)?; + let gen_id = self.upload_chunk(data).await?; info!("uploaded generation {}", gen_id); Ok(gen_id) } - fn read_file(&self, filename: &Path, size: usize) -> Result, ClientError> { + async fn read_file(&self, filename: &Path, size: usize) -> Result, ClientError> { info!("upload file {}", filename.display()); let file = std::fs::File::open(filename) .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?; let chunker = Chunker::new(size, file, filename); - let chunk_ids = self.upload_new_file_chunks(chunker)?; + let chunk_ids = self.upload_new_file_chunks(chunker).await?; Ok(chunk_ids) } - pub fn has_chunk(&self, meta: &ChunkMeta) -> Result, ClientError> { - self.chunk_client.has_chunk(meta) + pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result, ClientError> { + self.chunk_client.has_chunk(meta).await } - pub fn upload_chunk(&self, chunk: DataChunk) -> Result { - self.chunk_client.upload_chunk(chunk) + pub async fn upload_chunk(&self, chunk: DataChunk) -> Result { + self.chunk_client.upload_chunk(chunk).await } - pub fn upload_new_file_chunks(&self, chunker: Chunker) -> Result, ClientError> { + pub async fn upload_new_file_chunks( + &self, + chunker: Chunker, + ) -> Result, ClientError> { let mut chunk_ids = vec![]; for item in chunker { let chunk = item?; - if let Some(chunk_id) = self.has_chunk(chunk.meta())? { + if let Some(chunk_id) = self.has_chunk(chunk.meta()).await? { chunk_ids.push(chunk_id.clone()); info!("reusing existing chunk {}", chunk_id); } else { - let chunk_id = self.upload_chunk(chunk)?; + let chunk_id = self.upload_chunk(chunk).await?; chunk_ids.push(chunk_id.clone()); info!("created new chunk {}", chunk_id); } @@ -314,32 +161,32 @@ impl BackupClient { Ok(chunk_ids) } - pub fn list_generations(&self) -> Result { - self.chunk_client.list_generations() + pub async fn list_generations(&self) -> Result { + self.chunk_client.list_generations().await } - pub fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result { - self.chunk_client.fetch_chunk(chunk_id) + pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result { + self.chunk_client.fetch_chunk(chunk_id).await } - fn fetch_generation_chunk(&self, gen_id: &GenId) -> Result { - let chunk = self.fetch_chunk(gen_id.as_chunk_id())?; + async fn fetch_generation_chunk(&self, gen_id: &GenId) -> Result { + let chunk = self.fetch_chunk(gen_id.as_chunk_id()).await?; let gen = GenerationChunk::from_data_chunk(&chunk)?; Ok(gen) } - pub fn fetch_generation( + pub async fn fetch_generation( &self, gen_id: &GenId, dbname: &Path, ) -> Result { - let gen = self.fetch_generation_chunk(gen_id)?; + let gen = self.fetch_generation_chunk(gen_id).await?; // Fetch the SQLite file, storing it in the named file. let mut dbfile = File::create(&dbname) .map_err(|err| ClientError::FileCreate(dbname.to_path_buf(), err))?; for id in gen.chunk_ids() { - let chunk = self.fetch_chunk(id)?; + let chunk = self.fetch_chunk(id).await?; dbfile .write_all(chunk.data()) .map_err(|err| ClientError::FileWrite(dbname.to_path_buf(), err))?; @@ -351,17 +198,17 @@ impl BackupClient { } } -pub struct ChunkClient { - client: Client, +pub struct AsyncChunkClient { + client: reqwest::Client, base_url: String, cipher: CipherEngine, } -impl ChunkClient { +impl AsyncChunkClient { pub fn new(config: &ClientConfig) -> Result { let pass = config.passwords()?; - let client = Client::builder() + let client = reqwest::Client::builder() .danger_accept_invalid_certs(!config.verify_tls_cert) .build() .map_err(ClientError::ReqwestError)?; @@ -380,8 +227,8 @@ impl ChunkClient { format!("{}/chunks", self.base_url()) } - pub fn has_chunk(&self, meta: &ChunkMeta) -> Result, ClientError> { - let body = match self.get("", &[("sha256", meta.sha256())]) { + pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result, ClientError> { + let body = match self.get("", &[("sha256", meta.sha256())]).await { Ok((_, body)) => body, Err(err) => return Err(err), }; @@ -398,7 +245,7 @@ impl ChunkClient { Ok(has) } - pub fn upload_chunk(&self, chunk: DataChunk) -> Result { + pub async fn upload_chunk(&self, chunk: DataChunk) -> Result { let enc = self.cipher.encrypt_chunk(&chunk)?; let res = self .client @@ -406,9 +253,10 @@ impl ChunkClient { .header("chunk-meta", chunk.meta().to_json()) .body(enc.ciphertext().to_vec()) .send() + .await .map_err(ClientError::ReqwestError)?; debug!("upload_chunk: res={:?}", res); - let res: HashMap = res.json().map_err(ClientError::ReqwestError)?; + let res: HashMap = res.json().await.map_err(ClientError::ReqwestError)?; let chunk_id = if let Some(chunk_id) = res.get("chunk_id") { debug!("upload_chunk: id={}", chunk_id); chunk_id.parse().unwrap() @@ -419,8 +267,8 @@ impl ChunkClient { Ok(chunk_id) } - pub fn list_generations(&self) -> Result { - let (_, body) = self.get("", &[("generation", "true")])?; + pub async fn list_generations(&self) -> Result { + let (_, body) = self.get("", &[("generation", "true")]).await?; let map: HashMap = serde_yaml::from_slice(&body).map_err(ClientError::YamlParse)?; @@ -432,8 +280,8 @@ impl ChunkClient { Ok(GenerationList::new(finished)) } - pub fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result { - let (headers, body) = self.get(&format!("/{}", chunk_id), &[])?; + pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result { + let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?; let meta = self.get_chunk_meta_header(chunk_id, &headers)?; let meta_bytes = meta.to_json_vec(); @@ -442,7 +290,11 @@ impl ChunkClient { Ok(chunk) } - fn get(&self, path: &str, query: &[(&str, &str)]) -> Result<(HeaderMap, Vec), ClientError> { + async fn get( + &self, + path: &str, + query: &[(&str, &str)], + ) -> Result<(HeaderMap, Vec), ClientError> { let url = format!("{}{}", &self.chunks_url(), path); info!("GET {}", url); @@ -458,6 +310,7 @@ impl ChunkClient { let res = self .client .execute(req) + .await .map_err(ClientError::ReqwestError)?; // Did it work? @@ -467,7 +320,7 @@ impl ChunkClient { // Return headers and body. let headers = res.headers().clone(); - let body = res.bytes().map_err(ClientError::ReqwestError)?; + let body = res.bytes().await.map_err(ClientError::ReqwestError)?; let body = body.to_vec(); Ok((headers, body)) } diff --git a/src/cmd/backup.rs b/src/cmd/backup.rs index 04dfb05..dc03ddf 100644 --- a/src/cmd/backup.rs +++ b/src/cmd/backup.rs @@ -1,7 +1,7 @@ use crate::backup_progress::BackupProgress; use crate::backup_run::BackupRun; use crate::chunkid::ChunkId; -use crate::client::BackupClient; +use crate::client::AsyncBackupClient; use crate::config::ClientConfig; use crate::error::ObnamError; use bytesize::MIB; @@ -10,6 +10,7 @@ use std::path::Path; use std::time::SystemTime; use structopt::StructOpt; use tempfile::NamedTempFile; +use tokio::runtime::Runtime; const SQLITE_CHUNK_SIZE: usize = MIB as usize; @@ -18,10 +19,15 @@ pub struct Backup {} impl Backup { pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let rt = Runtime::new()?; + rt.block_on(self.run_async(config)) + } + + async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> { let runtime = SystemTime::now(); - let client = BackupClient::new(config)?; - let genlist = client.list_generations()?; + let client = AsyncBackupClient::new(config)?; + let genlist = client.list_generations().await?; let oldtemp = NamedTempFile::new()?; let newtemp = NamedTempFile::new()?; @@ -30,18 +36,18 @@ impl Backup { Err(_) => { info!("fresh backup without a previous generation"); let mut run = BackupRun::initial(config, &client)?; - let old = run.start(None, oldtemp.path())?; - (false, run.backup_roots(config, &old, newtemp.path())?) + let old = run.start(None, oldtemp.path()).await?; + (false, run.backup_roots(config, &old, newtemp.path()).await?) } Ok(old_id) => { info!("incremental backup based on {}", old_id); let mut run = BackupRun::incremental(config, &client)?; - let old = run.start(Some(&old_id), oldtemp.path())?; - (true, run.backup_roots(config, &old, newtemp.path())?) + let old = run.start(Some(&old_id), oldtemp.path()).await?; + (true, run.backup_roots(config, &old, newtemp.path()).await?) } }; - let gen_id = upload_nascent_generation(&client, newtemp.path())?; + let gen_id = upload_nascent_generation(&client, newtemp.path()).await?; for w in outcome.warnings.iter() { println!("warning: {}", w); @@ -84,12 +90,14 @@ fn report_stats( Ok(()) } -fn upload_nascent_generation( - client: &BackupClient, +async fn upload_nascent_generation( + client: &AsyncBackupClient, filename: &Path, ) -> Result { let progress = BackupProgress::upload_generation(); - let gen_id = client.upload_generation(filename, SQLITE_CHUNK_SIZE)?; + let gen_id = client + .upload_generation(filename, SQLITE_CHUNK_SIZE) + .await?; progress.finish(); Ok(gen_id) } -- cgit v1.2.1