From 9a8f4be06cb476b8149ed4f73d2cb62add14873f Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Thu, 22 Jul 2021 17:34:34 +0300 Subject: refactor: use async for "obnam get-chunk" Sponsored-by: author --- src/client.rs | 114 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/cmd/get_chunk.rs | 13 ++++-- 2 files changed, 123 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/client.rs b/src/client.rs index f90d377..e9ceea3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -81,6 +81,120 @@ pub enum ClientError { FileWrite(PathBuf, std::io::Error), } +pub struct AsyncBackupClient { + chunk_client: AsyncChunkClient, +} + +impl AsyncBackupClient { + pub fn new(config: &ClientConfig) -> Result { + info!("creating backup client with config: {:#?}", config); + Ok(Self { + chunk_client: AsyncChunkClient::new(config)?, + }) + } + + pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result { + self.chunk_client.fetch_chunk(chunk_id).await + } +} + +pub struct AsyncChunkClient { + client: reqwest::Client, + base_url: String, + cipher: CipherEngine, +} + +impl AsyncChunkClient { + pub fn new(config: &ClientConfig) -> Result { + let pass = config.passwords()?; + + let client = reqwest::Client::builder() + .danger_accept_invalid_certs(!config.verify_tls_cert) + .build() + .map_err(ClientError::ReqwestError)?; + Ok(Self { + client, + base_url: config.server_url.to_string(), + cipher: CipherEngine::new(&pass), + }) + } + + fn base_url(&self) -> &str { + &self.base_url + } + + fn chunks_url(&self) -> String { + format!("{}/chunks", self.base_url()) + } + + pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result { + let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?; + let meta = self.get_chunk_meta_header(chunk_id, &headers)?; + + let meta_bytes = meta.to_json_vec(); + let chunk = self.cipher.decrypt_chunk(&body, &meta_bytes)?; + + Ok(chunk) + } + + async fn get( + &self, + path: &str, + query: &[(&str, &str)], + ) -> Result<(HeaderMap, Vec), ClientError> { + let url = format!("{}{}", &self.chunks_url(), path); + info!("GET {}", url); + + // Build HTTP request structure. + let req = self + .client + .get(&url) + .query(query) + .build() + .map_err(ClientError::ReqwestError)?; + + // Make HTTP request. + let res = self + .client + .execute(req) + .await + .map_err(ClientError::ReqwestError)?; + + // Did it work? + if res.status() != 200 { + return Err(ClientError::NotFound(path.to_string())); + } + + // Return headers and body. + let headers = res.headers().clone(); + let body = res.bytes().await.map_err(ClientError::ReqwestError)?; + let body = body.to_vec(); + Ok((headers, body)) + } + + fn get_chunk_meta_header( + &self, + chunk_id: &ChunkId, + headers: &HeaderMap, + ) -> Result { + let meta = headers.get("chunk-meta"); + + if meta.is_none() { + let err = ClientError::NoChunkMeta(chunk_id.clone()); + error!("fetching chunk {} failed: {}", chunk_id, err); + return Err(err); + } + + let meta = meta + .unwrap() + .to_str() + .map_err(ClientError::MetaHeaderToString)?; + let meta: ChunkMeta = serde_json::from_str(meta).map_err(ClientError::JsonParse)?; + + Ok(meta) + } +} + pub struct BackupClient { chunk_client: ChunkClient, } diff --git a/src/cmd/get_chunk.rs b/src/cmd/get_chunk.rs index 4ee70fe..f574c99 100644 --- a/src/cmd/get_chunk.rs +++ b/src/cmd/get_chunk.rs @@ -1,9 +1,10 @@ use crate::chunkid::ChunkId; -use crate::client::BackupClient; +use crate::client::AsyncBackupClient; use crate::config::ClientConfig; use crate::error::ObnamError; use std::io::{stdout, Write}; use structopt::StructOpt; +use tokio::runtime::Runtime; #[derive(Debug, StructOpt)] pub struct GetChunk { @@ -13,10 +14,14 @@ pub struct GetChunk { impl GetChunk { pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { - let client = BackupClient::new(config)?; - let chunk_id: ChunkId = self.chunk_id.parse().unwrap(); - let chunk = client.fetch_chunk(&chunk_id)?; + let rt = Runtime::new()?; + rt.block_on(self.run_async(config)) + } + async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let client = AsyncBackupClient::new(config)?; + let chunk_id: ChunkId = self.chunk_id.parse().unwrap(); + let chunk = client.fetch_chunk(&chunk_id).await?; let stdout = stdout(); let mut handle = stdout.lock(); handle.write_all(chunk.data())?; -- cgit v1.2.1