summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2021-07-22 17:34:34 +0300
committerLars Wirzenius <liw@liw.fi>2021-07-23 19:22:58 +0300
commit9a8f4be06cb476b8149ed4f73d2cb62add14873f (patch)
tree3f6f3c21cfa0bcbdd153dd2c4a80b0c083709600 /src
parent34b3e7fe4c1e78ce7c04ba0942c3f5a60db27785 (diff)
downloadobnam2-9a8f4be06cb476b8149ed4f73d2cb62add14873f.tar.gz
refactor: use async for "obnam get-chunk"
Sponsored-by: author
Diffstat (limited to 'src')
-rw-r--r--src/client.rs114
-rw-r--r--src/cmd/get_chunk.rs13
2 files changed, 123 insertions, 4 deletions
diff --git a/src/client.rs b/src/client.rs
index f90d377..e9ceea3 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -81,6 +81,120 @@ pub enum ClientError {
FileWrite(PathBuf, std::io::Error),
}
+pub struct AsyncBackupClient {
+ chunk_client: AsyncChunkClient,
+}
+
+impl AsyncBackupClient {
+ pub fn new(config: &ClientConfig) -> Result<Self, ClientError> {
+ info!("creating backup client with config: {:#?}", config);
+ Ok(Self {
+ chunk_client: AsyncChunkClient::new(config)?,
+ })
+ }
+
+ pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
+ self.chunk_client.fetch_chunk(chunk_id).await
+ }
+}
+
+pub struct AsyncChunkClient {
+ client: reqwest::Client,
+ base_url: String,
+ cipher: CipherEngine,
+}
+
+impl AsyncChunkClient {
+ pub fn new(config: &ClientConfig) -> Result<Self, ClientError> {
+ let pass = config.passwords()?;
+
+ let client = reqwest::Client::builder()
+ .danger_accept_invalid_certs(!config.verify_tls_cert)
+ .build()
+ .map_err(ClientError::ReqwestError)?;
+ Ok(Self {
+ client,
+ base_url: config.server_url.to_string(),
+ cipher: CipherEngine::new(&pass),
+ })
+ }
+
+ fn base_url(&self) -> &str {
+ &self.base_url
+ }
+
+ fn chunks_url(&self) -> String {
+ format!("{}/chunks", self.base_url())
+ }
+
+ pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
+ let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?;
+ let meta = self.get_chunk_meta_header(chunk_id, &headers)?;
+
+ let meta_bytes = meta.to_json_vec();
+ let chunk = self.cipher.decrypt_chunk(&body, &meta_bytes)?;
+
+ Ok(chunk)
+ }
+
+ async fn get(
+ &self,
+ path: &str,
+ query: &[(&str, &str)],
+ ) -> Result<(HeaderMap, Vec<u8>), ClientError> {
+ let url = format!("{}{}", &self.chunks_url(), path);
+ info!("GET {}", url);
+
+ // Build HTTP request structure.
+ let req = self
+ .client
+ .get(&url)
+ .query(query)
+ .build()
+ .map_err(ClientError::ReqwestError)?;
+
+ // Make HTTP request.
+ let res = self
+ .client
+ .execute(req)
+ .await
+ .map_err(ClientError::ReqwestError)?;
+
+ // Did it work?
+ if res.status() != 200 {
+ return Err(ClientError::NotFound(path.to_string()));
+ }
+
+ // Return headers and body.
+ let headers = res.headers().clone();
+ let body = res.bytes().await.map_err(ClientError::ReqwestError)?;
+ let body = body.to_vec();
+ Ok((headers, body))
+ }
+
+ fn get_chunk_meta_header(
+ &self,
+ chunk_id: &ChunkId,
+ headers: &HeaderMap,
+ ) -> Result<ChunkMeta, ClientError> {
+ let meta = headers.get("chunk-meta");
+
+ if meta.is_none() {
+ let err = ClientError::NoChunkMeta(chunk_id.clone());
+ error!("fetching chunk {} failed: {}", chunk_id, err);
+ return Err(err);
+ }
+
+ let meta = meta
+ .unwrap()
+ .to_str()
+ .map_err(ClientError::MetaHeaderToString)?;
+ let meta: ChunkMeta = serde_json::from_str(meta).map_err(ClientError::JsonParse)?;
+
+ Ok(meta)
+ }
+}
+
pub struct BackupClient {
chunk_client: ChunkClient,
}
diff --git a/src/cmd/get_chunk.rs b/src/cmd/get_chunk.rs
index 4ee70fe..f574c99 100644
--- a/src/cmd/get_chunk.rs
+++ b/src/cmd/get_chunk.rs
@@ -1,9 +1,10 @@
use crate::chunkid::ChunkId;
-use crate::client::BackupClient;
+use crate::client::AsyncBackupClient;
use crate::config::ClientConfig;
use crate::error::ObnamError;
use std::io::{stdout, Write};
use structopt::StructOpt;
+use tokio::runtime::Runtime;
#[derive(Debug, StructOpt)]
pub struct GetChunk {
@@ -13,10 +14,14 @@ pub struct GetChunk {
impl GetChunk {
pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
- let client = BackupClient::new(config)?;
- let chunk_id: ChunkId = self.chunk_id.parse().unwrap();
- let chunk = client.fetch_chunk(&chunk_id)?;
+ let rt = Runtime::new()?;
+ rt.block_on(self.run_async(config))
+ }
+ async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let client = AsyncBackupClient::new(config)?;
+ let chunk_id: ChunkId = self.chunk_id.parse().unwrap();
+ let chunk = client.fetch_chunk(&chunk_id).await?;
let stdout = stdout();
let mut handle = stdout.lock();
handle.write_all(chunk.data())?;