summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2021-07-26 10:09:56 +0000
committerLars Wirzenius <liw@liw.fi>2021-07-26 10:09:56 +0000
commit7f4087758e401791794f0518233ea058355438d3 (patch)
tree2b314e4572b3c64ff4d182834bf4ca57eb68eb20
parentbe9a3c656938e2aefc21fdb01d1403df05b16893 (diff)
parent45601862fb983f8fc35ab1c65ad27d0a09285c61 (diff)
downloadobnam2-7f4087758e401791794f0518233ea058355438d3.tar.gz
Merge branch 'async-get-chunk' into 'main'
use async for read-only access to chunk server See merge request obnam/obnam!167
-rw-r--r--rustfmt.toml1
-rw-r--r--src/client.rs160
-rw-r--r--src/cmd/get_chunk.rs13
-rw-r--r--src/cmd/list.rs12
-rw-r--r--src/cmd/list_files.rs14
-rw-r--r--src/cmd/restore.rs45
-rw-r--r--src/cmd/show_gen.rs15
7 files changed, 226 insertions, 34 deletions
diff --git a/rustfmt.toml b/rustfmt.toml
new file mode 100644
index 0000000..32a9786
--- /dev/null
+++ b/rustfmt.toml
@@ -0,0 +1 @@
+edition = "2018"
diff --git a/src/client.rs b/src/client.rs
index f90d377..ca0104f 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -81,6 +81,166 @@ pub enum ClientError {
FileWrite(PathBuf, std::io::Error),
}
+pub struct AsyncBackupClient {
+ chunk_client: AsyncChunkClient,
+}
+
+impl AsyncBackupClient {
+ pub fn new(config: &ClientConfig) -> Result<Self, ClientError> {
+ info!("creating backup client with config: {:#?}", config);
+ Ok(Self {
+ chunk_client: AsyncChunkClient::new(config)?,
+ })
+ }
+
+ pub async fn list_generations(&self) -> Result<GenerationList, ClientError> {
+ self.chunk_client.list_generations().await
+ }
+
+ pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
+ self.chunk_client.fetch_chunk(chunk_id).await
+ }
+
+ async fn fetch_generation_chunk(&self, gen_id: &str) -> Result<GenerationChunk, ClientError> {
+ let chunk_id = ChunkId::recreate(gen_id);
+ let chunk = self.fetch_chunk(&chunk_id).await?;
+ let gen = GenerationChunk::from_data_chunk(&chunk)?;
+ Ok(gen)
+ }
+
+ pub async fn fetch_generation(
+ &self,
+ gen_id: &str,
+ dbname: &Path,
+ ) -> Result<LocalGeneration, ClientError> {
+ let gen = self.fetch_generation_chunk(gen_id).await?;
+
+ // Fetch the SQLite file, storing it in the named file.
+ let mut dbfile = File::create(&dbname)
+ .map_err(|err| ClientError::FileCreate(dbname.to_path_buf(), err))?;
+ for id in gen.chunk_ids() {
+ let chunk = self.fetch_chunk(id).await?;
+ dbfile
+ .write_all(chunk.data())
+ .map_err(|err| ClientError::FileWrite(dbname.to_path_buf(), err))?;
+ }
+ info!("downloaded generation to {}", dbname.display());
+
+ let gen = LocalGeneration::open(dbname)?;
+ Ok(gen)
+ }
+}
+
+pub struct AsyncChunkClient {
+ client: reqwest::Client,
+ base_url: String,
+ cipher: CipherEngine,
+}
+
+impl AsyncChunkClient {
+ pub fn new(config: &ClientConfig) -> Result<Self, ClientError> {
+ let pass = config.passwords()?;
+
+ let client = reqwest::Client::builder()
+ .danger_accept_invalid_certs(!config.verify_tls_cert)
+ .build()
+ .map_err(ClientError::ReqwestError)?;
+ Ok(Self {
+ client,
+ base_url: config.server_url.to_string(),
+ cipher: CipherEngine::new(&pass),
+ })
+ }
+
+ fn base_url(&self) -> &str {
+ &self.base_url
+ }
+
+ fn chunks_url(&self) -> String {
+ format!("{}/chunks", self.base_url())
+ }
+
+ pub async fn list_generations(&self) -> Result<GenerationList, ClientError> {
+ let (_, body) = self.get("", &[("generation", "true")]).await?;
+
+ let map: HashMap<String, ChunkMeta> =
+ serde_yaml::from_slice(&body).map_err(ClientError::YamlParse)?;
+ debug!("list_generations: map={:?}", map);
+ let finished = map
+ .iter()
+ .map(|(id, meta)| FinishedGeneration::new(id, meta.ended().map_or("", |s| s)))
+ .collect();
+ Ok(GenerationList::new(finished))
+ }
+
+ pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
+ let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?;
+ let meta = self.get_chunk_meta_header(chunk_id, &headers)?;
+
+ let meta_bytes = meta.to_json_vec();
+ let chunk = self.cipher.decrypt_chunk(&body, &meta_bytes)?;
+
+ Ok(chunk)
+ }
+
+ async fn get(
+ &self,
+ path: &str,
+ query: &[(&str, &str)],
+ ) -> Result<(HeaderMap, Vec<u8>), ClientError> {
+ let url = format!("{}{}", &self.chunks_url(), path);
+ info!("GET {}", url);
+
+ // Build HTTP request structure.
+ let req = self
+ .client
+ .get(&url)
+ .query(query)
+ .build()
+ .map_err(ClientError::ReqwestError)?;
+
+ // Make HTTP request.
+ let res = self
+ .client
+ .execute(req)
+ .await
+ .map_err(ClientError::ReqwestError)?;
+
+ // Did it work?
+ if res.status() != 200 {
+ return Err(ClientError::NotFound(path.to_string()));
+ }
+
+ // Return headers and body.
+ let headers = res.headers().clone();
+ let body = res.bytes().await.map_err(ClientError::ReqwestError)?;
+ let body = body.to_vec();
+ Ok((headers, body))
+ }
+
+ fn get_chunk_meta_header(
+ &self,
+ chunk_id: &ChunkId,
+ headers: &HeaderMap,
+ ) -> Result<ChunkMeta, ClientError> {
+ let meta = headers.get("chunk-meta");
+
+ if meta.is_none() {
+ let err = ClientError::NoChunkMeta(chunk_id.clone());
+ error!("fetching chunk {} failed: {}", chunk_id, err);
+ return Err(err);
+ }
+
+ let meta = meta
+ .unwrap()
+ .to_str()
+ .map_err(ClientError::MetaHeaderToString)?;
+ let meta: ChunkMeta = serde_json::from_str(meta).map_err(ClientError::JsonParse)?;
+
+ Ok(meta)
+ }
+}
+
pub struct BackupClient {
chunk_client: ChunkClient,
}
diff --git a/src/cmd/get_chunk.rs b/src/cmd/get_chunk.rs
index 4ee70fe..f574c99 100644
--- a/src/cmd/get_chunk.rs
+++ b/src/cmd/get_chunk.rs
@@ -1,9 +1,10 @@
use crate::chunkid::ChunkId;
-use crate::client::BackupClient;
+use crate::client::AsyncBackupClient;
use crate::config::ClientConfig;
use crate::error::ObnamError;
use std::io::{stdout, Write};
use structopt::StructOpt;
+use tokio::runtime::Runtime;
#[derive(Debug, StructOpt)]
pub struct GetChunk {
@@ -13,10 +14,14 @@ pub struct GetChunk {
impl GetChunk {
pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
- let client = BackupClient::new(config)?;
- let chunk_id: ChunkId = self.chunk_id.parse().unwrap();
- let chunk = client.fetch_chunk(&chunk_id)?;
+ let rt = Runtime::new()?;
+ rt.block_on(self.run_async(config))
+ }
+ async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let client = AsyncBackupClient::new(config)?;
+ let chunk_id: ChunkId = self.chunk_id.parse().unwrap();
+ let chunk = client.fetch_chunk(&chunk_id).await?;
let stdout = stdout();
let mut handle = stdout.lock();
handle.write_all(chunk.data())?;
diff --git a/src/cmd/list.rs b/src/cmd/list.rs
index 66036b9..691f2bf 100644
--- a/src/cmd/list.rs
+++ b/src/cmd/list.rs
@@ -1,16 +1,22 @@
-use crate::client::BackupClient;
+use crate::client::AsyncBackupClient;
use crate::config::ClientConfig;
use crate::error::ObnamError;
use structopt::StructOpt;
+use tokio::runtime::Runtime;
#[derive(Debug, StructOpt)]
pub struct List {}
impl List {
pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
- let client = BackupClient::new(config)?;
+ let rt = Runtime::new()?;
+ rt.block_on(self.run_async(config))
+ }
+
+ async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let client = AsyncBackupClient::new(config)?;
- let generations = client.list_generations()?;
+ let generations = client.list_generations().await?;
for finished in generations.iter() {
println!("{} {}", finished.id(), finished.ended());
}
diff --git a/src/cmd/list_files.rs b/src/cmd/list_files.rs
index c5191f7..723781b 100644
--- a/src/cmd/list_files.rs
+++ b/src/cmd/list_files.rs
@@ -1,10 +1,11 @@
use crate::backup_reason::Reason;
-use crate::client::BackupClient;
+use crate::client::AsyncBackupClient;
use crate::config::ClientConfig;
use crate::error::ObnamError;
use crate::fsentry::{FilesystemEntry, FilesystemKind};
use structopt::StructOpt;
use tempfile::NamedTempFile;
+use tokio::runtime::Runtime;
#[derive(Debug, StructOpt)]
pub struct ListFiles {
@@ -14,14 +15,19 @@ pub struct ListFiles {
impl ListFiles {
pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let rt = Runtime::new()?;
+ rt.block_on(self.run_async(config))
+ }
+
+ async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
let temp = NamedTempFile::new()?;
- let client = BackupClient::new(config)?;
+ let client = AsyncBackupClient::new(config)?;
- let genlist = client.list_generations()?;
+ let genlist = client.list_generations().await?;
let gen_id: String = genlist.resolve(&self.gen_id)?;
- let gen = client.fetch_generation(&gen_id, temp.path())?;
+ let gen = client.fetch_generation(&gen_id, temp.path()).await?;
for file in gen.files()?.iter()? {
let file = file?;
println!("{}", format_entry(&file.entry(), file.reason()));
diff --git a/src/cmd/restore.rs b/src/cmd/restore.rs
index d794fe4..458397d 100644
--- a/src/cmd/restore.rs
+++ b/src/cmd/restore.rs
@@ -1,5 +1,5 @@
use crate::backup_reason::Reason;
-use crate::client::{BackupClient, ClientError};
+use crate::client::{AsyncBackupClient, ClientError};
use crate::config::ClientConfig;
use crate::error::ObnamError;
use crate::fsentry::{FilesystemEntry, FilesystemKind};
@@ -17,6 +17,7 @@ use std::path::StripPrefixError;
use std::path::{Path, PathBuf};
use structopt::StructOpt;
use tempfile::NamedTempFile;
+use tokio::runtime::Runtime;
#[derive(Debug, StructOpt)]
pub struct Restore {
@@ -29,29 +30,37 @@ pub struct Restore {
impl Restore {
pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let rt = Runtime::new()?;
+ rt.block_on(self.run_async(config))
+ }
+
+ async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
let temp = NamedTempFile::new()?;
- let client = BackupClient::new(config)?;
+ let client = AsyncBackupClient::new(config)?;
- let genlist = client.list_generations()?;
+ let genlist = client.list_generations().await?;
let gen_id: String = genlist.resolve(&self.gen_id)?;
info!("generation id is {}", gen_id);
- let gen = client.fetch_generation(&gen_id, temp.path())?;
+ let gen = client.fetch_generation(&gen_id, temp.path()).await?;
info!("restoring {} files", gen.file_count()?);
let progress = create_progress_bar(gen.file_count()?, true);
for file in gen.files()?.iter()? {
let file = file?;
match file.reason() {
Reason::FileError => (),
- _ => restore_generation(
- &client,
- &gen,
- file.fileno(),
- file.entry(),
- &self.to,
- &progress,
- )?,
+ _ => {
+ restore_generation(
+ &client,
+ &gen,
+ file.fileno(),
+ file.entry(),
+ &self.to,
+ &progress,
+ )
+ .await?
+ }
}
}
for file in gen.files()?.iter()? {
@@ -118,8 +127,8 @@ pub enum RestoreError {
SetTimestamp(PathBuf, std::io::Error),
}
-fn restore_generation(
- client: &BackupClient,
+async fn restore_generation(
+ client: &AsyncBackupClient,
gen: &LocalGeneration,
fileid: i64,
entry: &FilesystemEntry,
@@ -132,7 +141,7 @@ fn restore_generation(
let to = restored_path(entry, to)?;
match entry.kind() {
- FilesystemKind::Regular => restore_regular(client, &gen, &to, fileid, &entry)?,
+ FilesystemKind::Regular => restore_regular(client, &gen, &to, fileid, &entry).await?,
FilesystemKind::Directory => restore_directory(&to)?,
FilesystemKind::Symlink => restore_symlink(&to, &entry)?,
FilesystemKind::Socket => restore_socket(&to, &entry)?,
@@ -170,8 +179,8 @@ fn restored_path(entry: &FilesystemEntry, to: &Path) -> Result<PathBuf, RestoreE
Ok(to.join(path))
}
-fn restore_regular(
- client: &BackupClient,
+async fn restore_regular(
+ client: &AsyncBackupClient,
gen: &LocalGeneration,
path: &Path,
fileid: i64,
@@ -187,7 +196,7 @@ fn restore_regular(
.map_err(|err| RestoreError::CreateFile(path.to_path_buf(), err))?;
for chunkid in gen.chunkids(fileid)?.iter()? {
let chunkid = chunkid?;
- let chunk = client.fetch_chunk(&chunkid)?;
+ let chunk = client.fetch_chunk(&chunkid).await?;
file.write_all(chunk.data())
.map_err(|err| RestoreError::WriteFile(path.to_path_buf(), err))?;
}
diff --git a/src/cmd/show_gen.rs b/src/cmd/show_gen.rs
index df8a030..8df26c2 100644
--- a/src/cmd/show_gen.rs
+++ b/src/cmd/show_gen.rs
@@ -1,10 +1,11 @@
-use crate::client::BackupClient;
+use crate::client::AsyncBackupClient;
use crate::config::ClientConfig;
use crate::error::ObnamError;
use crate::fsentry::FilesystemKind;
use indicatif::HumanBytes;
use structopt::StructOpt;
use tempfile::NamedTempFile;
+use tokio::runtime::Runtime;
#[derive(Debug, StructOpt)]
pub struct ShowGeneration {
@@ -14,13 +15,17 @@ pub struct ShowGeneration {
impl ShowGeneration {
pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
- let temp = NamedTempFile::new()?;
+ let rt = Runtime::new()?;
+ rt.block_on(self.run_async(config))
+ }
- let client = BackupClient::new(config)?;
+ async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let temp = NamedTempFile::new()?;
+ let client = AsyncBackupClient::new(config)?;
- let genlist = client.list_generations()?;
+ let genlist = client.list_generations().await?;
let gen_id: String = genlist.resolve(&self.gen_id)?;
- let gen = client.fetch_generation(&gen_id, temp.path())?;
+ let gen = client.fetch_generation(&gen_id, temp.path()).await?;
let mut files = gen.files()?;
let mut files = files.iter()?;