diff options
author | Lars Wirzenius <liw@liw.fi> | 2021-07-26 10:09:56 +0000 |
---|---|---|
committer | Lars Wirzenius <liw@liw.fi> | 2021-07-26 10:09:56 +0000 |
commit | 7f4087758e401791794f0518233ea058355438d3 (patch) | |
tree | 2b314e4572b3c64ff4d182834bf4ca57eb68eb20 | |
parent | be9a3c656938e2aefc21fdb01d1403df05b16893 (diff) | |
parent | 45601862fb983f8fc35ab1c65ad27d0a09285c61 (diff) | |
download | obnam2-7f4087758e401791794f0518233ea058355438d3.tar.gz |
Merge branch 'async-get-chunk' into 'main'
use async for read-only access to chunk server
See merge request obnam/obnam!167
-rw-r--r-- | rustfmt.toml | 1 | ||||
-rw-r--r-- | src/client.rs | 160 | ||||
-rw-r--r-- | src/cmd/get_chunk.rs | 13 | ||||
-rw-r--r-- | src/cmd/list.rs | 12 | ||||
-rw-r--r-- | src/cmd/list_files.rs | 14 | ||||
-rw-r--r-- | src/cmd/restore.rs | 45 | ||||
-rw-r--r-- | src/cmd/show_gen.rs | 15 |
7 files changed, 226 insertions, 34 deletions
diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..32a9786 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +edition = "2018" diff --git a/src/client.rs b/src/client.rs index f90d377..ca0104f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -81,6 +81,166 @@ pub enum ClientError { FileWrite(PathBuf, std::io::Error), } +pub struct AsyncBackupClient { + chunk_client: AsyncChunkClient, +} + +impl AsyncBackupClient { + pub fn new(config: &ClientConfig) -> Result<Self, ClientError> { + info!("creating backup client with config: {:#?}", config); + Ok(Self { + chunk_client: AsyncChunkClient::new(config)?, + }) + } + + pub async fn list_generations(&self) -> Result<GenerationList, ClientError> { + self.chunk_client.list_generations().await + } + + pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> { + self.chunk_client.fetch_chunk(chunk_id).await + } + + async fn fetch_generation_chunk(&self, gen_id: &str) -> Result<GenerationChunk, ClientError> { + let chunk_id = ChunkId::recreate(gen_id); + let chunk = self.fetch_chunk(&chunk_id).await?; + let gen = GenerationChunk::from_data_chunk(&chunk)?; + Ok(gen) + } + + pub async fn fetch_generation( + &self, + gen_id: &str, + dbname: &Path, + ) -> Result<LocalGeneration, ClientError> { + let gen = self.fetch_generation_chunk(gen_id).await?; + + // Fetch the SQLite file, storing it in the named file. + let mut dbfile = File::create(&dbname) + .map_err(|err| ClientError::FileCreate(dbname.to_path_buf(), err))?; + for id in gen.chunk_ids() { + let chunk = self.fetch_chunk(id).await?; + dbfile + .write_all(chunk.data()) + .map_err(|err| ClientError::FileWrite(dbname.to_path_buf(), err))?; + } + info!("downloaded generation to {}", dbname.display()); + + let gen = LocalGeneration::open(dbname)?; + Ok(gen) + } +} + +pub struct AsyncChunkClient { + client: reqwest::Client, + base_url: String, + cipher: CipherEngine, +} + +impl AsyncChunkClient { + pub fn new(config: &ClientConfig) -> Result<Self, ClientError> { + let pass = config.passwords()?; + + let client = reqwest::Client::builder() + .danger_accept_invalid_certs(!config.verify_tls_cert) + .build() + .map_err(ClientError::ReqwestError)?; + Ok(Self { + client, + base_url: config.server_url.to_string(), + cipher: CipherEngine::new(&pass), + }) + } + + fn base_url(&self) -> &str { + &self.base_url + } + + fn chunks_url(&self) -> String { + format!("{}/chunks", self.base_url()) + } + + pub async fn list_generations(&self) -> Result<GenerationList, ClientError> { + let (_, body) = self.get("", &[("generation", "true")]).await?; + + let map: HashMap<String, ChunkMeta> = + serde_yaml::from_slice(&body).map_err(ClientError::YamlParse)?; + debug!("list_generations: map={:?}", map); + let finished = map + .iter() + .map(|(id, meta)| FinishedGeneration::new(id, meta.ended().map_or("", |s| s))) + .collect(); + Ok(GenerationList::new(finished)) + } + + pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> { + let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?; + let meta = self.get_chunk_meta_header(chunk_id, &headers)?; + + let meta_bytes = meta.to_json_vec(); + let chunk = self.cipher.decrypt_chunk(&body, &meta_bytes)?; + + Ok(chunk) + } + + async fn get( + &self, + path: &str, + query: &[(&str, &str)], + ) -> Result<(HeaderMap, Vec<u8>), ClientError> { + let url = format!("{}{}", &self.chunks_url(), path); + info!("GET {}", url); + + // Build HTTP request structure. + let req = self + .client + .get(&url) + .query(query) + .build() + .map_err(ClientError::ReqwestError)?; + + // Make HTTP request. + let res = self + .client + .execute(req) + .await + .map_err(ClientError::ReqwestError)?; + + // Did it work? + if res.status() != 200 { + return Err(ClientError::NotFound(path.to_string())); + } + + // Return headers and body. + let headers = res.headers().clone(); + let body = res.bytes().await.map_err(ClientError::ReqwestError)?; + let body = body.to_vec(); + Ok((headers, body)) + } + + fn get_chunk_meta_header( + &self, + chunk_id: &ChunkId, + headers: &HeaderMap, + ) -> Result<ChunkMeta, ClientError> { + let meta = headers.get("chunk-meta"); + + if meta.is_none() { + let err = ClientError::NoChunkMeta(chunk_id.clone()); + error!("fetching chunk {} failed: {}", chunk_id, err); + return Err(err); + } + + let meta = meta + .unwrap() + .to_str() + .map_err(ClientError::MetaHeaderToString)?; + let meta: ChunkMeta = serde_json::from_str(meta).map_err(ClientError::JsonParse)?; + + Ok(meta) + } +} + pub struct BackupClient { chunk_client: ChunkClient, } diff --git a/src/cmd/get_chunk.rs b/src/cmd/get_chunk.rs index 4ee70fe..f574c99 100644 --- a/src/cmd/get_chunk.rs +++ b/src/cmd/get_chunk.rs @@ -1,9 +1,10 @@ use crate::chunkid::ChunkId; -use crate::client::BackupClient; +use crate::client::AsyncBackupClient; use crate::config::ClientConfig; use crate::error::ObnamError; use std::io::{stdout, Write}; use structopt::StructOpt; +use tokio::runtime::Runtime; #[derive(Debug, StructOpt)] pub struct GetChunk { @@ -13,10 +14,14 @@ pub struct GetChunk { impl GetChunk { pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { - let client = BackupClient::new(config)?; - let chunk_id: ChunkId = self.chunk_id.parse().unwrap(); - let chunk = client.fetch_chunk(&chunk_id)?; + let rt = Runtime::new()?; + rt.block_on(self.run_async(config)) + } + async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let client = AsyncBackupClient::new(config)?; + let chunk_id: ChunkId = self.chunk_id.parse().unwrap(); + let chunk = client.fetch_chunk(&chunk_id).await?; let stdout = stdout(); let mut handle = stdout.lock(); handle.write_all(chunk.data())?; diff --git a/src/cmd/list.rs b/src/cmd/list.rs index 66036b9..691f2bf 100644 --- a/src/cmd/list.rs +++ b/src/cmd/list.rs @@ -1,16 +1,22 @@ -use crate::client::BackupClient; +use crate::client::AsyncBackupClient; use crate::config::ClientConfig; use crate::error::ObnamError; use structopt::StructOpt; +use tokio::runtime::Runtime; #[derive(Debug, StructOpt)] pub struct List {} impl List { pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { - let client = BackupClient::new(config)?; + let rt = Runtime::new()?; + rt.block_on(self.run_async(config)) + } + + async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let client = AsyncBackupClient::new(config)?; - let generations = client.list_generations()?; + let generations = client.list_generations().await?; for finished in generations.iter() { println!("{} {}", finished.id(), finished.ended()); } diff --git a/src/cmd/list_files.rs b/src/cmd/list_files.rs index c5191f7..723781b 100644 --- a/src/cmd/list_files.rs +++ b/src/cmd/list_files.rs @@ -1,10 +1,11 @@ use crate::backup_reason::Reason; -use crate::client::BackupClient; +use crate::client::AsyncBackupClient; use crate::config::ClientConfig; use crate::error::ObnamError; use crate::fsentry::{FilesystemEntry, FilesystemKind}; use structopt::StructOpt; use tempfile::NamedTempFile; +use tokio::runtime::Runtime; #[derive(Debug, StructOpt)] pub struct ListFiles { @@ -14,14 +15,19 @@ pub struct ListFiles { impl ListFiles { pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let rt = Runtime::new()?; + rt.block_on(self.run_async(config)) + } + + async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> { let temp = NamedTempFile::new()?; - let client = BackupClient::new(config)?; + let client = AsyncBackupClient::new(config)?; - let genlist = client.list_generations()?; + let genlist = client.list_generations().await?; let gen_id: String = genlist.resolve(&self.gen_id)?; - let gen = client.fetch_generation(&gen_id, temp.path())?; + let gen = client.fetch_generation(&gen_id, temp.path()).await?; for file in gen.files()?.iter()? { let file = file?; println!("{}", format_entry(&file.entry(), file.reason())); diff --git a/src/cmd/restore.rs b/src/cmd/restore.rs index d794fe4..458397d 100644 --- a/src/cmd/restore.rs +++ b/src/cmd/restore.rs @@ -1,5 +1,5 @@ use crate::backup_reason::Reason; -use crate::client::{BackupClient, ClientError}; +use crate::client::{AsyncBackupClient, ClientError}; use crate::config::ClientConfig; use crate::error::ObnamError; use crate::fsentry::{FilesystemEntry, FilesystemKind}; @@ -17,6 +17,7 @@ use std::path::StripPrefixError; use std::path::{Path, PathBuf}; use structopt::StructOpt; use tempfile::NamedTempFile; +use tokio::runtime::Runtime; #[derive(Debug, StructOpt)] pub struct Restore { @@ -29,29 +30,37 @@ pub struct Restore { impl Restore { pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let rt = Runtime::new()?; + rt.block_on(self.run_async(config)) + } + + async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> { let temp = NamedTempFile::new()?; - let client = BackupClient::new(config)?; + let client = AsyncBackupClient::new(config)?; - let genlist = client.list_generations()?; + let genlist = client.list_generations().await?; let gen_id: String = genlist.resolve(&self.gen_id)?; info!("generation id is {}", gen_id); - let gen = client.fetch_generation(&gen_id, temp.path())?; + let gen = client.fetch_generation(&gen_id, temp.path()).await?; info!("restoring {} files", gen.file_count()?); let progress = create_progress_bar(gen.file_count()?, true); for file in gen.files()?.iter()? { let file = file?; match file.reason() { Reason::FileError => (), - _ => restore_generation( - &client, - &gen, - file.fileno(), - file.entry(), - &self.to, - &progress, - )?, + _ => { + restore_generation( + &client, + &gen, + file.fileno(), + file.entry(), + &self.to, + &progress, + ) + .await? + } } } for file in gen.files()?.iter()? { @@ -118,8 +127,8 @@ pub enum RestoreError { SetTimestamp(PathBuf, std::io::Error), } -fn restore_generation( - client: &BackupClient, +async fn restore_generation( + client: &AsyncBackupClient, gen: &LocalGeneration, fileid: i64, entry: &FilesystemEntry, @@ -132,7 +141,7 @@ fn restore_generation( let to = restored_path(entry, to)?; match entry.kind() { - FilesystemKind::Regular => restore_regular(client, &gen, &to, fileid, &entry)?, + FilesystemKind::Regular => restore_regular(client, &gen, &to, fileid, &entry).await?, FilesystemKind::Directory => restore_directory(&to)?, FilesystemKind::Symlink => restore_symlink(&to, &entry)?, FilesystemKind::Socket => restore_socket(&to, &entry)?, @@ -170,8 +179,8 @@ fn restored_path(entry: &FilesystemEntry, to: &Path) -> Result<PathBuf, RestoreE Ok(to.join(path)) } -fn restore_regular( - client: &BackupClient, +async fn restore_regular( + client: &AsyncBackupClient, gen: &LocalGeneration, path: &Path, fileid: i64, @@ -187,7 +196,7 @@ fn restore_regular( .map_err(|err| RestoreError::CreateFile(path.to_path_buf(), err))?; for chunkid in gen.chunkids(fileid)?.iter()? { let chunkid = chunkid?; - let chunk = client.fetch_chunk(&chunkid)?; + let chunk = client.fetch_chunk(&chunkid).await?; file.write_all(chunk.data()) .map_err(|err| RestoreError::WriteFile(path.to_path_buf(), err))?; } diff --git a/src/cmd/show_gen.rs b/src/cmd/show_gen.rs index df8a030..8df26c2 100644 --- a/src/cmd/show_gen.rs +++ b/src/cmd/show_gen.rs @@ -1,10 +1,11 @@ -use crate::client::BackupClient; +use crate::client::AsyncBackupClient; use crate::config::ClientConfig; use crate::error::ObnamError; use crate::fsentry::FilesystemKind; use indicatif::HumanBytes; use structopt::StructOpt; use tempfile::NamedTempFile; +use tokio::runtime::Runtime; #[derive(Debug, StructOpt)] pub struct ShowGeneration { @@ -14,13 +15,17 @@ pub struct ShowGeneration { impl ShowGeneration { pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { - let temp = NamedTempFile::new()?; + let rt = Runtime::new()?; + rt.block_on(self.run_async(config)) + } - let client = BackupClient::new(config)?; + async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let temp = NamedTempFile::new()?; + let client = AsyncBackupClient::new(config)?; - let genlist = client.list_generations()?; + let genlist = client.list_generations().await?; let gen_id: String = genlist.resolve(&self.gen_id)?; - let gen = client.fetch_generation(&gen_id, temp.path())?; + let gen = client.fetch_generation(&gen_id, temp.path()).await?; let mut files = gen.files()?; let mut files = files.iter()?; |