summaryrefslogtreecommitdiff
path: root/src/client.rs
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2021-08-08 15:03:31 +0300
committerLars Wirzenius <liw@liw.fi>2021-08-09 09:58:38 +0300
commit6e9a1f5f45a5608a05931fd4d579c8ab52316f55 (patch)
treec33ff750c17eb6ca7c7d6771c885da2f31c40cd3 /src/client.rs
parent30c20dcf3d81e9147e2bd4617bd055a5a8a17896 (diff)
downloadobnam2-6e9a1f5f45a5608a05931fd4d579c8ab52316f55.tar.gz
refactor: use async for "obnam backup"
This changes things so that "obnam backup" uses async for everything. The old non-async BackupClient and ChunkClient are dropped. This does NOT move the chunking, and checksum computation, to its own function. This will happen later. Sponsored-by: author
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs247
1 files changed, 50 insertions, 197 deletions
diff --git a/src/client.rs b/src/client.rs
index c655bb2..9fddc18 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -11,7 +11,6 @@ use crate::genlist::GenerationList;
use chrono::{DateTime, Local};
use log::{debug, error, info};
-use reqwest::blocking::Client;
use reqwest::header::HeaderMap;
use std::collections::HashMap;
use std::fs::File;
@@ -93,166 +92,7 @@ impl AsyncBackupClient {
})
}
- pub async fn list_generations(&self) -> Result<GenerationList, ClientError> {
- self.chunk_client.list_generations().await
- }
-
- pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
- self.chunk_client.fetch_chunk(chunk_id).await
- }
-
- async fn fetch_generation_chunk(&self, gen_id: &GenId) -> Result<GenerationChunk, ClientError> {
- let chunk = self.fetch_chunk(gen_id.as_chunk_id()).await?;
- let gen = GenerationChunk::from_data_chunk(&chunk)?;
- Ok(gen)
- }
-
- pub async fn fetch_generation(
- &self,
- gen_id: &GenId,
- dbname: &Path,
- ) -> Result<LocalGeneration, ClientError> {
- let gen = self.fetch_generation_chunk(gen_id).await?;
-
- // Fetch the SQLite file, storing it in the named file.
- let mut dbfile = File::create(&dbname)
- .map_err(|err| ClientError::FileCreate(dbname.to_path_buf(), err))?;
- for id in gen.chunk_ids() {
- let chunk = self.fetch_chunk(id).await?;
- dbfile
- .write_all(chunk.data())
- .map_err(|err| ClientError::FileWrite(dbname.to_path_buf(), err))?;
- }
- info!("downloaded generation to {}", dbname.display());
-
- let gen = LocalGeneration::open(dbname)?;
- Ok(gen)
- }
-}
-
-pub struct AsyncChunkClient {
- client: reqwest::Client,
- base_url: String,
- cipher: CipherEngine,
-}
-
-impl AsyncChunkClient {
- pub fn new(config: &ClientConfig) -> Result<Self, ClientError> {
- let pass = config.passwords()?;
-
- let client = reqwest::Client::builder()
- .danger_accept_invalid_certs(!config.verify_tls_cert)
- .build()
- .map_err(ClientError::ReqwestError)?;
- Ok(Self {
- client,
- base_url: config.server_url.to_string(),
- cipher: CipherEngine::new(&pass),
- })
- }
-
- fn base_url(&self) -> &str {
- &self.base_url
- }
-
- fn chunks_url(&self) -> String {
- format!("{}/chunks", self.base_url())
- }
-
- pub async fn list_generations(&self) -> Result<GenerationList, ClientError> {
- let (_, body) = self.get("", &[("generation", "true")]).await?;
-
- let map: HashMap<String, ChunkMeta> =
- serde_yaml::from_slice(&body).map_err(ClientError::YamlParse)?;
- debug!("list_generations: map={:?}", map);
- let finished = map
- .iter()
- .map(|(id, meta)| FinishedGeneration::new(id, meta.ended().map_or("", |s| s)))
- .collect();
- Ok(GenerationList::new(finished))
- }
-
- pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
- let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?;
- let meta = self.get_chunk_meta_header(chunk_id, &headers)?;
-
- let meta_bytes = meta.to_json_vec();
- let chunk = self.cipher.decrypt_chunk(&body, &meta_bytes)?;
-
- Ok(chunk)
- }
-
- async fn get(
- &self,
- path: &str,
- query: &[(&str, &str)],
- ) -> Result<(HeaderMap, Vec<u8>), ClientError> {
- let url = format!("{}{}", &self.chunks_url(), path);
- info!("GET {}", url);
-
- // Build HTTP request structure.
- let req = self
- .client
- .get(&url)
- .query(query)
- .build()
- .map_err(ClientError::ReqwestError)?;
-
- // Make HTTP request.
- let res = self
- .client
- .execute(req)
- .await
- .map_err(ClientError::ReqwestError)?;
-
- // Did it work?
- if res.status() != 200 {
- return Err(ClientError::NotFound(path.to_string()));
- }
-
- // Return headers and body.
- let headers = res.headers().clone();
- let body = res.bytes().await.map_err(ClientError::ReqwestError)?;
- let body = body.to_vec();
- Ok((headers, body))
- }
-
- fn get_chunk_meta_header(
- &self,
- chunk_id: &ChunkId,
- headers: &HeaderMap,
- ) -> Result<ChunkMeta, ClientError> {
- let meta = headers.get("chunk-meta");
-
- if meta.is_none() {
- let err = ClientError::NoChunkMeta(chunk_id.clone());
- error!("fetching chunk {} failed: {}", chunk_id, err);
- return Err(err);
- }
-
- let meta = meta
- .unwrap()
- .to_str()
- .map_err(ClientError::MetaHeaderToString)?;
- let meta: ChunkMeta = serde_json::from_str(meta).map_err(ClientError::JsonParse)?;
-
- Ok(meta)
- }
-}
-
-pub struct BackupClient {
- chunk_client: ChunkClient,
-}
-
-impl BackupClient {
- pub fn new(config: &ClientConfig) -> Result<Self, ClientError> {
- info!("creating backup client with config: {:#?}", config);
- Ok(Self {
- chunk_client: ChunkClient::new(config)?,
- })
- }
-
- pub fn upload_filesystem_entry(
+ pub async fn upload_filesystem_entry(
&self,
e: &FilesystemEntry,
size: usize,
@@ -260,7 +100,7 @@ impl BackupClient {
let path = e.pathbuf();
info!("uploading {:?}", path);
let ids = match e.kind() {
- FilesystemKind::Regular => self.read_file(&path, size)?,
+ FilesystemKind::Regular => self.read_file(&path, size).await?,
FilesystemKind::Directory => vec![],
FilesystemKind::Symlink => vec![],
FilesystemKind::Socket => vec![],
@@ -270,42 +110,49 @@ impl BackupClient {
Ok(ids)
}
- pub fn upload_generation(&self, filename: &Path, size: usize) -> Result<ChunkId, ClientError> {
+ pub async fn upload_generation(
+ &self,
+ filename: &Path,
+ size: usize,
+ ) -> Result<ChunkId, ClientError> {
info!("upload SQLite {}", filename.display());
- let ids = self.read_file(filename, size)?;
+ let ids = self.read_file(filename, size).await?;
let gen = GenerationChunk::new(ids);
let data = gen.to_data_chunk(&current_timestamp())?;
- let gen_id = self.upload_chunk(data)?;
+ let gen_id = self.upload_chunk(data).await?;
info!("uploaded generation {}", gen_id);
Ok(gen_id)
}
- fn read_file(&self, filename: &Path, size: usize) -> Result<Vec<ChunkId>, ClientError> {
+ async fn read_file(&self, filename: &Path, size: usize) -> Result<Vec<ChunkId>, ClientError> {
info!("upload file {}", filename.display());
let file = std::fs::File::open(filename)
.map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?;
let chunker = Chunker::new(size, file, filename);
- let chunk_ids = self.upload_new_file_chunks(chunker)?;
+ let chunk_ids = self.upload_new_file_chunks(chunker).await?;
Ok(chunk_ids)
}
- pub fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> {
- self.chunk_client.has_chunk(meta)
+ pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> {
+ self.chunk_client.has_chunk(meta).await
}
- pub fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> {
- self.chunk_client.upload_chunk(chunk)
+ pub async fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> {
+ self.chunk_client.upload_chunk(chunk).await
}
- pub fn upload_new_file_chunks(&self, chunker: Chunker) -> Result<Vec<ChunkId>, ClientError> {
+ pub async fn upload_new_file_chunks(
+ &self,
+ chunker: Chunker,
+ ) -> Result<Vec<ChunkId>, ClientError> {
let mut chunk_ids = vec![];
for item in chunker {
let chunk = item?;
- if let Some(chunk_id) = self.has_chunk(chunk.meta())? {
+ if let Some(chunk_id) = self.has_chunk(chunk.meta()).await? {
chunk_ids.push(chunk_id.clone());
info!("reusing existing chunk {}", chunk_id);
} else {
- let chunk_id = self.upload_chunk(chunk)?;
+ let chunk_id = self.upload_chunk(chunk).await?;
chunk_ids.push(chunk_id.clone());
info!("created new chunk {}", chunk_id);
}
@@ -314,32 +161,32 @@ impl BackupClient {
Ok(chunk_ids)
}
- pub fn list_generations(&self) -> Result<GenerationList, ClientError> {
- self.chunk_client.list_generations()
+ pub async fn list_generations(&self) -> Result<GenerationList, ClientError> {
+ self.chunk_client.list_generations().await
}
- pub fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
- self.chunk_client.fetch_chunk(chunk_id)
+ pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
+ self.chunk_client.fetch_chunk(chunk_id).await
}
- fn fetch_generation_chunk(&self, gen_id: &GenId) -> Result<GenerationChunk, ClientError> {
- let chunk = self.fetch_chunk(gen_id.as_chunk_id())?;
+ async fn fetch_generation_chunk(&self, gen_id: &GenId) -> Result<GenerationChunk, ClientError> {
+ let chunk = self.fetch_chunk(gen_id.as_chunk_id()).await?;
let gen = GenerationChunk::from_data_chunk(&chunk)?;
Ok(gen)
}
- pub fn fetch_generation(
+ pub async fn fetch_generation(
&self,
gen_id: &GenId,
dbname: &Path,
) -> Result<LocalGeneration, ClientError> {
- let gen = self.fetch_generation_chunk(gen_id)?;
+ let gen = self.fetch_generation_chunk(gen_id).await?;
// Fetch the SQLite file, storing it in the named file.
let mut dbfile = File::create(&dbname)
.map_err(|err| ClientError::FileCreate(dbname.to_path_buf(), err))?;
for id in gen.chunk_ids() {
- let chunk = self.fetch_chunk(id)?;
+ let chunk = self.fetch_chunk(id).await?;
dbfile
.write_all(chunk.data())
.map_err(|err| ClientError::FileWrite(dbname.to_path_buf(), err))?;
@@ -351,17 +198,17 @@ impl BackupClient {
}
}
-pub struct ChunkClient {
- client: Client,
+pub struct AsyncChunkClient {
+ client: reqwest::Client,
base_url: String,
cipher: CipherEngine,
}
-impl ChunkClient {
+impl AsyncChunkClient {
pub fn new(config: &ClientConfig) -> Result<Self, ClientError> {
let pass = config.passwords()?;
- let client = Client::builder()
+ let client = reqwest::Client::builder()
.danger_accept_invalid_certs(!config.verify_tls_cert)
.build()
.map_err(ClientError::ReqwestError)?;
@@ -380,8 +227,8 @@ impl ChunkClient {
format!("{}/chunks", self.base_url())
}
- pub fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> {
- let body = match self.get("", &[("sha256", meta.sha256())]) {
+ pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> {
+ let body = match self.get("", &[("sha256", meta.sha256())]).await {
Ok((_, body)) => body,
Err(err) => return Err(err),
};
@@ -398,7 +245,7 @@ impl ChunkClient {
Ok(has)
}
- pub fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> {
+ pub async fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> {
let enc = self.cipher.encrypt_chunk(&chunk)?;
let res = self
.client
@@ -406,9 +253,10 @@ impl ChunkClient {
.header("chunk-meta", chunk.meta().to_json())
.body(enc.ciphertext().to_vec())
.send()
+ .await
.map_err(ClientError::ReqwestError)?;
debug!("upload_chunk: res={:?}", res);
- let res: HashMap<String, String> = res.json().map_err(ClientError::ReqwestError)?;
+ let res: HashMap<String, String> = res.json().await.map_err(ClientError::ReqwestError)?;
let chunk_id = if let Some(chunk_id) = res.get("chunk_id") {
debug!("upload_chunk: id={}", chunk_id);
chunk_id.parse().unwrap()
@@ -419,8 +267,8 @@ impl ChunkClient {
Ok(chunk_id)
}
- pub fn list_generations(&self) -> Result<GenerationList, ClientError> {
- let (_, body) = self.get("", &[("generation", "true")])?;
+ pub async fn list_generations(&self) -> Result<GenerationList, ClientError> {
+ let (_, body) = self.get("", &[("generation", "true")]).await?;
let map: HashMap<String, ChunkMeta> =
serde_yaml::from_slice(&body).map_err(ClientError::YamlParse)?;
@@ -432,8 +280,8 @@ impl ChunkClient {
Ok(GenerationList::new(finished))
}
- pub fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
- let (headers, body) = self.get(&format!("/{}", chunk_id), &[])?;
+ pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
+ let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?;
let meta = self.get_chunk_meta_header(chunk_id, &headers)?;
let meta_bytes = meta.to_json_vec();
@@ -442,7 +290,11 @@ impl ChunkClient {
Ok(chunk)
}
- fn get(&self, path: &str, query: &[(&str, &str)]) -> Result<(HeaderMap, Vec<u8>), ClientError> {
+ async fn get(
+ &self,
+ path: &str,
+ query: &[(&str, &str)],
+ ) -> Result<(HeaderMap, Vec<u8>), ClientError> {
let url = format!("{}{}", &self.chunks_url(), path);
info!("GET {}", url);
@@ -458,6 +310,7 @@ impl ChunkClient {
let res = self
.client
.execute(req)
+ .await
.map_err(ClientError::ReqwestError)?;
// Did it work?
@@ -467,7 +320,7 @@ impl ChunkClient {
// Return headers and body.
let headers = res.headers().clone();
- let body = res.bytes().map_err(ClientError::ReqwestError)?;
+ let body = res.bytes().await.map_err(ClientError::ReqwestError)?;
let body = body.to_vec();
Ok((headers, body))
}