diff options
author | Lars Wirzenius <liw@liw.fi> | 2021-08-08 15:03:31 +0300 |
---|---|---|
committer | Lars Wirzenius <liw@liw.fi> | 2021-08-09 09:58:38 +0300 |
commit | 6e9a1f5f45a5608a05931fd4d579c8ab52316f55 (patch) | |
tree | c33ff750c17eb6ca7c7d6771c885da2f31c40cd3 /src/client.rs | |
parent | 30c20dcf3d81e9147e2bd4617bd055a5a8a17896 (diff) | |
download | obnam2-6e9a1f5f45a5608a05931fd4d579c8ab52316f55.tar.gz |
refactor: use async for "obnam backup"
This changes things so that "obnam backup" uses async for everything.
The old non-async BackupClient and ChunkClient are dropped.
This does NOT move the chunking and checksum computation to its own
function. This will happen later.
Sponsored-by: author
Diffstat (limited to 'src/client.rs')
-rw-r--r-- | src/client.rs | 247 |
1 file changed, 50 insertions, 197 deletions
diff --git a/src/client.rs b/src/client.rs index c655bb2..9fddc18 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,7 +11,6 @@ use crate::genlist::GenerationList; use chrono::{DateTime, Local}; use log::{debug, error, info}; -use reqwest::blocking::Client; use reqwest::header::HeaderMap; use std::collections::HashMap; use std::fs::File; @@ -93,166 +92,7 @@ impl AsyncBackupClient { }) } - pub async fn list_generations(&self) -> Result<GenerationList, ClientError> { - self.chunk_client.list_generations().await - } - - pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> { - self.chunk_client.fetch_chunk(chunk_id).await - } - - async fn fetch_generation_chunk(&self, gen_id: &GenId) -> Result<GenerationChunk, ClientError> { - let chunk = self.fetch_chunk(gen_id.as_chunk_id()).await?; - let gen = GenerationChunk::from_data_chunk(&chunk)?; - Ok(gen) - } - - pub async fn fetch_generation( - &self, - gen_id: &GenId, - dbname: &Path, - ) -> Result<LocalGeneration, ClientError> { - let gen = self.fetch_generation_chunk(gen_id).await?; - - // Fetch the SQLite file, storing it in the named file. 
- let mut dbfile = File::create(&dbname) - .map_err(|err| ClientError::FileCreate(dbname.to_path_buf(), err))?; - for id in gen.chunk_ids() { - let chunk = self.fetch_chunk(id).await?; - dbfile - .write_all(chunk.data()) - .map_err(|err| ClientError::FileWrite(dbname.to_path_buf(), err))?; - } - info!("downloaded generation to {}", dbname.display()); - - let gen = LocalGeneration::open(dbname)?; - Ok(gen) - } -} - -pub struct AsyncChunkClient { - client: reqwest::Client, - base_url: String, - cipher: CipherEngine, -} - -impl AsyncChunkClient { - pub fn new(config: &ClientConfig) -> Result<Self, ClientError> { - let pass = config.passwords()?; - - let client = reqwest::Client::builder() - .danger_accept_invalid_certs(!config.verify_tls_cert) - .build() - .map_err(ClientError::ReqwestError)?; - Ok(Self { - client, - base_url: config.server_url.to_string(), - cipher: CipherEngine::new(&pass), - }) - } - - fn base_url(&self) -> &str { - &self.base_url - } - - fn chunks_url(&self) -> String { - format!("{}/chunks", self.base_url()) - } - - pub async fn list_generations(&self) -> Result<GenerationList, ClientError> { - let (_, body) = self.get("", &[("generation", "true")]).await?; - - let map: HashMap<String, ChunkMeta> = - serde_yaml::from_slice(&body).map_err(ClientError::YamlParse)?; - debug!("list_generations: map={:?}", map); - let finished = map - .iter() - .map(|(id, meta)| FinishedGeneration::new(id, meta.ended().map_or("", |s| s))) - .collect(); - Ok(GenerationList::new(finished)) - } - - pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> { - let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?; - let meta = self.get_chunk_meta_header(chunk_id, &headers)?; - - let meta_bytes = meta.to_json_vec(); - let chunk = self.cipher.decrypt_chunk(&body, &meta_bytes)?; - - Ok(chunk) - } - - async fn get( - &self, - path: &str, - query: &[(&str, &str)], - ) -> Result<(HeaderMap, Vec<u8>), ClientError> { - let url = 
format!("{}{}", &self.chunks_url(), path); - info!("GET {}", url); - - // Build HTTP request structure. - let req = self - .client - .get(&url) - .query(query) - .build() - .map_err(ClientError::ReqwestError)?; - - // Make HTTP request. - let res = self - .client - .execute(req) - .await - .map_err(ClientError::ReqwestError)?; - - // Did it work? - if res.status() != 200 { - return Err(ClientError::NotFound(path.to_string())); - } - - // Return headers and body. - let headers = res.headers().clone(); - let body = res.bytes().await.map_err(ClientError::ReqwestError)?; - let body = body.to_vec(); - Ok((headers, body)) - } - - fn get_chunk_meta_header( - &self, - chunk_id: &ChunkId, - headers: &HeaderMap, - ) -> Result<ChunkMeta, ClientError> { - let meta = headers.get("chunk-meta"); - - if meta.is_none() { - let err = ClientError::NoChunkMeta(chunk_id.clone()); - error!("fetching chunk {} failed: {}", chunk_id, err); - return Err(err); - } - - let meta = meta - .unwrap() - .to_str() - .map_err(ClientError::MetaHeaderToString)?; - let meta: ChunkMeta = serde_json::from_str(meta).map_err(ClientError::JsonParse)?; - - Ok(meta) - } -} - -pub struct BackupClient { - chunk_client: ChunkClient, -} - -impl BackupClient { - pub fn new(config: &ClientConfig) -> Result<Self, ClientError> { - info!("creating backup client with config: {:#?}", config); - Ok(Self { - chunk_client: ChunkClient::new(config)?, - }) - } - - pub fn upload_filesystem_entry( + pub async fn upload_filesystem_entry( &self, e: &FilesystemEntry, size: usize, @@ -260,7 +100,7 @@ impl BackupClient { let path = e.pathbuf(); info!("uploading {:?}", path); let ids = match e.kind() { - FilesystemKind::Regular => self.read_file(&path, size)?, + FilesystemKind::Regular => self.read_file(&path, size).await?, FilesystemKind::Directory => vec![], FilesystemKind::Symlink => vec![], FilesystemKind::Socket => vec![], @@ -270,42 +110,49 @@ impl BackupClient { Ok(ids) } - pub fn upload_generation(&self, filename: &Path, 
size: usize) -> Result<ChunkId, ClientError> { + pub async fn upload_generation( + &self, + filename: &Path, + size: usize, + ) -> Result<ChunkId, ClientError> { info!("upload SQLite {}", filename.display()); - let ids = self.read_file(filename, size)?; + let ids = self.read_file(filename, size).await?; let gen = GenerationChunk::new(ids); let data = gen.to_data_chunk(&current_timestamp())?; - let gen_id = self.upload_chunk(data)?; + let gen_id = self.upload_chunk(data).await?; info!("uploaded generation {}", gen_id); Ok(gen_id) } - fn read_file(&self, filename: &Path, size: usize) -> Result<Vec<ChunkId>, ClientError> { + async fn read_file(&self, filename: &Path, size: usize) -> Result<Vec<ChunkId>, ClientError> { info!("upload file {}", filename.display()); let file = std::fs::File::open(filename) .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?; let chunker = Chunker::new(size, file, filename); - let chunk_ids = self.upload_new_file_chunks(chunker)?; + let chunk_ids = self.upload_new_file_chunks(chunker).await?; Ok(chunk_ids) } - pub fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> { - self.chunk_client.has_chunk(meta) + pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> { + self.chunk_client.has_chunk(meta).await } - pub fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> { - self.chunk_client.upload_chunk(chunk) + pub async fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> { + self.chunk_client.upload_chunk(chunk).await } - pub fn upload_new_file_chunks(&self, chunker: Chunker) -> Result<Vec<ChunkId>, ClientError> { + pub async fn upload_new_file_chunks( + &self, + chunker: Chunker, + ) -> Result<Vec<ChunkId>, ClientError> { let mut chunk_ids = vec![]; for item in chunker { let chunk = item?; - if let Some(chunk_id) = self.has_chunk(chunk.meta())? { + if let Some(chunk_id) = self.has_chunk(chunk.meta()).await? 
{ chunk_ids.push(chunk_id.clone()); info!("reusing existing chunk {}", chunk_id); } else { - let chunk_id = self.upload_chunk(chunk)?; + let chunk_id = self.upload_chunk(chunk).await?; chunk_ids.push(chunk_id.clone()); info!("created new chunk {}", chunk_id); } @@ -314,32 +161,32 @@ impl BackupClient { Ok(chunk_ids) } - pub fn list_generations(&self) -> Result<GenerationList, ClientError> { - self.chunk_client.list_generations() + pub async fn list_generations(&self) -> Result<GenerationList, ClientError> { + self.chunk_client.list_generations().await } - pub fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> { - self.chunk_client.fetch_chunk(chunk_id) + pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> { + self.chunk_client.fetch_chunk(chunk_id).await } - fn fetch_generation_chunk(&self, gen_id: &GenId) -> Result<GenerationChunk, ClientError> { - let chunk = self.fetch_chunk(gen_id.as_chunk_id())?; + async fn fetch_generation_chunk(&self, gen_id: &GenId) -> Result<GenerationChunk, ClientError> { + let chunk = self.fetch_chunk(gen_id.as_chunk_id()).await?; let gen = GenerationChunk::from_data_chunk(&chunk)?; Ok(gen) } - pub fn fetch_generation( + pub async fn fetch_generation( &self, gen_id: &GenId, dbname: &Path, ) -> Result<LocalGeneration, ClientError> { - let gen = self.fetch_generation_chunk(gen_id)?; + let gen = self.fetch_generation_chunk(gen_id).await?; // Fetch the SQLite file, storing it in the named file. 
let mut dbfile = File::create(&dbname) .map_err(|err| ClientError::FileCreate(dbname.to_path_buf(), err))?; for id in gen.chunk_ids() { - let chunk = self.fetch_chunk(id)?; + let chunk = self.fetch_chunk(id).await?; dbfile .write_all(chunk.data()) .map_err(|err| ClientError::FileWrite(dbname.to_path_buf(), err))?; @@ -351,17 +198,17 @@ impl BackupClient { } } -pub struct ChunkClient { - client: Client, +pub struct AsyncChunkClient { + client: reqwest::Client, base_url: String, cipher: CipherEngine, } -impl ChunkClient { +impl AsyncChunkClient { pub fn new(config: &ClientConfig) -> Result<Self, ClientError> { let pass = config.passwords()?; - let client = Client::builder() + let client = reqwest::Client::builder() .danger_accept_invalid_certs(!config.verify_tls_cert) .build() .map_err(ClientError::ReqwestError)?; @@ -380,8 +227,8 @@ impl ChunkClient { format!("{}/chunks", self.base_url()) } - pub fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> { - let body = match self.get("", &[("sha256", meta.sha256())]) { + pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> { + let body = match self.get("", &[("sha256", meta.sha256())]).await { Ok((_, body)) => body, Err(err) => return Err(err), }; @@ -398,7 +245,7 @@ impl ChunkClient { Ok(has) } - pub fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> { + pub async fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> { let enc = self.cipher.encrypt_chunk(&chunk)?; let res = self .client @@ -406,9 +253,10 @@ impl ChunkClient { .header("chunk-meta", chunk.meta().to_json()) .body(enc.ciphertext().to_vec()) .send() + .await .map_err(ClientError::ReqwestError)?; debug!("upload_chunk: res={:?}", res); - let res: HashMap<String, String> = res.json().map_err(ClientError::ReqwestError)?; + let res: HashMap<String, String> = res.json().await.map_err(ClientError::ReqwestError)?; let chunk_id = if let Some(chunk_id) = 
res.get("chunk_id") { debug!("upload_chunk: id={}", chunk_id); chunk_id.parse().unwrap() @@ -419,8 +267,8 @@ impl ChunkClient { Ok(chunk_id) } - pub fn list_generations(&self) -> Result<GenerationList, ClientError> { - let (_, body) = self.get("", &[("generation", "true")])?; + pub async fn list_generations(&self) -> Result<GenerationList, ClientError> { + let (_, body) = self.get("", &[("generation", "true")]).await?; let map: HashMap<String, ChunkMeta> = serde_yaml::from_slice(&body).map_err(ClientError::YamlParse)?; @@ -432,8 +280,8 @@ impl ChunkClient { Ok(GenerationList::new(finished)) } - pub fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> { - let (headers, body) = self.get(&format!("/{}", chunk_id), &[])?; + pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> { + let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?; let meta = self.get_chunk_meta_header(chunk_id, &headers)?; let meta_bytes = meta.to_json_vec(); @@ -442,7 +290,11 @@ impl ChunkClient { Ok(chunk) } - fn get(&self, path: &str, query: &[(&str, &str)]) -> Result<(HeaderMap, Vec<u8>), ClientError> { + async fn get( + &self, + path: &str, + query: &[(&str, &str)], + ) -> Result<(HeaderMap, Vec<u8>), ClientError> { let url = format!("{}{}", &self.chunks_url(), path); info!("GET {}", url); @@ -458,6 +310,7 @@ impl ChunkClient { let res = self .client .execute(req) + .await .map_err(ClientError::ReqwestError)?; // Did it work? @@ -467,7 +320,7 @@ impl ChunkClient { // Return headers and body. let headers = res.headers().clone(); - let body = res.bytes().map_err(ClientError::ReqwestError)?; + let body = res.bytes().await.map_err(ClientError::ReqwestError)?; let body = body.to_vec(); Ok((headers, body)) } |