summaryrefslogtreecommitdiff
path: root/src/backup_run.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/backup_run.rs')
-rw-r--r--src/backup_run.rs109
1 files changed, 103 insertions, 6 deletions
diff --git a/src/backup_run.rs b/src/backup_run.rs
index 02d20ee..1bb0bc0 100644
--- a/src/backup_run.rs
+++ b/src/backup_run.rs
@@ -1,18 +1,25 @@
use crate::backup_progress::BackupProgress;
use crate::backup_reason::Reason;
+use crate::chunk::{GenerationChunk, GenerationChunkError};
+use crate::chunker::{Chunker, ChunkerError};
use crate::chunkid::ChunkId;
use crate::client::{AsyncBackupClient, ClientError};
use crate::config::ClientConfig;
use crate::error::ObnamError;
-use crate::fsentry::FilesystemEntry;
+use crate::fsentry::{FilesystemEntry, FilesystemKind};
use crate::fsiter::{AnnotatedFsEntry, FsIterError, FsIterator};
use crate::generation::{
GenId, LocalGeneration, LocalGenerationError, NascentError, NascentGeneration,
};
use crate::policy::BackupPolicy;
-use log::{debug, info, warn};
+
+use bytesize::MIB;
+use chrono::{DateTime, Local};
+use log::{debug, error, info, warn};
use std::path::{Path, PathBuf};
+const SQLITE_CHUNK_SIZE: usize = MIB as usize;
+
pub struct BackupRun<'a> {
client: &'a AsyncBackupClient,
policy: BackupPolicy,
@@ -33,6 +40,12 @@ pub enum BackupError {
#[error(transparent)]
LocalGenerationError(#[from] LocalGenerationError),
+
+ #[error(transparent)]
+ ChunkerError(#[from] ChunkerError),
+
+ #[error(transparent)]
+ GenerationChunkError(#[from] GenerationChunkError),
}
#[derive(Debug)]
@@ -44,6 +57,13 @@ pub struct FsEntryBackupOutcome {
}
#[derive(Debug)]
+struct OneRootBackupOutcome {
+ pub files_count: i64,
+ pub warnings: Vec<BackupError>,
+ pub new_cachedir_tags: Vec<PathBuf>,
+}
+
+#[derive(Debug)]
pub struct RootsBackupOutcome {
/// The number of backed up files.
pub files_count: i64,
@@ -51,6 +71,8 @@ pub struct RootsBackupOutcome {
pub warnings: Vec<BackupError>,
/// CACHEDIR.TAG files that aren't present in in a previous generation.
pub new_cachedir_tags: Vec<PathBuf>,
+ /// Id of new generation.
+ pub gen_id: GenId,
}
impl<'a> BackupRun<'a> {
@@ -125,7 +147,7 @@ impl<'a> BackupRun<'a> {
config: &ClientConfig,
old: &LocalGeneration,
newpath: &Path,
- ) -> Result<RootsBackupOutcome, NascentError> {
+ ) -> Result<RootsBackupOutcome, ObnamError> {
let mut warnings: Vec<BackupError> = vec![];
let mut new_cachedir_tags = vec![];
let files_count = {
@@ -152,10 +174,13 @@ impl<'a> BackupRun<'a> {
new.file_count()
};
self.finish();
+ let gen_id = self.upload_nascent_generation(newpath).await?;
+ let gen_id = GenId::from_chunk_id(gen_id);
Ok(RootsBackupOutcome {
files_count,
warnings,
new_cachedir_tags,
+ gen_id,
})
}
@@ -165,7 +190,7 @@ impl<'a> BackupRun<'a> {
old: &LocalGeneration,
new: &mut NascentGeneration,
root: &Path,
- ) -> Result<RootsBackupOutcome, NascentError> {
+ ) -> Result<OneRootBackupOutcome, NascentError> {
let mut warnings: Vec<BackupError> = vec![];
let mut new_cachedir_tags = vec![];
let iter = FsIterator::new(root, config.exclude_cache_tag_directories);
@@ -195,7 +220,7 @@ impl<'a> BackupRun<'a> {
}
}
- Ok(RootsBackupOutcome {
+ Ok(OneRootBackupOutcome {
files_count: 0, // Caller will get file count from new.
warnings,
new_cachedir_tags,
@@ -243,7 +268,6 @@ impl<'a> BackupRun<'a> {
reason: Reason,
) -> FsEntryBackupOutcome {
let ids = self
- .client
.upload_filesystem_entry(&entry.inner, self.buffer_size)
.await;
match ids {
@@ -265,6 +289,74 @@ impl<'a> BackupRun<'a> {
}
}
+ pub async fn upload_filesystem_entry(
+ &self,
+ e: &FilesystemEntry,
+ size: usize,
+ ) -> Result<Vec<ChunkId>, BackupError> {
+ let path = e.pathbuf();
+ info!("uploading {:?}", path);
+ let ids = match e.kind() {
+ FilesystemKind::Regular => self.read_file(&path, size).await?,
+ FilesystemKind::Directory => vec![],
+ FilesystemKind::Symlink => vec![],
+ FilesystemKind::Socket => vec![],
+ FilesystemKind::Fifo => vec![],
+ };
+ info!("upload OK for {:?}", path);
+ Ok(ids)
+ }
+
+ pub async fn upload_generation(
+ &self,
+ filename: &Path,
+ size: usize,
+ ) -> Result<ChunkId, BackupError> {
+ info!("upload SQLite {}", filename.display());
+ let ids = self.read_file(filename, size).await?;
+ let gen = GenerationChunk::new(ids);
+ let data = gen.to_data_chunk(&current_timestamp())?;
+ let gen_id = self.client.upload_chunk(data).await?;
+ info!("uploaded generation {}", gen_id);
+ Ok(gen_id)
+ }
+
+ async fn read_file(&self, filename: &Path, size: usize) -> Result<Vec<ChunkId>, BackupError> {
+ info!("upload file {}", filename.display());
+ let file = std::fs::File::open(filename)
+ .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?;
+ let chunker = Chunker::new(size, file, filename);
+ let chunk_ids = self.upload_new_file_chunks(chunker).await?;
+ Ok(chunk_ids)
+ }
+
+ pub async fn upload_new_file_chunks(
+ &self,
+ chunker: Chunker,
+ ) -> Result<Vec<ChunkId>, BackupError> {
+ let mut chunk_ids = vec![];
+ for item in chunker {
+ let chunk = item?;
+ if let Some(chunk_id) = self.client.has_chunk(chunk.meta()).await? {
+ chunk_ids.push(chunk_id.clone());
+ info!("reusing existing chunk {}", chunk_id);
+ } else {
+ let chunk_id = self.client.upload_chunk(chunk).await?;
+ chunk_ids.push(chunk_id.clone());
+ info!("created new chunk {}", chunk_id);
+ }
+ }
+
+ Ok(chunk_ids)
+ }
+
+ async fn upload_nascent_generation(&self, filename: &Path) -> Result<ChunkId, ObnamError> {
+ let progress = BackupProgress::upload_generation();
+ let gen_id = self.upload_generation(filename, SQLITE_CHUNK_SIZE).await?;
+ progress.finish();
+ Ok(gen_id)
+ }
+
fn found_live_file(&self, path: &Path) {
if let Some(progress) = &self.progress {
progress.found_live_file(path);
@@ -277,3 +369,8 @@ impl<'a> BackupRun<'a> {
}
}
}
+
+fn current_timestamp() -> String {
+ let now: DateTime<Local> = Local::now();
+ format!("{}", now.format("%Y-%m-%d %H:%M:%S.%f %z"))
+}