author     Lars Wirzenius <liw@liw.fi>  2021-08-10 10:42:32 +0300
committer  Lars Wirzenius <liw@liw.fi>  2021-08-16 13:18:07 +0300
commit     3adfd67831f0e197b8b2e23c79393056584f398e (patch)
tree       afcbc8b48397ef4a8018c9c739ad567871586b6b
parent     67757758f0226c31b667a329be6ab25008ef372a (diff)
download   obnam2-3adfd67831f0e197b8b2e23c79393056584f398e.tar.gz
refactor: move file reading, etc., for backups to backup_run

Move the code that reads a file as chunks during a backup, and uploads any
new chunks to the chunk server, into `src/backup_run.rs`. Previously it
lived mostly in `src/client.rs`, which is meant to provide an abstraction
for using the chunk server API.

Sponsored-by: author
-rw-r--r--  src/backup_run.rs   109
-rw-r--r--  src/client.rs        76
-rw-r--r--  src/cmd/backup.rs    26
3 files changed, 108 insertions(+), 103 deletions(-)
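In short, the chunk-upload path moves out of the client abstraction: `BackupRun` takes over `upload_filesystem_entry`, `upload_generation`, `read_file`, and `upload_new_file_chunks` (plus the `current_timestamp` helper and the `SQLITE_CHUNK_SIZE` constant), while `AsyncBackupClient` keeps only the raw chunk-server operations such as `has_chunk` and `upload_chunk`. The sketch below is a minimal, self-contained model of the deduplicating upload loop that ends up in `BackupRun::upload_new_file_chunks`; the `Store` and `Chunk` types are simplified stand-ins invented for illustration, not Obnam's real `AsyncBackupClient`, `Chunker`, or `ChunkMeta` API.

    // A minimal, self-contained model of the deduplicating upload loop that
    // this commit moves into BackupRun::upload_new_file_chunks. Store and
    // Chunk are illustrative stand-ins, not Obnam's real types.
    use std::collections::HashMap;

    type ChunkId = u64;

    struct Chunk {
        data: Vec<u8>,
    }

    impl Chunk {
        // Stand-in for ChunkMeta: identify a chunk by a cheap content hash.
        fn meta(&self) -> u64 {
            self.data
                .iter()
                .fold(0u64, |h, b| h.wrapping_mul(31).wrapping_add(*b as u64))
        }
    }

    #[derive(Default)]
    struct Store {
        by_meta: HashMap<u64, ChunkId>,
        next_id: ChunkId,
    }

    impl Store {
        // Plays the role of client.has_chunk(): look a chunk up by metadata.
        fn has_chunk(&self, meta: u64) -> Option<ChunkId> {
            self.by_meta.get(&meta).copied()
        }

        // Plays the role of client.upload_chunk(): store, return a fresh id.
        fn upload_chunk(&mut self, chunk: Chunk) -> ChunkId {
            let id = self.next_id;
            self.next_id += 1;
            self.by_meta.insert(chunk.meta(), id);
            id
        }
    }

    // The moved logic: reuse an existing chunk when one with the same
    // metadata is already stored, upload otherwise, keeping ids in file order.
    fn upload_new_file_chunks(store: &mut Store, chunks: Vec<Chunk>) -> Vec<ChunkId> {
        let mut ids = vec![];
        for chunk in chunks {
            if let Some(id) = store.has_chunk(chunk.meta()) {
                ids.push(id); // reusing existing chunk
            } else {
                let id = store.upload_chunk(chunk); // created new chunk
                ids.push(id);
            }
        }
        ids
    }

    fn main() {
        let mut store = Store::default();
        let chunks = vec![
            Chunk { data: b"aaaa".to_vec() },
            Chunk { data: b"bbbb".to_vec() },
            Chunk { data: b"aaaa".to_vec() }, // duplicate, reuses the first id
        ];
        let ids = upload_new_file_chunks(&mut store, chunks);
        assert_eq!(ids[0], ids[2]);
        println!("chunk ids: {:?}", ids);
    }

The point of the move is layering: after this commit, `src/client.rs` only speaks the chunk-server API, and the backup-specific decision of when to reuse a chunk lives with the rest of the backup logic.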
diff --git a/src/backup_run.rs b/src/backup_run.rs
index 02d20ee..1bb0bc0 100644
--- a/src/backup_run.rs
+++ b/src/backup_run.rs
@@ -1,18 +1,25 @@
use crate::backup_progress::BackupProgress;
use crate::backup_reason::Reason;
+use crate::chunk::{GenerationChunk, GenerationChunkError};
+use crate::chunker::{Chunker, ChunkerError};
use crate::chunkid::ChunkId;
use crate::client::{AsyncBackupClient, ClientError};
use crate::config::ClientConfig;
use crate::error::ObnamError;
-use crate::fsentry::FilesystemEntry;
+use crate::fsentry::{FilesystemEntry, FilesystemKind};
use crate::fsiter::{AnnotatedFsEntry, FsIterError, FsIterator};
use crate::generation::{
GenId, LocalGeneration, LocalGenerationError, NascentError, NascentGeneration,
};
use crate::policy::BackupPolicy;
-use log::{debug, info, warn};
+
+use bytesize::MIB;
+use chrono::{DateTime, Local};
+use log::{debug, error, info, warn};
use std::path::{Path, PathBuf};
+const SQLITE_CHUNK_SIZE: usize = MIB as usize;
+
pub struct BackupRun<'a> {
client: &'a AsyncBackupClient,
policy: BackupPolicy,
@@ -33,6 +40,12 @@ pub enum BackupError {
#[error(transparent)]
LocalGenerationError(#[from] LocalGenerationError),
+
+ #[error(transparent)]
+ ChunkerError(#[from] ChunkerError),
+
+ #[error(transparent)]
+ GenerationChunkError(#[from] GenerationChunkError),
}
#[derive(Debug)]
@@ -44,6 +57,13 @@ pub struct FsEntryBackupOutcome {
}
#[derive(Debug)]
+struct OneRootBackupOutcome {
+ pub files_count: i64,
+ pub warnings: Vec<BackupError>,
+ pub new_cachedir_tags: Vec<PathBuf>,
+}
+
+#[derive(Debug)]
pub struct RootsBackupOutcome {
/// The number of backed up files.
pub files_count: i64,
@@ -51,6 +71,8 @@ pub struct RootsBackupOutcome {
pub warnings: Vec<BackupError>,
/// CACHEDIR.TAG files that aren't present in a previous generation.
pub new_cachedir_tags: Vec<PathBuf>,
+ /// Id of new generation.
+ pub gen_id: GenId,
}
impl<'a> BackupRun<'a> {
@@ -125,7 +147,7 @@ impl<'a> BackupRun<'a> {
config: &ClientConfig,
old: &LocalGeneration,
newpath: &Path,
- ) -> Result<RootsBackupOutcome, NascentError> {
+ ) -> Result<RootsBackupOutcome, ObnamError> {
let mut warnings: Vec<BackupError> = vec![];
let mut new_cachedir_tags = vec![];
let files_count = {
@@ -152,10 +174,13 @@ impl<'a> BackupRun<'a> {
new.file_count()
};
self.finish();
+ let gen_id = self.upload_nascent_generation(newpath).await?;
+ let gen_id = GenId::from_chunk_id(gen_id);
Ok(RootsBackupOutcome {
files_count,
warnings,
new_cachedir_tags,
+ gen_id,
})
}
@@ -165,7 +190,7 @@ impl<'a> BackupRun<'a> {
old: &LocalGeneration,
new: &mut NascentGeneration,
root: &Path,
- ) -> Result<RootsBackupOutcome, NascentError> {
+ ) -> Result<OneRootBackupOutcome, NascentError> {
let mut warnings: Vec<BackupError> = vec![];
let mut new_cachedir_tags = vec![];
let iter = FsIterator::new(root, config.exclude_cache_tag_directories);
@@ -195,7 +220,7 @@ impl<'a> BackupRun<'a> {
}
}
- Ok(RootsBackupOutcome {
+ Ok(OneRootBackupOutcome {
files_count: 0, // Caller will get file count from new.
warnings,
new_cachedir_tags,
@@ -243,7 +268,6 @@ impl<'a> BackupRun<'a> {
reason: Reason,
) -> FsEntryBackupOutcome {
let ids = self
- .client
.upload_filesystem_entry(&entry.inner, self.buffer_size)
.await;
match ids {
@@ -265,6 +289,74 @@ impl<'a> BackupRun<'a> {
}
}
+ pub async fn upload_filesystem_entry(
+ &self,
+ e: &FilesystemEntry,
+ size: usize,
+ ) -> Result<Vec<ChunkId>, BackupError> {
+ let path = e.pathbuf();
+ info!("uploading {:?}", path);
+ let ids = match e.kind() {
+ FilesystemKind::Regular => self.read_file(&path, size).await?,
+ FilesystemKind::Directory => vec![],
+ FilesystemKind::Symlink => vec![],
+ FilesystemKind::Socket => vec![],
+ FilesystemKind::Fifo => vec![],
+ };
+ info!("upload OK for {:?}", path);
+ Ok(ids)
+ }
+
+ pub async fn upload_generation(
+ &self,
+ filename: &Path,
+ size: usize,
+ ) -> Result<ChunkId, BackupError> {
+ info!("upload SQLite {}", filename.display());
+ let ids = self.read_file(filename, size).await?;
+ let gen = GenerationChunk::new(ids);
+ let data = gen.to_data_chunk(&current_timestamp())?;
+ let gen_id = self.client.upload_chunk(data).await?;
+ info!("uploaded generation {}", gen_id);
+ Ok(gen_id)
+ }
+
+ async fn read_file(&self, filename: &Path, size: usize) -> Result<Vec<ChunkId>, BackupError> {
+ info!("upload file {}", filename.display());
+ let file = std::fs::File::open(filename)
+ .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?;
+ let chunker = Chunker::new(size, file, filename);
+ let chunk_ids = self.upload_new_file_chunks(chunker).await?;
+ Ok(chunk_ids)
+ }
+
+ pub async fn upload_new_file_chunks(
+ &self,
+ chunker: Chunker,
+ ) -> Result<Vec<ChunkId>, BackupError> {
+ let mut chunk_ids = vec![];
+ for item in chunker {
+ let chunk = item?;
+ if let Some(chunk_id) = self.client.has_chunk(chunk.meta()).await? {
+ chunk_ids.push(chunk_id.clone());
+ info!("reusing existing chunk {}", chunk_id);
+ } else {
+ let chunk_id = self.client.upload_chunk(chunk).await?;
+ chunk_ids.push(chunk_id.clone());
+ info!("created new chunk {}", chunk_id);
+ }
+ }
+
+ Ok(chunk_ids)
+ }
+
+ async fn upload_nascent_generation(&self, filename: &Path) -> Result<ChunkId, ObnamError> {
+ let progress = BackupProgress::upload_generation();
+ let gen_id = self.upload_generation(filename, SQLITE_CHUNK_SIZE).await?;
+ progress.finish();
+ Ok(gen_id)
+ }
+
fn found_live_file(&self, path: &Path) {
if let Some(progress) = &self.progress {
progress.found_live_file(path);
@@ -277,3 +369,8 @@ impl<'a> BackupRun<'a> {
}
}
}
+
+fn current_timestamp() -> String {
+ let now: DateTime<Local> = Local::now();
+ format!("{}", now.format("%Y-%m-%d %H:%M:%S.%f %z"))
+}
diff --git a/src/client.rs b/src/client.rs
index 9fddc18..5451dfb 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,15 +1,11 @@
-use crate::chunk::DataChunk;
-use crate::chunk::{GenerationChunk, GenerationChunkError};
-use crate::chunker::{Chunker, ChunkerError};
+use crate::chunk::{DataChunk, GenerationChunk, GenerationChunkError};
use crate::chunkid::ChunkId;
use crate::chunkmeta::ChunkMeta;
use crate::cipher::{CipherEngine, CipherError};
use crate::config::{ClientConfig, ClientConfigError};
-use crate::fsentry::{FilesystemEntry, FilesystemKind};
use crate::generation::{FinishedGeneration, GenId, LocalGeneration, LocalGenerationError};
use crate::genlist::GenerationList;
-use chrono::{DateTime, Local};
use log::{debug, error, info};
use reqwest::header::HeaderMap;
use std::collections::HashMap;
@@ -49,9 +45,6 @@ pub enum ClientError {
#[error(transparent)]
LocalGenerationError(#[from] LocalGenerationError),
- #[error(transparent)]
- ChunkerError(#[from] ChunkerError),
-
#[error("couldn't convert response chunk-meta header to string: {0}")]
MetaHeaderToString(reqwest::header::ToStrError),
@@ -91,48 +84,6 @@ impl AsyncBackupClient {
chunk_client: AsyncChunkClient::new(config)?,
})
}
-
- pub async fn upload_filesystem_entry(
- &self,
- e: &FilesystemEntry,
- size: usize,
- ) -> Result<Vec<ChunkId>, ClientError> {
- let path = e.pathbuf();
- info!("uploading {:?}", path);
- let ids = match e.kind() {
- FilesystemKind::Regular => self.read_file(&path, size).await?,
- FilesystemKind::Directory => vec![],
- FilesystemKind::Symlink => vec![],
- FilesystemKind::Socket => vec![],
- FilesystemKind::Fifo => vec![],
- };
- info!("upload OK for {:?}", path);
- Ok(ids)
- }
-
- pub async fn upload_generation(
- &self,
- filename: &Path,
- size: usize,
- ) -> Result<ChunkId, ClientError> {
- info!("upload SQLite {}", filename.display());
- let ids = self.read_file(filename, size).await?;
- let gen = GenerationChunk::new(ids);
- let data = gen.to_data_chunk(&current_timestamp())?;
- let gen_id = self.upload_chunk(data).await?;
- info!("uploaded generation {}", gen_id);
- Ok(gen_id)
- }
-
- async fn read_file(&self, filename: &Path, size: usize) -> Result<Vec<ChunkId>, ClientError> {
- info!("upload file {}", filename.display());
- let file = std::fs::File::open(filename)
- .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?;
- let chunker = Chunker::new(size, file, filename);
- let chunk_ids = self.upload_new_file_chunks(chunker).await?;
- Ok(chunk_ids)
- }
-
pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> {
self.chunk_client.has_chunk(meta).await
}
@@ -141,26 +92,6 @@ impl AsyncBackupClient {
self.chunk_client.upload_chunk(chunk).await
}
- pub async fn upload_new_file_chunks(
- &self,
- chunker: Chunker,
- ) -> Result<Vec<ChunkId>, ClientError> {
- let mut chunk_ids = vec![];
- for item in chunker {
- let chunk = item?;
- if let Some(chunk_id) = self.has_chunk(chunk.meta()).await? {
- chunk_ids.push(chunk_id.clone());
- info!("reusing existing chunk {}", chunk_id);
- } else {
- let chunk_id = self.upload_chunk(chunk).await?;
- chunk_ids.push(chunk_id.clone());
- info!("created new chunk {}", chunk_id);
- }
- }
-
- Ok(chunk_ids)
- }
-
pub async fn list_generations(&self) -> Result<GenerationList, ClientError> {
self.chunk_client.list_generations().await
}
@@ -347,8 +278,3 @@ impl AsyncChunkClient {
Ok(meta)
}
}
-
-fn current_timestamp() -> String {
- let now: DateTime<Local> = Local::now();
- format!("{}", now.format("%Y-%m-%d %H:%M:%S.%f %z"))
-}
diff --git a/src/cmd/backup.rs b/src/cmd/backup.rs
index dc03ddf..8f3d6d5 100644
--- a/src/cmd/backup.rs
+++ b/src/cmd/backup.rs
@@ -1,19 +1,15 @@
-use crate::backup_progress::BackupProgress;
use crate::backup_run::BackupRun;
-use crate::chunkid::ChunkId;
use crate::client::AsyncBackupClient;
use crate::config::ClientConfig;
use crate::error::ObnamError;
-use bytesize::MIB;
+use crate::generation::GenId;
+
use log::info;
-use std::path::Path;
use std::time::SystemTime;
use structopt::StructOpt;
use tempfile::NamedTempFile;
use tokio::runtime::Runtime;
-const SQLITE_CHUNK_SIZE: usize = MIB as usize;
-
#[derive(Debug, StructOpt)]
pub struct Backup {}
@@ -47,8 +43,6 @@ impl Backup {
}
};
- let gen_id = upload_nascent_generation(&client, newtemp.path()).await?;
-
for w in outcome.warnings.iter() {
println!("warning: {}", w);
}
@@ -64,7 +58,7 @@ impl Backup {
report_stats(
&runtime,
outcome.files_count,
- &gen_id,
+ &outcome.gen_id,
outcome.warnings.len(),
)?;
@@ -79,7 +73,7 @@ impl Backup {
fn report_stats(
runtime: &SystemTime,
file_count: i64,
- gen_id: &ChunkId,
+ gen_id: &GenId,
num_warnings: usize,
) -> Result<(), ObnamError> {
println!("status: OK");
@@ -89,15 +83,3 @@ fn report_stats(
println!("generation-id: {}", gen_id);
Ok(())
}
-
-async fn upload_nascent_generation(
- client: &AsyncBackupClient,
- filename: &Path,
-) -> Result<ChunkId, ObnamError> {
- let progress = BackupProgress::upload_generation();
- let gen_id = client
- .upload_generation(filename, SQLITE_CHUNK_SIZE)
- .await?;
- progress.finish();
- Ok(gen_id)
-}
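
Net effect at the command layer: `cmd/backup.rs` no longer drives the generation upload itself. `BackupRun::backup_roots` uploads the nascent generation and returns its id in `RootsBackupOutcome::gen_id`, so the command just reads it from the outcome. Roughly (an illustrative reconstruction only; the surrounding Obnam types are assumed in scope, and setup and error handling are elided):

    // Hypothetical caller-side flow after this commit; `run`, `config`,
    // `old`, `newtemp`, and `runtime` are assumed from the surrounding code.
    let outcome = run.backup_roots(&config, &old, newtemp.path()).await?;
    for w in outcome.warnings.iter() {
        println!("warning: {}", w);
    }
    report_stats(&runtime, outcome.files_count, &outcome.gen_id, outcome.warnings.len())?;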