From 3adfd67831f0e197b8b2e23c79393056584f398e Mon Sep 17 00:00:00 2001
From: Lars Wirzenius
Date: Tue, 10 Aug 2021 10:42:32 +0300
Subject: refactor: move file reading, etc, for backups to backup_run

Move the code that reads a file as chunks during a backup, and uploads
any new chunks to the chunk server, into `src/backup_run.rs`.
Previously it lived mostly in `src/client.rs`, which is meant to
provide an abstraction for using the chunk server API.

Sponsored-by: author
---
 src/backup_run.rs | 109 +++++++++++++++++++++++++++++++++++++++++++++++++++---
 src/client.rs     |  76 +------------------------------------
 src/cmd/backup.rs |  26 ++-----------
 3 files changed, 108 insertions(+), 103 deletions(-)

diff --git a/src/backup_run.rs b/src/backup_run.rs
index 02d20ee..1bb0bc0 100644
--- a/src/backup_run.rs
+++ b/src/backup_run.rs
@@ -1,18 +1,25 @@
 use crate::backup_progress::BackupProgress;
 use crate::backup_reason::Reason;
+use crate::chunk::{GenerationChunk, GenerationChunkError};
+use crate::chunker::{Chunker, ChunkerError};
 use crate::chunkid::ChunkId;
 use crate::client::{AsyncBackupClient, ClientError};
 use crate::config::ClientConfig;
 use crate::error::ObnamError;
-use crate::fsentry::FilesystemEntry;
+use crate::fsentry::{FilesystemEntry, FilesystemKind};
 use crate::fsiter::{AnnotatedFsEntry, FsIterError, FsIterator};
 use crate::generation::{
     GenId, LocalGeneration, LocalGenerationError, NascentError, NascentGeneration,
 };
 use crate::policy::BackupPolicy;
-use log::{debug, info, warn};
+
+use bytesize::MIB;
+use chrono::{DateTime, Local};
+use log::{debug, error, info, warn};
 use std::path::{Path, PathBuf};
 
+const SQLITE_CHUNK_SIZE: usize = MIB as usize;
+
 pub struct BackupRun<'a> {
     client: &'a AsyncBackupClient,
     policy: BackupPolicy,
@@ -33,6 +40,12 @@ pub enum BackupError {
 
     #[error(transparent)]
     LocalGenerationError(#[from] LocalGenerationError),
+
+    #[error(transparent)]
+    ChunkerError(#[from] ChunkerError),
+
+    #[error(transparent)]
+    GenerationChunkError(#[from] GenerationChunkError),
 }
 
 #[derive(Debug)]
@@ -43,6 +56,13 @@ pub struct FsEntryBackupOutcome {
     pub entry: FilesystemEntry,
     pub is_cachedir_tag: bool,
 }
 
+#[derive(Debug)]
+struct OneRootBackupOutcome {
+    pub files_count: i64,
+    pub warnings: Vec<BackupError>,
+    pub new_cachedir_tags: Vec<PathBuf>,
+}
+
 #[derive(Debug)]
 pub struct RootsBackupOutcome {
     /// The number of backed up files.
@@ -51,6 +71,8 @@
     pub warnings: Vec<BackupError>,
     /// CACHEDIR.TAG files that aren't present in in a previous generation.
     pub new_cachedir_tags: Vec<PathBuf>,
+    /// Id of new generation.
+    pub gen_id: GenId,
 }
 
 impl<'a> BackupRun<'a> {
@@ -125,7 +147,7 @@
         config: &ClientConfig,
         old: &LocalGeneration,
         newpath: &Path,
-    ) -> Result<RootsBackupOutcome, NascentError> {
+    ) -> Result<RootsBackupOutcome, ObnamError> {
         let mut warnings: Vec<BackupError> = vec![];
         let mut new_cachedir_tags = vec![];
         let files_count = {
@@ -152,10 +174,13 @@
             new.file_count()
         };
         self.finish();
+        let gen_id = self.upload_nascent_generation(newpath).await?;
+        let gen_id = GenId::from_chunk_id(gen_id);
         Ok(RootsBackupOutcome {
             files_count,
             warnings,
             new_cachedir_tags,
+            gen_id,
         })
     }
 
@@ -165,7 +190,7 @@
         old: &LocalGeneration,
         new: &mut NascentGeneration,
         root: &Path,
-    ) -> Result<RootsBackupOutcome, NascentError> {
+    ) -> Result<OneRootBackupOutcome, NascentError> {
         let mut warnings: Vec<BackupError> = vec![];
         let mut new_cachedir_tags = vec![];
         let iter = FsIterator::new(root, config.exclude_cache_tag_directories);
@@ -195,7 +220,7 @@
             }
         }
 
-        Ok(RootsBackupOutcome {
+        Ok(OneRootBackupOutcome {
             files_count: 0, // Caller will get file count from new.
             warnings,
             new_cachedir_tags,
@@ -243,7 +268,6 @@
         reason: Reason,
     ) -> FsEntryBackupOutcome {
         let ids = self
-            .client
             .upload_filesystem_entry(&entry.inner, self.buffer_size)
             .await;
         match ids {
@@ -265,6 +289,74 @@
         }
     }
 
+    pub async fn upload_filesystem_entry(
+        &self,
+        e: &FilesystemEntry,
+        size: usize,
+    ) -> Result<Vec<ChunkId>, BackupError> {
+        let path = e.pathbuf();
+        info!("uploading {:?}", path);
+        let ids = match e.kind() {
+            FilesystemKind::Regular => self.read_file(&path, size).await?,
+            FilesystemKind::Directory => vec![],
+            FilesystemKind::Symlink => vec![],
+            FilesystemKind::Socket => vec![],
+            FilesystemKind::Fifo => vec![],
+        };
+        info!("upload OK for {:?}", path);
+        Ok(ids)
+    }
+
+    pub async fn upload_generation(
+        &self,
+        filename: &Path,
+        size: usize,
+    ) -> Result<ChunkId, BackupError> {
+        info!("upload SQLite {}", filename.display());
+        let ids = self.read_file(filename, size).await?;
+        let gen = GenerationChunk::new(ids);
+        let data = gen.to_data_chunk(&current_timestamp())?;
+        let gen_id = self.client.upload_chunk(data).await?;
+        info!("uploaded generation {}", gen_id);
+        Ok(gen_id)
+    }
+
+    async fn read_file(&self, filename: &Path, size: usize) -> Result<Vec<ChunkId>, BackupError> {
+        info!("upload file {}", filename.display());
+        let file = std::fs::File::open(filename)
+            .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?;
+        let chunker = Chunker::new(size, file, filename);
+        let chunk_ids = self.upload_new_file_chunks(chunker).await?;
+        Ok(chunk_ids)
+    }
+
+    pub async fn upload_new_file_chunks(
+        &self,
+        chunker: Chunker,
+    ) -> Result<Vec<ChunkId>, BackupError> {
+        let mut chunk_ids = vec![];
+        for item in chunker {
+            let chunk = item?;
+            if let Some(chunk_id) = self.client.has_chunk(chunk.meta()).await? {
+                chunk_ids.push(chunk_id.clone());
+                info!("reusing existing chunk {}", chunk_id);
+            } else {
+                let chunk_id = self.client.upload_chunk(chunk).await?;
+                chunk_ids.push(chunk_id.clone());
+                info!("created new chunk {}", chunk_id);
+            }
+        }
+
+        Ok(chunk_ids)
+    }
+
+    async fn upload_nascent_generation(&self, filename: &Path) -> Result<ChunkId, ObnamError> {
+        let progress = BackupProgress::upload_generation();
+        let gen_id = self.upload_generation(filename, SQLITE_CHUNK_SIZE).await?;
+        progress.finish();
+        Ok(gen_id)
+    }
+
     fn found_live_file(&self, path: &Path) {
         if let Some(progress) = &self.progress {
             progress.found_live_file(path);
@@ -277,3 +369,8 @@
         }
     }
 }
+
+fn current_timestamp() -> String {
+    let now: DateTime<Local> = Local::now();
+    format!("{}", now.format("%Y-%m-%d %H:%M:%S.%f %z"))
+}
diff --git a/src/client.rs b/src/client.rs
index 9fddc18..5451dfb 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,15 +1,11 @@
-use crate::chunk::DataChunk;
-use crate::chunk::{GenerationChunk, GenerationChunkError};
-use crate::chunker::{Chunker, ChunkerError};
+use crate::chunk::{DataChunk, GenerationChunk, GenerationChunkError};
 use crate::chunkid::ChunkId;
 use crate::chunkmeta::ChunkMeta;
 use crate::cipher::{CipherEngine, CipherError};
 use crate::config::{ClientConfig, ClientConfigError};
-use crate::fsentry::{FilesystemEntry, FilesystemKind};
 use crate::generation::{FinishedGeneration, GenId, LocalGeneration, LocalGenerationError};
 use crate::genlist::GenerationList;
 
-use chrono::{DateTime, Local};
 use log::{debug, error, info};
 use reqwest::header::HeaderMap;
 use std::collections::HashMap;
@@ -49,9 +45,6 @@
     #[error(transparent)]
     LocalGenerationError(#[from] LocalGenerationError),
 
-    #[error(transparent)]
-    ChunkerError(#[from] ChunkerError),
-
     #[error("couldn't convert response chunk-meta header to string: {0}")]
     MetaHeaderToString(reqwest::header::ToStrError),
 
@@ -91,48 +84,6 @@ impl AsyncBackupClient {
             chunk_client: AsyncChunkClient::new(config)?,
         })
     }
-
-    pub async fn upload_filesystem_entry(
-        &self,
-        e: &FilesystemEntry,
-        size: usize,
-    ) -> Result<Vec<ChunkId>, ClientError> {
-        let path = e.pathbuf();
-        info!("uploading {:?}", path);
-        let ids = match e.kind() {
-            FilesystemKind::Regular => self.read_file(&path, size).await?,
-            FilesystemKind::Directory => vec![],
-            FilesystemKind::Symlink => vec![],
-            FilesystemKind::Socket => vec![],
-            FilesystemKind::Fifo => vec![],
-        };
-        info!("upload OK for {:?}", path);
-        Ok(ids)
-    }
-
-    pub async fn upload_generation(
-        &self,
-        filename: &Path,
-        size: usize,
-    ) -> Result<ChunkId, ClientError> {
-        info!("upload SQLite {}", filename.display());
-        let ids = self.read_file(filename, size).await?;
-        let gen = GenerationChunk::new(ids);
-        let data = gen.to_data_chunk(&current_timestamp())?;
-        let gen_id = self.upload_chunk(data).await?;
-        info!("uploaded generation {}", gen_id);
-        Ok(gen_id)
-    }
-
-    async fn read_file(&self, filename: &Path, size: usize) -> Result<Vec<ChunkId>, ClientError> {
-        info!("upload file {}", filename.display());
-        let file = std::fs::File::open(filename)
-            .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?;
-        let chunker = Chunker::new(size, file, filename);
-        let chunk_ids = self.upload_new_file_chunks(chunker).await?;
-        Ok(chunk_ids)
-    }
-
     pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> {
         self.chunk_client.has_chunk(meta).await
     }
@@ -141,26 +92,6 @@ impl AsyncBackupClient {
         self.chunk_client.upload_chunk(chunk).await
     }
 
-    pub async fn upload_new_file_chunks(
-        &self,
-        chunker: Chunker,
-    ) -> Result<Vec<ChunkId>, ClientError> {
-        let mut chunk_ids = vec![];
-        for item in chunker {
-            let chunk = item?;
-            if let Some(chunk_id) = self.has_chunk(chunk.meta()).await? {
-                chunk_ids.push(chunk_id.clone());
-                info!("reusing existing chunk {}", chunk_id);
-            } else {
-                let chunk_id = self.upload_chunk(chunk).await?;
-                chunk_ids.push(chunk_id.clone());
-                info!("created new chunk {}", chunk_id);
-            }
-        }
-
-        Ok(chunk_ids)
-    }
-
     pub async fn list_generations(&self) -> Result<GenerationList, ClientError> {
         self.chunk_client.list_generations().await
     }
@@ -347,8 +278,3 @@ impl AsyncChunkClient {
         Ok(meta)
     }
 }
-
-fn current_timestamp() -> String {
-    let now: DateTime<Local> = Local::now();
-    format!("{}", now.format("%Y-%m-%d %H:%M:%S.%f %z"))
-}
diff --git a/src/cmd/backup.rs b/src/cmd/backup.rs
index dc03ddf..8f3d6d5 100644
--- a/src/cmd/backup.rs
+++ b/src/cmd/backup.rs
@@ -1,19 +1,15 @@
-use crate::backup_progress::BackupProgress;
 use crate::backup_run::BackupRun;
-use crate::chunkid::ChunkId;
 use crate::client::AsyncBackupClient;
 use crate::config::ClientConfig;
 use crate::error::ObnamError;
-use bytesize::MIB;
+use crate::generation::GenId;
+
 use log::info;
-use std::path::Path;
 use std::time::SystemTime;
 use structopt::StructOpt;
 use tempfile::NamedTempFile;
 use tokio::runtime::Runtime;
 
-const SQLITE_CHUNK_SIZE: usize = MIB as usize;
-
 #[derive(Debug, StructOpt)]
 pub struct Backup {}
 
@@ -47,8 +43,6 @@ impl Backup {
             }
         };
 
-        let gen_id = upload_nascent_generation(&client, newtemp.path()).await?;
-
         for w in outcome.warnings.iter() {
             println!("warning: {}", w);
         }
@@ -64,7 +58,7 @@
         report_stats(
             &runtime,
             outcome.files_count,
-            &gen_id,
+            &outcome.gen_id,
             outcome.warnings.len(),
         )?;
 
@@ -79,7 +73,7 @@
 fn report_stats(
     runtime: &SystemTime,
     file_count: i64,
-    gen_id: &ChunkId,
+    gen_id: &GenId,
     num_warnings: usize,
 ) -> Result<(), ObnamError> {
     println!("status: OK");
@@ -89,15 +83,3 @@
     println!("generation-id: {}", gen_id);
     Ok(())
 }
-
-async fn upload_nascent_generation(
-    client: &AsyncBackupClient,
-    filename: &Path,
-) -> Result<ChunkId, ObnamError> {
-    let progress = BackupProgress::upload_generation();
-    let gen_id = client
-        .upload_generation(filename, SQLITE_CHUNK_SIZE)
-        .await?;
-    progress.finish();
-    Ok(gen_id)
-}
--
cgit v1.2.1
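
The most interesting part of the moved code is the deduplication loop
in `upload_new_file_chunks`: before uploading a chunk, the client asks
the server via `has_chunk` whether a chunk with the same metadata is
already stored, and if so reuses the existing chunk's id instead of
sending the data again. The sketch below shows that pattern in
isolation, as ordinary synchronous Rust; `ChunkStore`, the integer
ids, and the checksum-keyed map are illustrative stand-ins, not
obnam's actual types or API.

    use std::collections::HashMap;

    // A chunk's metadata; deduplication is keyed on a checksum like this.
    #[derive(Clone, PartialEq, Eq, Hash)]
    struct ChunkMeta {
        sha256: String,
    }

    struct Chunk {
        meta: ChunkMeta,
        data: Vec<u8>,
    }

    // Stand-in for the chunk server: remembers which metadata it has seen
    // and which id the corresponding chunk was stored under.
    #[derive(Default)]
    struct ChunkStore {
        by_meta: HashMap<ChunkMeta, u64>,
        blobs: HashMap<u64, Vec<u8>>,
        next_id: u64,
    }

    impl ChunkStore {
        // Like AsyncBackupClient::has_chunk: the id of a stored chunk
        // with this metadata, if there is one.
        fn has_chunk(&self, meta: &ChunkMeta) -> Option<u64> {
            self.by_meta.get(meta).copied()
        }

        // Like AsyncBackupClient::upload_chunk: store the chunk, get an id.
        fn upload_chunk(&mut self, chunk: Chunk) -> u64 {
            let id = self.next_id;
            self.next_id += 1;
            self.by_meta.insert(chunk.meta, id);
            self.blobs.insert(id, chunk.data);
            id
        }
    }

    // The pattern from BackupRun::upload_new_file_chunks: reuse ids for
    // chunks the server already has, upload only the rest. The ids, in
    // order, are enough to reassemble the file later.
    fn upload_new_file_chunks(
        store: &mut ChunkStore,
        chunks: impl Iterator<Item = Chunk>,
    ) -> Vec<u64> {
        let mut chunk_ids = vec![];
        for chunk in chunks {
            if let Some(id) = store.has_chunk(&chunk.meta) {
                chunk_ids.push(id); // dedup hit: no data sent
            } else {
                chunk_ids.push(store.upload_chunk(chunk));
            }
        }
        chunk_ids
    }

    fn main() {
        let mut store = ChunkStore::default();
        let chunk = |s: &str| Chunk {
            meta: ChunkMeta { sha256: s.into() },
            data: s.as_bytes().to_vec(),
        };
        // Two files sharing a chunk: the shared chunk is uploaded once.
        let a = upload_new_file_chunks(&mut store, vec![chunk("aa"), chunk("bb")].into_iter());
        let b = upload_new_file_chunks(&mut store, vec![chunk("bb"), chunk("cc")].into_iter());
        assert_eq!(a[1], b[0]);
        println!("file A chunks: {:?}, file B chunks: {:?}", a, b);
    }

Because the id lists come back in file order, a generation only needs
each file's list of chunk ids to reassemble it, and a chunk shared by
several files, or by successive backups, is uploaded and stored once.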