author    Lars Wirzenius <liw@liw.fi>  2021-08-18 16:27:43 +0000
committer Lars Wirzenius <liw@liw.fi>  2021-08-18 16:27:43 +0000
commit    78d79ca1ca0e4f3b9e9f95f0d9e2cd9891ef95f3 (patch)
tree      e71c58b56b3e9a04029d95c3ef5fc398dc6f464e
parent    22a4e91448b40477a8247adc7834fa62f02725c6 (diff)
parent    6bc136082f5fc02f4f2ebdcc0facbf666ee7b01c (diff)
Merge branch 'refactor_upload_chunkify' into 'main'
change "obnam backup" to be all async Closes #113 See merge request obnam/obnam!174
-rw-r--r--  src/backup_run.rs  310
-rw-r--r--  src/client.rs      269
-rw-r--r--  src/cmd/backup.rs   44
-rw-r--r--  src/generation.rs   49

4 files changed, 274 insertions(+), 398 deletions(-)
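
The merge description above is the whole story: the backup path becomes
async end to end, with the synchronous "obnam backup" entry point bridging
into Tokio. A minimal sketch of that bridge, as the diff applies it in
src/cmd/backup.rs (the error type is simplified to std::io::Error here;
the real code returns ObnamError, and a tokio dependency with the
multi-threaded runtime enabled is assumed):

    use tokio::runtime::Runtime;

    struct Backup;

    impl Backup {
        // Synchronous entry point, called by the non-async CLI layer.
        pub fn run(&self) -> Result<(), std::io::Error> {
            let rt = Runtime::new()?;     // build a Tokio runtime
            rt.block_on(self.run_async()) // drive the async body to completion
        }

        // Everything below this point is free to .await.
        async fn run_async(&self) -> Result<(), std::io::Error> {
            Ok(())
        }
    }

Keeping the blocking wrapper at the outermost layer means no caller of
Backup::run has to change, while the entire call chain beneath it can
become async fn.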
diff --git a/src/backup_run.rs b/src/backup_run.rs
index 7b24e14..b536f6e 100644
--- a/src/backup_run.rs
+++ b/src/backup_run.rs
@@ -1,20 +1,27 @@
use crate::backup_progress::BackupProgress;
use crate::backup_reason::Reason;
+use crate::chunk::{GenerationChunk, GenerationChunkError};
+use crate::chunker::{Chunker, ChunkerError};
use crate::chunkid::ChunkId;
-use crate::client::{BackupClient, ClientError};
+use crate::client::{AsyncBackupClient, ClientError};
use crate::config::ClientConfig;
use crate::error::ObnamError;
-use crate::fsentry::FilesystemEntry;
+use crate::fsentry::{FilesystemEntry, FilesystemKind};
use crate::fsiter::{AnnotatedFsEntry, FsIterError, FsIterator};
use crate::generation::{
GenId, LocalGeneration, LocalGenerationError, NascentError, NascentGeneration,
};
use crate::policy::BackupPolicy;
-use log::{info, warn};
+
+use bytesize::MIB;
+use chrono::{DateTime, Local};
+use log::{debug, error, info, warn};
use std::path::{Path, PathBuf};
+const SQLITE_CHUNK_SIZE: usize = MIB as usize;
+
pub struct BackupRun<'a> {
- client: &'a BackupClient,
+ client: &'a AsyncBackupClient,
policy: BackupPolicy,
buffer_size: usize,
progress: Option<BackupProgress>,
@@ -29,7 +36,16 @@ pub enum BackupError {
FsIterError(#[from] FsIterError),
#[error(transparent)]
+ NascentError(#[from] NascentError),
+
+ #[error(transparent)]
LocalGenerationError(#[from] LocalGenerationError),
+
+ #[error(transparent)]
+ ChunkerError(#[from] ChunkerError),
+
+ #[error(transparent)]
+ GenerationChunkError(#[from] GenerationChunkError),
}
#[derive(Debug)]
@@ -41,6 +57,13 @@ pub struct FsEntryBackupOutcome {
}
#[derive(Debug)]
+struct OneRootBackupOutcome {
+ pub files_count: i64,
+ pub warnings: Vec<BackupError>,
+ pub new_cachedir_tags: Vec<PathBuf>,
+}
+
+#[derive(Debug)]
pub struct RootsBackupOutcome {
/// The number of backed up files.
pub files_count: i64,
@@ -48,10 +71,15 @@ pub struct RootsBackupOutcome {
pub warnings: Vec<BackupError>,
/// CACHEDIR.TAG files that aren't present in a previous generation.
pub new_cachedir_tags: Vec<PathBuf>,
+ /// Id of new generation.
+ pub gen_id: GenId,
}
impl<'a> BackupRun<'a> {
- pub fn initial(config: &ClientConfig, client: &'a BackupClient) -> Result<Self, BackupError> {
+ pub fn initial(
+ config: &ClientConfig,
+ client: &'a AsyncBackupClient,
+ ) -> Result<Self, BackupError> {
Ok(Self {
client,
policy: BackupPolicy::default(),
@@ -62,7 +90,7 @@ impl<'a> BackupRun<'a> {
pub fn incremental(
config: &ClientConfig,
- client: &'a BackupClient,
+ client: &'a AsyncBackupClient,
) -> Result<Self, BackupError> {
Ok(Self {
client,
@@ -72,7 +100,7 @@ impl<'a> BackupRun<'a> {
})
}
- pub fn start(
+ pub async fn start(
&mut self,
genid: Option<&GenId>,
oldname: &Path,
@@ -86,7 +114,7 @@ impl<'a> BackupRun<'a> {
Ok(LocalGeneration::open(oldname)?)
}
Some(genid) => {
- let old = self.fetch_previous_generation(genid, oldname)?;
+ let old = self.fetch_previous_generation(genid, oldname).await?;
let progress = BackupProgress::incremental();
progress.files_in_previous_generation(old.file_count()? as u64);
@@ -97,13 +125,13 @@ impl<'a> BackupRun<'a> {
}
}
- fn fetch_previous_generation(
+ async fn fetch_previous_generation(
&self,
genid: &GenId,
oldname: &Path,
) -> Result<LocalGeneration, ObnamError> {
let progress = BackupProgress::download_generation(genid);
- let old = self.client.fetch_generation(genid, oldname)?;
+ let old = self.client.fetch_generation(genid, oldname).await?;
progress.finish();
Ok(old)
}
@@ -114,88 +142,214 @@ impl<'a> BackupRun<'a> {
}
}
- pub fn backup_roots(
+ pub async fn backup_roots(
&self,
config: &ClientConfig,
old: &LocalGeneration,
newpath: &Path,
- ) -> Result<RootsBackupOutcome, NascentError> {
- let mut warnings = vec![];
+ ) -> Result<RootsBackupOutcome, ObnamError> {
+ let mut warnings: Vec<BackupError> = vec![];
let mut new_cachedir_tags = vec![];
let files_count = {
let mut new = NascentGeneration::create(newpath)?;
for root in &config.roots {
- let iter = FsIterator::new(root, config.exclude_cache_tag_directories);
- let entries = iter.map(|entry| {
- if let Ok(ref entry) = entry {
- let path = entry.inner.pathbuf();
- if entry.is_cachedir_tag && !old.is_cachedir_tag(&path)? {
- new_cachedir_tags.push(path);
+ match self.backup_one_root(config, old, &mut new, root).await {
+ Ok(mut o) => {
+ new_cachedir_tags.append(&mut o.new_cachedir_tags);
+ if !o.warnings.is_empty() {
+ for err in o.warnings.iter() {
+ debug!("ignoring backup error {}", err);
+ self.found_problem();
+ }
+ warnings.append(&mut o.warnings);
}
- };
- self.backup(entry, old)
- });
- let mut new_warnings = new.insert_iter(entries)?;
- warnings.append(&mut new_warnings);
+ }
+ Err(err) => {
+ debug!("ignoring backup error {}", err);
+ warnings.push(err.into());
+ self.found_problem();
+ }
+ }
}
new.file_count()
};
self.finish();
+ let gen_id = self.upload_nascent_generation(newpath).await?;
+ let gen_id = GenId::from_chunk_id(gen_id);
Ok(RootsBackupOutcome {
files_count,
warnings,
new_cachedir_tags,
+ gen_id,
})
}
- pub fn backup(
+ async fn backup_one_root(
&self,
- entry: Result<AnnotatedFsEntry, FsIterError>,
+ config: &ClientConfig,
+ old: &LocalGeneration,
+ new: &mut NascentGeneration,
+ root: &Path,
+ ) -> Result<OneRootBackupOutcome, NascentError> {
+ let mut warnings: Vec<BackupError> = vec![];
+ let mut new_cachedir_tags = vec![];
+ let iter = FsIterator::new(root, config.exclude_cache_tag_directories);
+ for entry in iter {
+ match entry {
+ Err(err) => {
+ warnings.push(err.into());
+ }
+ Ok(entry) => {
+ let path = entry.inner.pathbuf();
+ if entry.is_cachedir_tag && !old.is_cachedir_tag(&path)? {
+ new_cachedir_tags.push(path);
+ }
+ match self.backup_if_needed(entry, old).await {
+ Err(err) => {
+ warnings.push(err);
+ }
+ Ok(o) => {
+ if let Err(err) =
+ new.insert(o.entry, &o.ids, o.reason, o.is_cachedir_tag)
+ {
+ warnings.push(err.into());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ Ok(OneRootBackupOutcome {
+ files_count: 0, // Caller will get file count from new.
+ warnings,
+ new_cachedir_tags,
+ })
+ }
+
+ async fn backup_if_needed(
+ &self,
+ entry: AnnotatedFsEntry,
old: &LocalGeneration,
) -> Result<FsEntryBackupOutcome, BackupError> {
- match entry {
- Err(err) => {
- warn!("backup: {}", err);
- self.found_problem();
- Err(BackupError::FsIterError(err))
+ let path = &entry.inner.pathbuf();
+ info!("backup: {}", path.display());
+ self.found_live_file(path);
+ let reason = self.policy.needs_backup(old, &entry.inner);
+ match reason {
+ Reason::IsNew | Reason::Changed | Reason::GenerationLookupError | Reason::Unknown => {
+ Ok(self.backup_one_entry(&entry, path, reason).await)
}
- Ok(entry) => {
- let path = &entry.inner.pathbuf();
- info!("backup: {}", path.display());
- self.found_live_file(path);
- let reason = self.policy.needs_backup(old, &entry.inner);
- match reason {
- Reason::IsNew
- | Reason::Changed
- | Reason::GenerationLookupError
- | Reason::Unknown => Ok(backup_file(
- self.client,
- &entry,
- path,
- self.buffer_size,
- reason,
- )),
- Reason::Unchanged | Reason::Skipped | Reason::FileError => {
- let fileno = old.get_fileno(&entry.inner.pathbuf())?;
- let ids = if let Some(fileno) = fileno {
- let mut ids = vec![];
- for id in old.chunkids(fileno)?.iter()? {
- ids.push(id?);
- }
- ids
- } else {
- vec![]
- };
- Ok(FsEntryBackupOutcome {
- entry: entry.inner,
- ids,
- reason,
- is_cachedir_tag: entry.is_cachedir_tag,
- })
+ Reason::Unchanged | Reason::Skipped | Reason::FileError => {
+ let fileno = old.get_fileno(&entry.inner.pathbuf())?;
+ let ids = if let Some(fileno) = fileno {
+ let mut ids = vec![];
+ for id in old.chunkids(fileno)?.iter()? {
+ ids.push(id?);
}
+ ids
+ } else {
+ vec![]
+ };
+ Ok(FsEntryBackupOutcome {
+ entry: entry.inner,
+ ids,
+ reason,
+ is_cachedir_tag: entry.is_cachedir_tag,
+ })
+ }
+ }
+ }
+
+ async fn backup_one_entry(
+ &self,
+ entry: &AnnotatedFsEntry,
+ path: &Path,
+ reason: Reason,
+ ) -> FsEntryBackupOutcome {
+ let ids = self
+ .upload_filesystem_entry(&entry.inner, self.buffer_size)
+ .await;
+ match ids {
+ Err(err) => {
+ warn!("error backing up {}, skipping it: {}", path.display(), err);
+ FsEntryBackupOutcome {
+ entry: entry.inner.clone(),
+ ids: vec![],
+ reason: Reason::FileError,
+ is_cachedir_tag: entry.is_cachedir_tag,
}
}
+ Ok(ids) => FsEntryBackupOutcome {
+ entry: entry.inner.clone(),
+ ids,
+ reason,
+ is_cachedir_tag: entry.is_cachedir_tag,
+ },
+ }
+ }
+
+ pub async fn upload_filesystem_entry(
+ &self,
+ e: &FilesystemEntry,
+ size: usize,
+ ) -> Result<Vec<ChunkId>, BackupError> {
+ let path = e.pathbuf();
+ info!("uploading {:?}", path);
+ let ids = match e.kind() {
+ FilesystemKind::Regular => self.upload_regular_file(&path, size).await?,
+ FilesystemKind::Directory => vec![],
+ FilesystemKind::Symlink => vec![],
+ FilesystemKind::Socket => vec![],
+ FilesystemKind::Fifo => vec![],
+ };
+ info!("upload OK for {:?}", path);
+ Ok(ids)
+ }
+
+ pub async fn upload_generation(
+ &self,
+ filename: &Path,
+ size: usize,
+ ) -> Result<ChunkId, BackupError> {
+ info!("upload SQLite {}", filename.display());
+ let ids = self.upload_regular_file(filename, size).await?;
+ let gen = GenerationChunk::new(ids);
+ let data = gen.to_data_chunk(&current_timestamp())?;
+ let gen_id = self.client.upload_chunk(data).await?;
+ info!("uploaded generation {}", gen_id);
+ Ok(gen_id)
+ }
+
+ async fn upload_regular_file(
+ &self,
+ filename: &Path,
+ size: usize,
+ ) -> Result<Vec<ChunkId>, BackupError> {
+ info!("upload file {}", filename.display());
+ let mut chunk_ids = vec![];
+ let file = std::fs::File::open(filename)
+ .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?;
+ let chunker = Chunker::new(size, file, filename);
+ for item in chunker {
+ let chunk = item?;
+ if let Some(chunk_id) = self.client.has_chunk(chunk.meta()).await? {
+ chunk_ids.push(chunk_id.clone());
+ info!("reusing existing chunk {}", chunk_id);
+ } else {
+ let chunk_id = self.client.upload_chunk(chunk).await?;
+ chunk_ids.push(chunk_id.clone());
+ info!("created new chunk {}", chunk_id);
+ }
}
+ Ok(chunk_ids)
+ }
+
+ async fn upload_nascent_generation(&self, filename: &Path) -> Result<ChunkId, ObnamError> {
+ let progress = BackupProgress::upload_generation();
+ let gen_id = self.upload_generation(filename, SQLITE_CHUNK_SIZE).await?;
+ progress.finish();
+ Ok(gen_id)
}
fn found_live_file(&self, path: &Path) {
@@ -211,29 +365,7 @@ impl<'a> BackupRun<'a> {
}
}
-fn backup_file(
- client: &BackupClient,
- entry: &AnnotatedFsEntry,
- path: &Path,
- chunk_size: usize,
- reason: Reason,
-) -> FsEntryBackupOutcome {
- let ids = client.upload_filesystem_entry(&entry.inner, chunk_size);
- match ids {
- Err(err) => {
- warn!("error backing up {}, skipping it: {}", path.display(), err);
- FsEntryBackupOutcome {
- entry: entry.inner.clone(),
- ids: vec![],
- reason: Reason::FileError,
- is_cachedir_tag: entry.is_cachedir_tag,
- }
- }
- Ok(ids) => FsEntryBackupOutcome {
- entry: entry.inner.clone(),
- ids,
- reason,
- is_cachedir_tag: entry.is_cachedir_tag,
- },
- }
+fn current_timestamp() -> String {
+ let now: DateTime<Local> = Local::now();
+ format!("{}", now.format("%Y-%m-%d %H:%M:%S.%f %z"))
}
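
The heart of the upload logic moved into BackupRun above is
content-addressed deduplication: each chunk's checksum is looked up on
the server first, and the data is uploaded only on a miss. A simplified,
self-contained sketch of that control flow, with a hypothetical
ChunkStore trait standing in for AsyncBackupClient (async fn in traits
needs Rust 1.75+; only the control flow mirrors the diff):

    type ChunkId = String;

    struct Chunk {
        sha256: String, // checksum used for the server-side lookup
        data: Vec<u8>,  // payload, uploaded only when the lookup misses
    }

    trait ChunkStore {
        // Returns the id of an existing chunk with this checksum, if any.
        async fn has_chunk(&self, sha256: &str) -> Result<Option<ChunkId>, String>;
        // Uploads the chunk and returns its new id.
        async fn upload_chunk(&self, chunk: Chunk) -> Result<ChunkId, String>;
    }

    async fn upload_dedup(
        store: &impl ChunkStore,
        chunks: Vec<Chunk>,
    ) -> Result<Vec<ChunkId>, String> {
        let mut ids = Vec::with_capacity(chunks.len());
        for chunk in chunks {
            let id = match store.has_chunk(&chunk.sha256).await? {
                Some(id) => id,                           // reuse existing chunk
                None => store.upload_chunk(chunk).await?, // upload new data
            };
            ids.push(id);
        }
        Ok(ids)
    }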
diff --git a/src/client.rs b/src/client.rs
index c655bb2..5451dfb 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,17 +1,12 @@
-use crate::chunk::DataChunk;
-use crate::chunk::{GenerationChunk, GenerationChunkError};
-use crate::chunker::{Chunker, ChunkerError};
+use crate::chunk::{DataChunk, GenerationChunk, GenerationChunkError};
use crate::chunkid::ChunkId;
use crate::chunkmeta::ChunkMeta;
use crate::cipher::{CipherEngine, CipherError};
use crate::config::{ClientConfig, ClientConfigError};
-use crate::fsentry::{FilesystemEntry, FilesystemKind};
use crate::generation::{FinishedGeneration, GenId, LocalGeneration, LocalGenerationError};
use crate::genlist::GenerationList;
-use chrono::{DateTime, Local};
use log::{debug, error, info};
-use reqwest::blocking::Client;
use reqwest::header::HeaderMap;
use std::collections::HashMap;
use std::fs::File;
@@ -50,9 +45,6 @@ pub enum ClientError {
#[error(transparent)]
LocalGenerationError(#[from] LocalGenerationError),
- #[error(transparent)]
- ChunkerError(#[from] ChunkerError),
-
#[error("couldn't convert response chunk-meta header to string: {0}")]
MetaHeaderToString(reqwest::header::ToStrError),
@@ -92,6 +84,13 @@ impl AsyncBackupClient {
chunk_client: AsyncChunkClient::new(config)?,
})
}
+ pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> {
+ self.chunk_client.has_chunk(meta).await
+ }
+
+ pub async fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> {
+ self.chunk_client.upload_chunk(chunk).await
+ }
pub async fn list_generations(&self) -> Result<GenerationList, ClientError> {
self.chunk_client.list_generations().await
@@ -159,229 +158,8 @@ impl AsyncChunkClient {
format!("{}/chunks", self.base_url())
}
- pub async fn list_generations(&self) -> Result<GenerationList, ClientError> {
- let (_, body) = self.get("", &[("generation", "true")]).await?;
-
- let map: HashMap<String, ChunkMeta> =
- serde_yaml::from_slice(&body).map_err(ClientError::YamlParse)?;
- debug!("list_generations: map={:?}", map);
- let finished = map
- .iter()
- .map(|(id, meta)| FinishedGeneration::new(id, meta.ended().map_or("", |s| s)))
- .collect();
- Ok(GenerationList::new(finished))
- }
-
- pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
- let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?;
- let meta = self.get_chunk_meta_header(chunk_id, &headers)?;
-
- let meta_bytes = meta.to_json_vec();
- let chunk = self.cipher.decrypt_chunk(&body, &meta_bytes)?;
-
- Ok(chunk)
- }
-
- async fn get(
- &self,
- path: &str,
- query: &[(&str, &str)],
- ) -> Result<(HeaderMap, Vec<u8>), ClientError> {
- let url = format!("{}{}", &self.chunks_url(), path);
- info!("GET {}", url);
-
- // Build HTTP request structure.
- let req = self
- .client
- .get(&url)
- .query(query)
- .build()
- .map_err(ClientError::ReqwestError)?;
-
- // Make HTTP request.
- let res = self
- .client
- .execute(req)
- .await
- .map_err(ClientError::ReqwestError)?;
-
- // Did it work?
- if res.status() != 200 {
- return Err(ClientError::NotFound(path.to_string()));
- }
-
- // Return headers and body.
- let headers = res.headers().clone();
- let body = res.bytes().await.map_err(ClientError::ReqwestError)?;
- let body = body.to_vec();
- Ok((headers, body))
- }
-
- fn get_chunk_meta_header(
- &self,
- chunk_id: &ChunkId,
- headers: &HeaderMap,
- ) -> Result<ChunkMeta, ClientError> {
- let meta = headers.get("chunk-meta");
-
- if meta.is_none() {
- let err = ClientError::NoChunkMeta(chunk_id.clone());
- error!("fetching chunk {} failed: {}", chunk_id, err);
- return Err(err);
- }
-
- let meta = meta
- .unwrap()
- .to_str()
- .map_err(ClientError::MetaHeaderToString)?;
- let meta: ChunkMeta = serde_json::from_str(meta).map_err(ClientError::JsonParse)?;
-
- Ok(meta)
- }
-}
-
-pub struct BackupClient {
- chunk_client: ChunkClient,
-}
-
-impl BackupClient {
- pub fn new(config: &ClientConfig) -> Result<Self, ClientError> {
- info!("creating backup client with config: {:#?}", config);
- Ok(Self {
- chunk_client: ChunkClient::new(config)?,
- })
- }
-
- pub fn upload_filesystem_entry(
- &self,
- e: &FilesystemEntry,
- size: usize,
- ) -> Result<Vec<ChunkId>, ClientError> {
- let path = e.pathbuf();
- info!("uploading {:?}", path);
- let ids = match e.kind() {
- FilesystemKind::Regular => self.read_file(&path, size)?,
- FilesystemKind::Directory => vec![],
- FilesystemKind::Symlink => vec![],
- FilesystemKind::Socket => vec![],
- FilesystemKind::Fifo => vec![],
- };
- info!("upload OK for {:?}", path);
- Ok(ids)
- }
-
- pub fn upload_generation(&self, filename: &Path, size: usize) -> Result<ChunkId, ClientError> {
- info!("upload SQLite {}", filename.display());
- let ids = self.read_file(filename, size)?;
- let gen = GenerationChunk::new(ids);
- let data = gen.to_data_chunk(&current_timestamp())?;
- let gen_id = self.upload_chunk(data)?;
- info!("uploaded generation {}", gen_id);
- Ok(gen_id)
- }
-
- fn read_file(&self, filename: &Path, size: usize) -> Result<Vec<ChunkId>, ClientError> {
- info!("upload file {}", filename.display());
- let file = std::fs::File::open(filename)
- .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?;
- let chunker = Chunker::new(size, file, filename);
- let chunk_ids = self.upload_new_file_chunks(chunker)?;
- Ok(chunk_ids)
- }
-
- pub fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> {
- self.chunk_client.has_chunk(meta)
- }
-
- pub fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> {
- self.chunk_client.upload_chunk(chunk)
- }
-
- pub fn upload_new_file_chunks(&self, chunker: Chunker) -> Result<Vec<ChunkId>, ClientError> {
- let mut chunk_ids = vec![];
- for item in chunker {
- let chunk = item?;
- if let Some(chunk_id) = self.has_chunk(chunk.meta())? {
- chunk_ids.push(chunk_id.clone());
- info!("reusing existing chunk {}", chunk_id);
- } else {
- let chunk_id = self.upload_chunk(chunk)?;
- chunk_ids.push(chunk_id.clone());
- info!("created new chunk {}", chunk_id);
- }
- }
-
- Ok(chunk_ids)
- }
-
- pub fn list_generations(&self) -> Result<GenerationList, ClientError> {
- self.chunk_client.list_generations()
- }
-
- pub fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
- self.chunk_client.fetch_chunk(chunk_id)
- }
-
- fn fetch_generation_chunk(&self, gen_id: &GenId) -> Result<GenerationChunk, ClientError> {
- let chunk = self.fetch_chunk(gen_id.as_chunk_id())?;
- let gen = GenerationChunk::from_data_chunk(&chunk)?;
- Ok(gen)
- }
-
- pub fn fetch_generation(
- &self,
- gen_id: &GenId,
- dbname: &Path,
- ) -> Result<LocalGeneration, ClientError> {
- let gen = self.fetch_generation_chunk(gen_id)?;
-
- // Fetch the SQLite file, storing it in the named file.
- let mut dbfile = File::create(&dbname)
- .map_err(|err| ClientError::FileCreate(dbname.to_path_buf(), err))?;
- for id in gen.chunk_ids() {
- let chunk = self.fetch_chunk(id)?;
- dbfile
- .write_all(chunk.data())
- .map_err(|err| ClientError::FileWrite(dbname.to_path_buf(), err))?;
- }
- info!("downloaded generation to {}", dbname.display());
-
- let gen = LocalGeneration::open(dbname)?;
- Ok(gen)
- }
-}
-
-pub struct ChunkClient {
- client: Client,
- base_url: String,
- cipher: CipherEngine,
-}
-
-impl ChunkClient {
- pub fn new(config: &ClientConfig) -> Result<Self, ClientError> {
- let pass = config.passwords()?;
-
- let client = Client::builder()
- .danger_accept_invalid_certs(!config.verify_tls_cert)
- .build()
- .map_err(ClientError::ReqwestError)?;
- Ok(Self {
- client,
- base_url: config.server_url.to_string(),
- cipher: CipherEngine::new(&pass),
- })
- }
-
- fn base_url(&self) -> &str {
- &self.base_url
- }
-
- fn chunks_url(&self) -> String {
- format!("{}/chunks", self.base_url())
- }
-
- pub fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> {
- let body = match self.get("", &[("sha256", meta.sha256())]) {
+ pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> {
+ let body = match self.get("", &[("sha256", meta.sha256())]).await {
Ok((_, body)) => body,
Err(err) => return Err(err),
};
@@ -398,7 +176,7 @@ impl ChunkClient {
Ok(has)
}
- pub fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> {
+ pub async fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> {
let enc = self.cipher.encrypt_chunk(&chunk)?;
let res = self
.client
@@ -406,9 +184,10 @@ impl ChunkClient {
.header("chunk-meta", chunk.meta().to_json())
.body(enc.ciphertext().to_vec())
.send()
+ .await
.map_err(ClientError::ReqwestError)?;
debug!("upload_chunk: res={:?}", res);
- let res: HashMap<String, String> = res.json().map_err(ClientError::ReqwestError)?;
+ let res: HashMap<String, String> = res.json().await.map_err(ClientError::ReqwestError)?;
let chunk_id = if let Some(chunk_id) = res.get("chunk_id") {
debug!("upload_chunk: id={}", chunk_id);
chunk_id.parse().unwrap()
@@ -419,8 +198,8 @@ impl ChunkClient {
Ok(chunk_id)
}
- pub fn list_generations(&self) -> Result<GenerationList, ClientError> {
- let (_, body) = self.get("", &[("generation", "true")])?;
+ pub async fn list_generations(&self) -> Result<GenerationList, ClientError> {
+ let (_, body) = self.get("", &[("generation", "true")]).await?;
let map: HashMap<String, ChunkMeta> =
serde_yaml::from_slice(&body).map_err(ClientError::YamlParse)?;
@@ -432,8 +211,8 @@ impl ChunkClient {
Ok(GenerationList::new(finished))
}
- pub fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
- let (headers, body) = self.get(&format!("/{}", chunk_id), &[])?;
+ pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
+ let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?;
let meta = self.get_chunk_meta_header(chunk_id, &headers)?;
let meta_bytes = meta.to_json_vec();
@@ -442,7 +221,11 @@ impl ChunkClient {
Ok(chunk)
}
- fn get(&self, path: &str, query: &[(&str, &str)]) -> Result<(HeaderMap, Vec<u8>), ClientError> {
+ async fn get(
+ &self,
+ path: &str,
+ query: &[(&str, &str)],
+ ) -> Result<(HeaderMap, Vec<u8>), ClientError> {
let url = format!("{}{}", &self.chunks_url(), path);
info!("GET {}", url);
@@ -458,6 +241,7 @@ impl ChunkClient {
let res = self
.client
.execute(req)
+ .await
.map_err(ClientError::ReqwestError)?;
// Did it work?
@@ -467,7 +251,7 @@ impl ChunkClient {
// Return headers and body.
let headers = res.headers().clone();
- let body = res.bytes().map_err(ClientError::ReqwestError)?;
+ let body = res.bytes().await.map_err(ClientError::ReqwestError)?;
let body = body.to_vec();
Ok((headers, body))
}
@@ -494,8 +278,3 @@ impl ChunkClient {
Ok(meta)
}
}
-
-fn current_timestamp() -> String {
- let now: DateTime<Local> = Local::now();
- format!("{}", now.format("%Y-%m-%d %H:%M:%S.%f %z"))
-}
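
The client.rs changes that remain after the move are the mechanical
reqwest migration: reqwest::blocking::Client becomes reqwest::Client,
and every send and body read gains an .await. A minimal illustrative
sketch of the resulting shape (this fetch helper is hypothetical, not
part of the diff):

    // Async GET returning the raw body, in the shape AsyncChunkClient
    // now uses: send() and bytes() both become awaited futures.
    async fn fetch(url: &str) -> Result<Vec<u8>, reqwest::Error> {
        let client = reqwest::Client::new();      // was reqwest::blocking::Client
        let res = client.get(url).send().await?;  // request now awaits
        let body = res.bytes().await?;            // body read now awaits
        Ok(body.to_vec())
    }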
diff --git a/src/cmd/backup.rs b/src/cmd/backup.rs
index 04dfb05..8f3d6d5 100644
--- a/src/cmd/backup.rs
+++ b/src/cmd/backup.rs
@@ -1,27 +1,29 @@
-use crate::backup_progress::BackupProgress;
use crate::backup_run::BackupRun;
-use crate::chunkid::ChunkId;
-use crate::client::BackupClient;
+use crate::client::AsyncBackupClient;
use crate::config::ClientConfig;
use crate::error::ObnamError;
-use bytesize::MIB;
+use crate::generation::GenId;
+
use log::info;
-use std::path::Path;
use std::time::SystemTime;
use structopt::StructOpt;
use tempfile::NamedTempFile;
-
-const SQLITE_CHUNK_SIZE: usize = MIB as usize;
+use tokio::runtime::Runtime;
#[derive(Debug, StructOpt)]
pub struct Backup {}
impl Backup {
pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let rt = Runtime::new()?;
+ rt.block_on(self.run_async(config))
+ }
+
+ async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
let runtime = SystemTime::now();
- let client = BackupClient::new(config)?;
- let genlist = client.list_generations()?;
+ let client = AsyncBackupClient::new(config)?;
+ let genlist = client.list_generations().await?;
let oldtemp = NamedTempFile::new()?;
let newtemp = NamedTempFile::new()?;
@@ -30,19 +32,17 @@ impl Backup {
Err(_) => {
info!("fresh backup without a previous generation");
let mut run = BackupRun::initial(config, &client)?;
- let old = run.start(None, oldtemp.path())?;
- (false, run.backup_roots(config, &old, newtemp.path())?)
+ let old = run.start(None, oldtemp.path()).await?;
+ (false, run.backup_roots(config, &old, newtemp.path()).await?)
}
Ok(old_id) => {
info!("incremental backup based on {}", old_id);
let mut run = BackupRun::incremental(config, &client)?;
- let old = run.start(Some(&old_id), oldtemp.path())?;
- (true, run.backup_roots(config, &old, newtemp.path())?)
+ let old = run.start(Some(&old_id), oldtemp.path()).await?;
+ (true, run.backup_roots(config, &old, newtemp.path()).await?)
}
};
- let gen_id = upload_nascent_generation(&client, newtemp.path())?;
-
for w in outcome.warnings.iter() {
println!("warning: {}", w);
}
@@ -58,7 +58,7 @@ impl Backup {
report_stats(
&runtime,
outcome.files_count,
- &gen_id,
+ &outcome.gen_id,
outcome.warnings.len(),
)?;
@@ -73,7 +73,7 @@ impl Backup {
fn report_stats(
runtime: &SystemTime,
file_count: i64,
- gen_id: &ChunkId,
+ gen_id: &GenId,
num_warnings: usize,
) -> Result<(), ObnamError> {
println!("status: OK");
@@ -83,13 +83,3 @@ fn report_stats(
println!("generation-id: {}", gen_id);
Ok(())
}
-
-fn upload_nascent_generation(
- client: &BackupClient,
- filename: &Path,
-) -> Result<ChunkId, ObnamError> {
- let progress = BackupProgress::upload_generation();
- let gen_id = client.upload_generation(filename, SQLITE_CHUNK_SIZE)?;
- progress.finish();
- Ok(gen_id)
-}
diff --git a/src/generation.rs b/src/generation.rs
index 5412ae7..bd36a19 100644
--- a/src/generation.rs
+++ b/src/generation.rs
@@ -1,8 +1,6 @@
use crate::backup_reason::Reason;
-use crate::backup_run::{BackupError, FsEntryBackupOutcome};
use crate::chunkid::ChunkId;
use crate::fsentry::FilesystemEntry;
-use log::debug;
use rusqlite::Connection;
use std::fmt;
use std::path::{Path, PathBuf};
@@ -50,9 +48,6 @@ pub enum NascentError {
#[error(transparent)]
LocalGenerationError(#[from] LocalGenerationError),
- #[error(transparent)]
- BackupError(#[from] BackupError),
-
#[error("SQL transaction error: {0}")]
Transaction(rusqlite::Error),
@@ -89,33 +84,6 @@ impl NascentGeneration {
t.commit().map_err(NascentError::Commit)?;
Ok(())
}
-
- pub fn insert_iter(
- &mut self,
- entries: impl Iterator<Item = Result<FsEntryBackupOutcome, BackupError>>,
- ) -> Result<Vec<BackupError>, NascentError> {
- let t = self.conn.transaction().map_err(NascentError::Transaction)?;
- let mut warnings = vec![];
- for r in entries {
- match r {
- Err(err) => {
- debug!("ignoring backup error {}", err);
- warnings.push(err);
- }
- Ok(FsEntryBackupOutcome {
- entry,
- ids,
- reason,
- is_cachedir_tag,
- }) => {
- self.fileno += 1;
- sql::insert_one(&t, entry, self.fileno, &ids[..], reason, is_cachedir_tag)?;
- }
- }
- }
- t.commit().map_err(NascentError::Commit)?;
- Ok(warnings)
- }
}
/// A finished generation.
@@ -479,6 +447,9 @@ mod test {
assert!(filename.exists());
}
+ // FIXME: This is way too complicated a test function. It should
+ // be simplified, possibly by re-thinking the abstractions of the
+ // code it calls.
#[test]
fn remembers_cachedir_tags() {
use crate::{
@@ -516,20 +487,24 @@ mod test {
.unwrap();
let entries = vec![
- Ok(FsEntryBackupOutcome {
+ FsEntryBackupOutcome {
entry: FilesystemEntry::from_metadata(nontag_path2, &metadata).unwrap(),
ids: vec![],
reason: Reason::IsNew,
is_cachedir_tag: false,
- }),
- Ok(FsEntryBackupOutcome {
+ },
+ FsEntryBackupOutcome {
entry: FilesystemEntry::from_metadata(tag_path2, &metadata).unwrap(),
ids: vec![],
reason: Reason::IsNew,
is_cachedir_tag: true,
- }),
+ },
];
- gen.insert_iter(entries.into_iter()).unwrap();
+
+ for o in entries {
+ gen.insert(o.entry, &o.ids, o.reason, o.is_cachedir_tag)
+ .unwrap();
+ }
drop(gen);