diff options
Diffstat (limited to 'src/backup_run.rs')
-rw-r--r-- | src/backup_run.rs | 109 |
1 files changed, 103 insertions, 6 deletions
diff --git a/src/backup_run.rs b/src/backup_run.rs index 02d20ee..1bb0bc0 100644 --- a/src/backup_run.rs +++ b/src/backup_run.rs @@ -1,18 +1,25 @@ use crate::backup_progress::BackupProgress; use crate::backup_reason::Reason; +use crate::chunk::{GenerationChunk, GenerationChunkError}; +use crate::chunker::{Chunker, ChunkerError}; use crate::chunkid::ChunkId; use crate::client::{AsyncBackupClient, ClientError}; use crate::config::ClientConfig; use crate::error::ObnamError; -use crate::fsentry::FilesystemEntry; +use crate::fsentry::{FilesystemEntry, FilesystemKind}; use crate::fsiter::{AnnotatedFsEntry, FsIterError, FsIterator}; use crate::generation::{ GenId, LocalGeneration, LocalGenerationError, NascentError, NascentGeneration, }; use crate::policy::BackupPolicy; -use log::{debug, info, warn}; + +use bytesize::MIB; +use chrono::{DateTime, Local}; +use log::{debug, error, info, warn}; use std::path::{Path, PathBuf}; +const SQLITE_CHUNK_SIZE: usize = MIB as usize; + pub struct BackupRun<'a> { client: &'a AsyncBackupClient, policy: BackupPolicy, @@ -33,6 +40,12 @@ pub enum BackupError { #[error(transparent)] LocalGenerationError(#[from] LocalGenerationError), + + #[error(transparent)] + ChunkerError(#[from] ChunkerError), + + #[error(transparent)] + GenerationChunkError(#[from] GenerationChunkError), } #[derive(Debug)] @@ -44,6 +57,13 @@ pub struct FsEntryBackupOutcome { } #[derive(Debug)] +struct OneRootBackupOutcome { + pub files_count: i64, + pub warnings: Vec<BackupError>, + pub new_cachedir_tags: Vec<PathBuf>, +} + +#[derive(Debug)] pub struct RootsBackupOutcome { /// The number of backed up files. pub files_count: i64, @@ -51,6 +71,8 @@ pub struct RootsBackupOutcome { pub warnings: Vec<BackupError>, /// CACHEDIR.TAG files that aren't present in in a previous generation. pub new_cachedir_tags: Vec<PathBuf>, + /// Id of new generation. + pub gen_id: GenId, } impl<'a> BackupRun<'a> { @@ -125,7 +147,7 @@ impl<'a> BackupRun<'a> { config: &ClientConfig, old: &LocalGeneration, newpath: &Path, - ) -> Result<RootsBackupOutcome, NascentError> { + ) -> Result<RootsBackupOutcome, ObnamError> { let mut warnings: Vec<BackupError> = vec![]; let mut new_cachedir_tags = vec![]; let files_count = { @@ -152,10 +174,13 @@ impl<'a> BackupRun<'a> { new.file_count() }; self.finish(); + let gen_id = self.upload_nascent_generation(newpath).await?; + let gen_id = GenId::from_chunk_id(gen_id); Ok(RootsBackupOutcome { files_count, warnings, new_cachedir_tags, + gen_id, }) } @@ -165,7 +190,7 @@ impl<'a> BackupRun<'a> { old: &LocalGeneration, new: &mut NascentGeneration, root: &Path, - ) -> Result<RootsBackupOutcome, NascentError> { + ) -> Result<OneRootBackupOutcome, NascentError> { let mut warnings: Vec<BackupError> = vec![]; let mut new_cachedir_tags = vec![]; let iter = FsIterator::new(root, config.exclude_cache_tag_directories); @@ -195,7 +220,7 @@ impl<'a> BackupRun<'a> { } } - Ok(RootsBackupOutcome { + Ok(OneRootBackupOutcome { files_count: 0, // Caller will get file count from new. warnings, new_cachedir_tags, @@ -243,7 +268,6 @@ impl<'a> BackupRun<'a> { reason: Reason, ) -> FsEntryBackupOutcome { let ids = self - .client .upload_filesystem_entry(&entry.inner, self.buffer_size) .await; match ids { @@ -265,6 +289,74 @@ impl<'a> BackupRun<'a> { } } + pub async fn upload_filesystem_entry( + &self, + e: &FilesystemEntry, + size: usize, + ) -> Result<Vec<ChunkId>, BackupError> { + let path = e.pathbuf(); + info!("uploading {:?}", path); + let ids = match e.kind() { + FilesystemKind::Regular => self.read_file(&path, size).await?, + FilesystemKind::Directory => vec![], + FilesystemKind::Symlink => vec![], + FilesystemKind::Socket => vec![], + FilesystemKind::Fifo => vec![], + }; + info!("upload OK for {:?}", path); + Ok(ids) + } + + pub async fn upload_generation( + &self, + filename: &Path, + size: usize, + ) -> Result<ChunkId, BackupError> { + info!("upload SQLite {}", filename.display()); + let ids = self.read_file(filename, size).await?; + let gen = GenerationChunk::new(ids); + let data = gen.to_data_chunk(¤t_timestamp())?; + let gen_id = self.client.upload_chunk(data).await?; + info!("uploaded generation {}", gen_id); + Ok(gen_id) + } + + async fn read_file(&self, filename: &Path, size: usize) -> Result<Vec<ChunkId>, BackupError> { + info!("upload file {}", filename.display()); + let file = std::fs::File::open(filename) + .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?; + let chunker = Chunker::new(size, file, filename); + let chunk_ids = self.upload_new_file_chunks(chunker).await?; + Ok(chunk_ids) + } + + pub async fn upload_new_file_chunks( + &self, + chunker: Chunker, + ) -> Result<Vec<ChunkId>, BackupError> { + let mut chunk_ids = vec![]; + for item in chunker { + let chunk = item?; + if let Some(chunk_id) = self.client.has_chunk(chunk.meta()).await? { + chunk_ids.push(chunk_id.clone()); + info!("reusing existing chunk {}", chunk_id); + } else { + let chunk_id = self.client.upload_chunk(chunk).await?; + chunk_ids.push(chunk_id.clone()); + info!("created new chunk {}", chunk_id); + } + } + + Ok(chunk_ids) + } + + async fn upload_nascent_generation(&self, filename: &Path) -> Result<ChunkId, ObnamError> { + let progress = BackupProgress::upload_generation(); + let gen_id = self.upload_generation(filename, SQLITE_CHUNK_SIZE).await?; + progress.finish(); + Ok(gen_id) + } + fn found_live_file(&self, path: &Path) { if let Some(progress) = &self.progress { progress.found_live_file(path); @@ -277,3 +369,8 @@ impl<'a> BackupRun<'a> { } } } + +fn current_timestamp() -> String { + let now: DateTime<Local> = Local::now(); + format!("{}", now.format("%Y-%m-%d %H:%M:%S.%f %z")) +} |