Diffstat (limited to 'src')
55 files changed, 5942 insertions, 1393 deletions
diff --git a/src/accumulated_time.rs b/src/accumulated_time.rs new file mode 100644 index 0000000..cdf34b2 --- /dev/null +++ b/src/accumulated_time.rs @@ -0,0 +1,79 @@ +//! Measure accumulated time for various operations. + +use std::collections::HashMap; +use std::hash::Hash; +use std::sync::Mutex; +use std::time::Instant; + +/// Accumulated times for different clocks. +/// +/// The caller defines a clock type, usually an enum. +/// `AccumulatedTime` accumulates time for each possible clock. +/// Conceptually, every type of clock exists. If a type of clock +/// doesn't ever get created, it measures at 0 accumulated time. +#[derive(Debug)] +pub struct AccumulatedTime<T> { + accumulated: Mutex<HashMap<T, ClockTime>>, +} + +#[derive(Debug, Default)] +struct ClockTime { + nanos: u128, + started: Option<Instant>, +} + +impl<T: Eq + PartialEq + Hash + Copy> AccumulatedTime<T> { + /// Create a new accumulated time collector. + #[allow(clippy::new_without_default)] + pub fn new() -> Self { + Self { + accumulated: Mutex::new(HashMap::new()), + } + } + + /// Start a new clock of a given type to measure a span of time. + /// + /// The clock's measured time is added to the accumulator when the + /// clock is stopped. + pub fn start(&mut self, clock: T) { + let mut map = self.accumulated.lock().unwrap(); + let ct = map.entry(clock).or_default(); + assert!(ct.started.is_none()); + ct.started = Some(Instant::now()); + } + + /// Stop a running clock. + /// + /// Its run time is added to the accumulated time for that kind of clock. + pub fn stop(&mut self, clock: T) { + let mut map = self.accumulated.lock().unwrap(); + if let Some(ct) = map.get_mut(&clock) { + assert!(ct.started.is_some()); + if let Some(started) = ct.started.take() { + ct.nanos += started.elapsed().as_nanos(); + ct.started = None; + } + } + } + + /// Return the accumulated time for a type of clock, as whole seconds. + pub fn secs(&self, clock: T) -> u128 { + self.nanos(clock) / 1_000_000_000u128 + } + + /// Return the accumulated time for a type of clock, as nanoseconds. + /// + /// This includes the time spent in a currently running clock. + pub fn nanos(&self, clock: T) -> u128 { + let map = self.accumulated.lock().unwrap(); + if let Some(ct) = map.get(&clock) { + if let Some(started) = ct.started { + ct.nanos + started.elapsed().as_nanos() + } else { + ct.nanos + } + } else { + 0 + } + } +} diff --git a/src/backup_progress.rs b/src/backup_progress.rs index 6c1d3e6..e3995f0 100644 --- a/src/backup_progress.rs +++ b/src/backup_progress.rs @@ -1,44 +1,133 @@ +//! Progress bars for Obnam. + +use crate::generation::GenId; use indicatif::{ProgressBar, ProgressStyle}; -use std::path::Path; +use std::{path::Path, time::Duration}; + +const SHOW_PROGRESS: bool = true; +/// A progress bar abstraction specific to backups. +/// +/// The progress bar is different for initial and incremental backups, +/// and for different phases of making a backup. pub struct BackupProgress { progress: ProgressBar, } impl BackupProgress { - pub fn new() -> Self { - let progress = if true { + /// Create a progress bar for an initial backup. 
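The new accumulated_time module above is self-contained enough to sketch its intended use. A hypothetical example (the Clock enum and the obnam::accumulated_time path are assumptions about how the crate exports the module, not part of the diff):

    use obnam::accumulated_time::AccumulatedTime;

    // Caller-defined clock type, as the module documentation suggests.
    #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
    enum Clock {
        Download,
        Upload,
    }

    fn example() {
        let mut time = AccumulatedTime::<Clock>::new();
        time.start(Clock::Download);
        // ... do the work being measured ...
        time.stop(Clock::Download);
        // A clock that was never started reports zero accumulated time.
        assert_eq!(time.nanos(Clock::Upload), 0);
        println!("download: {} s", time.secs(Clock::Download));
    }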
+ pub fn initial() -> Self { + let progress = if SHOW_PROGRESS { ProgressBar::new(0) } else { ProgressBar::hidden() }; - let parts = vec![ + let parts = [ + "initial backup", + "elapsed: {elapsed}", + "files: {pos}", + "current: {wide_msg}", + "{spinner}", + ]; + progress.set_style( + ProgressStyle::default_bar() + .template(&parts.join("\n")) + .expect("create indicatif ProgressStyle value"), + ); + progress.enable_steady_tick(Duration::from_millis(100)); + + Self { progress } + } + + /// Create a progress bar for an incremental backup. + pub fn incremental() -> Self { + let progress = if SHOW_PROGRESS { + ProgressBar::new(0) + } else { + ProgressBar::hidden() + }; + let parts = [ + "incremental backup", "{wide_bar}", "elapsed: {elapsed}", "files: {pos}/{len}", "current: {wide_msg}", "{spinner}", ]; - progress.set_style(ProgressStyle::default_bar().template(&parts.join("\n"))); - progress.enable_steady_tick(100); + progress.set_style( + ProgressStyle::default_bar() + .template(&parts.join("\n")) + .expect("create indicatif ProgressStyle value"), + ); + progress.enable_steady_tick(Duration::from_millis(100)); + + Self { progress } + } + + /// Create a progress bar for uploading a new generation's metadata. + pub fn upload_generation() -> Self { + let progress = ProgressBar::new(0); + let parts = [ + "uploading new generation metadata", + "elapsed: {elapsed}", + "{spinner}", + ]; + progress.set_style( + ProgressStyle::default_bar() + .template(&parts.join("\n")) + .expect("create indicatif ProgressStyle value"), + ); + progress.enable_steady_tick(Duration::from_millis(100)); + + Self { progress } + } + + /// Create a progress bar for downloading an existing generation's + /// metadata. + pub fn download_generation(gen_id: &GenId) -> Self { + let progress = ProgressBar::new(0); + let parts = ["{msg}", "elapsed: {elapsed}", "{spinner}"]; + progress.set_style( + ProgressStyle::default_bar() + .template(&parts.join("\n")) + .expect("create indicatif ProgressStyle value"), + ); + progress.enable_steady_tick(Duration::from_millis(100)); + progress.set_message(format!( + "downloading previous generation metadata: {}", + gen_id + )); Self { progress } } + /// Set the number of files that were in the previous generation. + /// + /// The new generation usually has about the same number of files, + /// so the progress bar can show progress for incremental backups + /// without having to count all the files that actually exist first. pub fn files_in_previous_generation(&self, count: u64) { self.progress.set_length(count); } + /// Update progress bar about number of problems found during a backup. pub fn found_problem(&self) { self.progress.inc(1); } + /// Update progress bar about number of actual files found. pub fn found_live_file(&self, filename: &Path) { self.progress.inc(1); - self.progress - .set_message(&format!("{}", filename.display())); + if self.progress.length() < Some(self.progress.position()) { + self.progress.set_length(self.progress.position()); + } + self.progress.set_message(format!("{}", filename.display())); } + /// Tell progress bar it's finished. + /// + /// This will remove all traces of the progress bar from the + /// screen. pub fn finish(&self) { self.progress.set_length(self.progress.position()); self.progress.finish_and_clear(); diff --git a/src/backup_reason.rs b/src/backup_reason.rs index 218857c..9a17d80 100644 --- a/src/backup_reason.rs +++ b/src/backup_reason.rs @@ -1,29 +1,54 @@ +//! Why was a file backed up? 
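The backup_progress changes above track a newer indicatif API: ProgressStyle::template() now returns a Result (hence the .expect(...) calls) and enable_steady_tick() takes a Duration rather than a millisecond count. A minimal standalone sketch of the same pattern:

    use indicatif::{ProgressBar, ProgressStyle};
    use std::time::Duration;

    fn spinner() -> ProgressBar {
        let bar = ProgressBar::new(0);
        bar.set_style(
            ProgressStyle::default_bar()
                .template("elapsed: {elapsed}\n{spinner}")
                .expect("template strings are now checked at runtime"),
        );
        bar.enable_steady_tick(Duration::from_millis(100));
        bar
    }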
+ use rusqlite::types::ToSqlOutput; use rusqlite::ToSql; use std::fmt; +/// Represent the reason a file is in a backup. #[derive(Debug, Copy, Clone)] pub enum Reason { + /// File was skipped due to policy, but carried over without + /// changes. Skipped, + /// File is new, compared to previous backup. IsNew, + /// File has been changed, compared to previous backup. Changed, + /// File has not been changed, compared to previous backup. Unchanged, - Error, + /// There was an error looking up the file in the previous backup. + /// + /// File has been carried over without changes. + GenerationLookupError, + /// There was an error backing up the file. + /// + /// File has been carried over without changes. + FileError, + /// Reason is unknown. + /// + /// The previous backup had a reason that the current version of + /// Obnam doesn't recognize. The file has been carried over + /// without changes. + Unknown, } impl Reason { - pub fn from_str(text: &str) -> Reason { + /// Create a Reason from a string representation. + pub fn from(text: &str) -> Reason { match text { "skipped" => Reason::Skipped, "new" => Reason::IsNew, "changed" => Reason::Changed, "unchanged" => Reason::Unchanged, - _ => Reason::Error, + "genlookuperror" => Reason::GenerationLookupError, + "fileerror" => Reason::FileError, + _ => Reason::Unknown, } } } impl ToSql for Reason { + /// Represent Reason as text for SQL. fn to_sql(&self) -> rusqlite::Result<ToSqlOutput> { Ok(ToSqlOutput::Owned(rusqlite::types::Value::Text(format!( "{}", @@ -33,13 +58,16 @@ impl ToSql for Reason { } impl fmt::Display for Reason { + /// Represent Reason for display. fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let reason = match self { Reason::Skipped => "skipped", Reason::IsNew => "new", Reason::Changed => "changed", Reason::Unchanged => "unchanged", - Reason::Error => "error", + Reason::GenerationLookupError => "genlookuperror", + Reason::FileError => "fileerror", + Reason::Unknown => "unknown", }; write!(f, "{}", reason) } diff --git a/src/backup_run.rs b/src/backup_run.rs index e3bfc5a..372ef65 100644 --- a/src/backup_run.rs +++ b/src/backup_run.rs @@ -1,92 +1,437 @@ +//! Run one backup. + use crate::backup_progress::BackupProgress; use crate::backup_reason::Reason; +use crate::chunk::{GenerationChunk, GenerationChunkError}; +use crate::chunker::{ChunkerError, FileChunks}; use crate::chunkid::ChunkId; -use crate::client::{BackupClient, ClientConfig}; -use crate::fsentry::FilesystemEntry; -use crate::generation::LocalGeneration; +use crate::client::{BackupClient, ClientError}; +use crate::config::ClientConfig; +use crate::db::DatabaseError; +use crate::dbgen::{schema_version, FileId, DEFAULT_SCHEMA_MAJOR}; +use crate::error::ObnamError; +use crate::fsentry::{FilesystemEntry, FilesystemKind}; +use crate::fsiter::{AnnotatedFsEntry, FsIterError, FsIterator}; +use crate::generation::{ + GenId, LocalGeneration, LocalGenerationError, NascentError, NascentGeneration, +}; +use crate::label::LabelChecksumKind; +use crate::performance::{Clock, Performance}; use crate::policy::BackupPolicy; -use log::{info, warn}; +use crate::schema::SchemaVersion; + +use bytesize::MIB; +use chrono::{DateTime, Local}; +use log::{debug, error, info, warn}; +use std::path::{Path, PathBuf}; + +const DEFAULT_CHECKSUM_KIND: LabelChecksumKind = LabelChecksumKind::Sha256; +const SQLITE_CHUNK_SIZE: usize = MIB as usize; -pub struct BackupRun { - client: BackupClient, +/// A running backup.
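The renamed Reason::from() and the Display impl above are deliberately inverse operations, so a reason can round-trip through its SQL text form. A hypothetical test (not part of the diff):

    use obnam::backup_reason::Reason;

    #[test]
    fn reason_survives_text_round_trip() {
        for name in ["skipped", "new", "changed", "unchanged", "genlookuperror", "fileerror"] {
            assert_eq!(format!("{}", Reason::from(name)), name);
        }
        // Anything unrecognized maps to Reason::Unknown.
        assert_eq!(format!("{}", Reason::from("bogus")), "unknown");
    }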
+pub struct BackupRun<'a> { + checksum_kind: Option<LabelChecksumKind>, + client: &'a mut BackupClient, + policy: BackupPolicy, + buffer_size: usize, + progress: Option<BackupProgress>, +} + +/// Possible errors that can occur during a backup. +#[derive(Debug, thiserror::Error)] +pub enum BackupError { + /// An error from communicating with the server. + #[error(transparent)] + ClientError(#[from] ClientError), + + /// An error iterating over a directory tree. + #[error(transparent)] + FsIterError(#[from] FsIterError), + + /// An error from creating a new backup's metadata. + #[error(transparent)] + NascentError(#[from] NascentError), + + /// An error using an existing backup's metadata. + #[error(transparent)] + LocalGenerationError(#[from] LocalGenerationError), + + /// An error using a Database. + #[error(transparent)] + Database(#[from] DatabaseError), + + /// An error splitting data into chunks. + #[error(transparent)] + ChunkerError(#[from] ChunkerError), + + /// An error splitting backup metadata into chunks. + #[error(transparent)] + GenerationChunkError(#[from] GenerationChunkError), +} + +/// The outcome of backing up a file system entry. +#[derive(Debug)] +pub struct FsEntryBackupOutcome { + /// The file system entry. + pub entry: FilesystemEntry, + /// The chunk identifiers for the file's content. + pub ids: Vec<ChunkId>, + /// Why this entry is added to the new backup. + pub reason: Reason, + /// Does this entry represent a cache directory? + pub is_cachedir_tag: bool, +} + +/// The outcome of backing up a backup root. +#[derive(Debug)] +struct OneRootBackupOutcome { + /// Any warnings (non-fatal errors) from backing up the backup root. + pub warnings: Vec<BackupError>, + /// New cache directories in this root. + pub new_cachedir_tags: Vec<PathBuf>, } -impl BackupRun { - pub fn new(config: &ClientConfig, buffer_size: usize) -> anyhow::Result<Self> { - let client = BackupClient::new(&config.server_url)?; - let policy = BackupPolicy::new(); - let progress = BackupProgress::new(); +/// The outcome of a backup run. +#[derive(Debug)] +pub struct RootsBackupOutcome { + /// The number of backed up files. + pub files_count: FileId, + /// The errors encountered while backing up files. + pub warnings: Vec<BackupError>, + /// CACHEDIR.TAG files that aren't present in a previous generation. + pub new_cachedir_tags: Vec<PathBuf>, + /// Id of new generation. + pub gen_id: GenId, +} + +impl<'a> BackupRun<'a> { + /// Create a new run for an initial backup. + pub fn initial( + config: &ClientConfig, + client: &'a mut BackupClient, + ) -> Result<Self, BackupError> { Ok(Self { + checksum_kind: Some(DEFAULT_CHECKSUM_KIND), client, - policy, - buffer_size, - progress, + policy: BackupPolicy::default(), + buffer_size: config.chunk_size, + progress: Some(BackupProgress::initial()), }) } - pub fn client(&self) -> &BackupClient { - &self.client + /// Create a new run for an incremental backup. + pub fn incremental( + config: &ClientConfig, + client: &'a mut BackupClient, + ) -> Result<Self, BackupError> { + Ok(Self { + checksum_kind: None, + client, + policy: BackupPolicy::default(), + buffer_size: config.chunk_size, + progress: None, + }) } - pub fn progress(&self) -> &BackupProgress { - &self.progress - } + /// Start the backup run. + pub async fn start( + &mut self, + genid: Option<&GenId>, + oldname: &Path, + perf: &mut Performance, + ) -> Result<LocalGeneration, ObnamError> { + match genid { + None => { + // Create a new, empty generation.
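    // (Editor's note, an assumption about caller discipline: in this
    // branch self.checksum_kind is expected to be Some, because a run
    // started with genid == None should come from BackupRun::initial(),
    // which sets the field to DEFAULT_CHECKSUM_KIND; that is what makes
    // the unwrap() just below safe.)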
+ let schema = schema_version(DEFAULT_SCHEMA_MAJOR).unwrap(); + NascentGeneration::create(oldname, schema, self.checksum_kind.unwrap())?.close()?; - pub fn backup_file_initially( - &self, - entry: anyhow::Result<FilesystemEntry>, - ) -> anyhow::Result<(FilesystemEntry, Vec<ChunkId>, Reason)> { - match entry { - Err(err) => Err(err.into()), - Ok(entry) => { - let path = &entry.pathbuf(); - info!("backup: {}", path.display()); - self.progress.found_live_file(path); - let ids = self - .client - .upload_filesystem_entry(&entry, self.buffer_size)?; - Ok((entry.clone(), ids, Reason::IsNew)) + // Open the newly created empty generation. + Ok(LocalGeneration::open(oldname)?) + } + Some(genid) => { + perf.start(Clock::GenerationDownload); + let old = self.fetch_previous_generation(genid, oldname).await?; + perf.stop(Clock::GenerationDownload); + + let meta = old.meta()?; + if let Some(v) = meta.get("checksum_kind") { + self.checksum_kind = Some(LabelChecksumKind::from(v)?); + } + + let progress = BackupProgress::incremental(); + progress.files_in_previous_generation(old.file_count()? as u64); + self.progress = Some(progress); + + Ok(old) } } } - pub fn backup_file_incrementally( + fn checksum_kind(&self) -> LabelChecksumKind { + self.checksum_kind.unwrap_or(LabelChecksumKind::Sha256) + } + + async fn fetch_previous_generation( &self, - entry: anyhow::Result<FilesystemEntry>, + genid: &GenId, + oldname: &Path, + ) -> Result<LocalGeneration, ObnamError> { + let progress = BackupProgress::download_generation(genid); + let old = self.client.fetch_generation(genid, oldname).await?; + progress.finish(); + Ok(old) + } + + /// Finish this backup run. + pub fn finish(&self) { + if let Some(progress) = &self.progress { + progress.finish(); + } + } + + /// Back up all the roots for this run. 
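Putting initial()/incremental() and start() together, a caller might drive a run like this. This is a sketch, not code from the diff: the temporary database filename is illustrative, and it assumes ObnamError absorbs BackupError via From, as backup_roots() below already relies on:

    use obnam::backup_run::BackupRun;
    use obnam::client::BackupClient;
    use obnam::config::ClientConfig;
    use obnam::error::ObnamError;
    use obnam::performance::Performance;
    use std::path::Path;

    async fn first_backup(
        config: &ClientConfig,
        client: &mut BackupClient,
        perf: &mut Performance,
    ) -> Result<(), ObnamError> {
        let mut run = BackupRun::initial(config, client)?;
        // No previous generation: start() creates an empty local one.
        let _old = run.start(None, Path::new("old.sqlite3"), perf).await?;
        // ... run.backup_roots(config, &_old, ...) would follow here ...
        run.finish();
        Ok(())
    }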
+ pub async fn backup_roots( + &mut self, + config: &ClientConfig, old: &LocalGeneration, - ) -> anyhow::Result<(FilesystemEntry, Vec<ChunkId>, Reason)> { - match entry { - Err(err) => { - warn!("backup: {}", err); - self.progress.found_problem(); - Err(err) + newpath: &Path, + schema: SchemaVersion, + perf: &mut Performance, + ) -> Result<RootsBackupOutcome, ObnamError> { + let mut warnings: Vec<BackupError> = vec![]; + let mut new_cachedir_tags = vec![]; + let files_count = { + let mut new = NascentGeneration::create(newpath, schema, self.checksum_kind.unwrap())?; + for root in &config.roots { + match self.backup_one_root(config, old, &mut new, root).await { + Ok(mut o) => { + new_cachedir_tags.append(&mut o.new_cachedir_tags); + if !o.warnings.is_empty() { + for err in o.warnings.iter() { + debug!("ignoring backup error {}", err); + self.found_problem(); + } + warnings.append(&mut o.warnings); + } + } + Err(err) => { + self.found_problem(); + return Err(err.into()); + } + } } - Ok(entry) => { - let path = &entry.pathbuf(); - info!("backup: {}", path.display()); - self.progress.found_live_file(path); - let reason = self.policy.needs_backup(&old, &entry); - match reason { - Reason::IsNew | Reason::Changed | Reason::Error => { - let ids = self - .client - .upload_filesystem_entry(&entry, self.buffer_size)?; - Ok((entry.clone(), ids, reason)) + let count = new.file_count(); + new.close()?; + count + }; + self.finish(); + perf.start(Clock::GenerationUpload); + let gen_id = self.upload_nascent_generation(newpath).await?; + perf.stop(Clock::GenerationUpload); + let gen_id = GenId::from_chunk_id(gen_id); + Ok(RootsBackupOutcome { + files_count, + warnings, + new_cachedir_tags, + gen_id, + }) + } + + async fn backup_one_root( + &mut self, + config: &ClientConfig, + old: &LocalGeneration, + new: &mut NascentGeneration, + root: &Path, + ) -> Result<OneRootBackupOutcome, NascentError> { + let mut warnings: Vec<BackupError> = vec![]; + let mut new_cachedir_tags = vec![]; + let iter = FsIterator::new(root, config.exclude_cache_tag_directories); + let mut first_entry = true; + for entry in iter { + match entry { + Err(err) => { + if first_entry { + // Only the first entry (the backup root) + // failing is an error. Everything else is a + // warning. + return Err(NascentError::BackupRootFailed(root.to_path_buf(), err)); + } + warnings.push(err.into()); + } + Ok(entry) => { + let path = entry.inner.pathbuf(); + if entry.is_cachedir_tag && !old.is_cachedir_tag(&path)? { + new_cachedir_tags.push(path); } - Reason::Unchanged | Reason::Skipped => { - let fileno = old.get_fileno(&entry.pathbuf())?; - let ids = if let Some(fileno) = fileno { - old.chunkids(fileno)? 
- } else { - vec![] - }; - Ok((entry.clone(), ids, reason)) + match self.backup_if_needed(entry, old).await { + Err(err) => { + warnings.push(err); + } + Ok(None) => (), + Ok(Some(o)) => { + if let Err(err) = + new.insert(o.entry, &o.ids, o.reason, o.is_cachedir_tag) + { + warnings.push(err.into()); + } + } } } } + first_entry = false; + } + + Ok(OneRootBackupOutcome { + warnings, + new_cachedir_tags, + }) + } + + async fn backup_if_needed( + &mut self, + entry: AnnotatedFsEntry, + old: &LocalGeneration, + ) -> Result<Option<FsEntryBackupOutcome>, BackupError> { + let path = &entry.inner.pathbuf(); + info!("backup: {}", path.display()); + self.found_live_file(path); + let reason = self.policy.needs_backup(old, &entry.inner); + match reason { + Reason::IsNew | Reason::Changed | Reason::GenerationLookupError | Reason::Unknown => { + Ok(Some(self.backup_one_entry(&entry, path, reason).await)) + } + Reason::Skipped => Ok(None), + Reason::Unchanged | Reason::FileError => { + let fileno = old.get_fileno(&entry.inner.pathbuf())?; + let ids = if let Some(fileno) = fileno { + let mut ids = vec![]; + for id in old.chunkids(fileno)?.iter()? { + ids.push(id?); + } + ids + } else { + vec![] + }; + Ok(Some(FsEntryBackupOutcome { + entry: entry.inner, + ids, + reason, + is_cachedir_tag: entry.is_cachedir_tag, + })) + } + } + } + + async fn backup_one_entry( + &mut self, + entry: &AnnotatedFsEntry, + path: &Path, + reason: Reason, + ) -> FsEntryBackupOutcome { + let ids = self + .upload_filesystem_entry(&entry.inner, self.buffer_size) + .await; + match ids { + Err(err) => { + warn!("error backing up {}, skipping it: {}", path.display(), err); + FsEntryBackupOutcome { + entry: entry.inner.clone(), + ids: vec![], + reason: Reason::FileError, + is_cachedir_tag: entry.is_cachedir_tag, + } + } + Ok(ids) => FsEntryBackupOutcome { + entry: entry.inner.clone(), + ids, + reason, + is_cachedir_tag: entry.is_cachedir_tag, + }, + } + } + + /// Upload any file content for a file system entry. + pub async fn upload_filesystem_entry( + &mut self, + e: &FilesystemEntry, + size: usize, + ) -> Result<Vec<ChunkId>, BackupError> { + let path = e.pathbuf(); + info!("uploading {:?}", path); + let ids = match e.kind() { + FilesystemKind::Regular => self.upload_regular_file(&path, size).await?, + FilesystemKind::Directory => vec![], + FilesystemKind::Symlink => vec![], + FilesystemKind::Socket => vec![], + FilesystemKind::Fifo => vec![], + }; + info!("upload OK for {:?}", path); + Ok(ids) + } + + /// Upload the metadata for the backup of this run. + pub async fn upload_generation( + &mut self, + filename: &Path, + size: usize, + ) -> Result<ChunkId, BackupError> { + info!("upload SQLite {}", filename.display()); + let ids = self.upload_regular_file(filename, size).await?; + let gen = GenerationChunk::new(ids); + let data = gen.to_data_chunk()?; + let gen_id = self.client.upload_chunk(data).await?; + info!("uploaded generation {}", gen_id); + Ok(gen_id) + } + + async fn upload_regular_file( + &mut self, + filename: &Path, + size: usize, + ) -> Result<Vec<ChunkId>, BackupError> { + info!("upload file {}", filename.display()); + let mut chunk_ids = vec![]; + let file = std::fs::File::open(filename) + .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?; + let chunker = FileChunks::new(size, file, filename, self.checksum_kind()); + for item in chunker { + let chunk = item?; + if let Some(chunk_id) = self.client.has_chunk(chunk.meta()).await? 
{ + chunk_ids.push(chunk_id.clone()); + info!("reusing existing chunk {}", chunk_id); + } else { + let chunk_id = self.client.upload_chunk(chunk).await?; + chunk_ids.push(chunk_id.clone()); + info!("created new chunk {}", chunk_id); + } } + Ok(chunk_ids) } + + async fn upload_nascent_generation(&mut self, filename: &Path) -> Result<ChunkId, ObnamError> { + let progress = BackupProgress::upload_generation(); + let gen_id = self.upload_generation(filename, SQLITE_CHUNK_SIZE).await?; + progress.finish(); + Ok(gen_id) + } + + fn found_live_file(&self, path: &Path) { + if let Some(progress) = &self.progress { + progress.found_live_file(path); + } + } + + fn found_problem(&self) { + if let Some(progress) = &self.progress { + progress.found_problem(); + } + } +} + +/// Current timestamp as an ISO 8601 string. +pub fn current_timestamp() -> String { + let now: DateTime<Local> = Local::now(); + format!("{}", now.format("%Y-%m-%d %H:%M:%S.%f %z")) } diff --git a/src/benchmark.rs b/src/benchmark.rs deleted file mode 100644 index b313868..0000000 --- a/src/benchmark.rs +++ /dev/null @@ -1,32 +0,0 @@ -use crate::chunk::DataChunk; -use crate::chunkid::ChunkId; -use crate::chunkmeta::ChunkMeta; - -// Generate a desired number of empty data chunks with id and metadata. -pub struct ChunkGenerator { - goal: u32, - next: u32, -} - -impl ChunkGenerator { - pub fn new(goal: u32) -> Self { - Self { goal, next: 0 } - } -} - -impl Iterator for ChunkGenerator { - type Item = (ChunkId, String, ChunkMeta, DataChunk); - - fn next(&mut self) -> Option<Self::Item> { - if self.next >= self.goal { - None - } else { - let id = ChunkId::new(); - let checksum = id.sha256(); - let meta = ChunkMeta::new(&checksum); - let chunk = DataChunk::new(vec![]); - self.next += 1; - Some((id, checksum, meta, chunk)) - } - } -} diff --git a/src/bin/benchmark-index.rs b/src/bin/benchmark-index.rs deleted file mode 100644 index d49a6c3..0000000 --- a/src/bin/benchmark-index.rs +++ /dev/null @@ -1,35 +0,0 @@ -use obnam::benchmark::ChunkGenerator; -use obnam::chunkmeta::ChunkMeta; -use obnam::index::Index; -use std::path::PathBuf; -use structopt::StructOpt; - -#[derive(Debug, StructOpt)] -#[structopt( - name = "benchmark-index", - about = "Benhcmark the store index in memory" -)] -struct Opt { - // We don't use this, but we accept it for command line - // compatibility with other benchmark programs. 
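Back in upload_regular_file() above (before the deleted benchmark scaffolding), the has_chunk/upload_chunk loop is the client-side deduplication at the heart of the upload path. Distilled into a hypothetical helper (not in the diff), assuming BackupClient's has_chunk()/upload_chunk() signatures as used there:

    use obnam::chunk::DataChunk;
    use obnam::chunkid::ChunkId;
    use obnam::client::{BackupClient, ClientError};

    async fn store_chunk(
        client: &mut BackupClient,
        chunk: DataChunk,
    ) -> Result<ChunkId, ClientError> {
        if let Some(id) = client.has_chunk(chunk.meta()).await? {
            Ok(id) // server already has this label: reuse, don't upload
        } else {
            client.upload_chunk(chunk).await // new content: upload it
        }
    }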
- #[structopt(parse(from_os_str))] - chunks: PathBuf, - - #[structopt()] - num: u32, -} - -fn main() -> anyhow::Result<()> { - pretty_env_logger::init(); - - let opt = Opt::from_args(); - let gen = ChunkGenerator::new(opt.num); - - let mut index = Index::new(".")?; - for (id, checksum, _, _) in gen { - let meta = ChunkMeta::new(&checksum); - index.insert_meta(id, meta)?; - } - - Ok(()) -} diff --git a/src/bin/benchmark-indexedstore.rs b/src/bin/benchmark-indexedstore.rs deleted file mode 100644 index 3ee4c38..0000000 --- a/src/bin/benchmark-indexedstore.rs +++ /dev/null @@ -1,28 +0,0 @@ -use obnam::benchmark::ChunkGenerator; -use obnam::indexedstore::IndexedStore; -use std::path::PathBuf; -use structopt::StructOpt; - -#[derive(Debug, StructOpt)] -#[structopt(name = "benchmark-store", about = "Benhcmark the store without HTTP")] -struct Opt { - #[structopt(parse(from_os_str))] - chunks: PathBuf, - - #[structopt()] - num: u32, -} - -fn main() -> anyhow::Result<()> { - pretty_env_logger::init(); - - let opt = Opt::from_args(); - let gen = ChunkGenerator::new(opt.num); - - let mut store = IndexedStore::new(&opt.chunks)?; - for (_, _, meta, chunk) in gen { - store.save(&meta, &chunk)?; - } - - Ok(()) -} diff --git a/src/bin/benchmark-null.rs b/src/bin/benchmark-null.rs deleted file mode 100644 index 6df8ca1..0000000 --- a/src/bin/benchmark-null.rs +++ /dev/null @@ -1,29 +0,0 @@ -use obnam::benchmark::ChunkGenerator; -use std::path::PathBuf; -use structopt::StructOpt; - -#[derive(Debug, StructOpt)] -#[structopt( - name = "benchmark-index", - about = "Benhcmark the store index in memory" -)] -struct Opt { - // We don't use this, but we accept it for command line - // compatibility with other benchmark programs. - #[structopt(parse(from_os_str))] - chunks: PathBuf, - - #[structopt()] - num: u32, -} - -fn main() -> anyhow::Result<()> { - pretty_env_logger::init(); - - let opt = Opt::from_args(); - let gen = ChunkGenerator::new(opt.num); - - for (_, _, _, _) in gen {} - - Ok(()) -} diff --git a/src/bin/benchmark-store.rs b/src/bin/benchmark-store.rs deleted file mode 100644 index f7c82b1..0000000 --- a/src/bin/benchmark-store.rs +++ /dev/null @@ -1,28 +0,0 @@ -use obnam::benchmark::ChunkGenerator; -use obnam::store::Store; -use std::path::PathBuf; -use structopt::StructOpt; - -#[derive(Debug, StructOpt)] -#[structopt(name = "benchmark-store", about = "Benhcmark the store without HTTP")] -struct Opt { - #[structopt(parse(from_os_str))] - chunks: PathBuf, - - #[structopt()] - num: u32, -} - -fn main() -> anyhow::Result<()> { - pretty_env_logger::init(); - - let opt = Opt::from_args(); - let gen = ChunkGenerator::new(opt.num); - - let store = Store::new(&opt.chunks); - for (id, _, meta, chunk) in gen { - store.save(&id, &meta, &chunk)?; - } - - Ok(()) -} diff --git a/src/bin/obnam-server.rs b/src/bin/obnam-server.rs index 19f2e99..9b5a557 100644 --- a/src/bin/obnam-server.rs +++ b/src/bin/obnam-server.rs @@ -1,42 +1,43 @@ -use bytes::Bytes; +use anyhow::Context; +use clap::Parser; use log::{debug, error, info}; -use obnam::chunk::DataChunk; use obnam::chunkid::ChunkId; use obnam::chunkmeta::ChunkMeta; -use obnam::indexedstore::IndexedStore; -use serde::{Deserialize, Serialize}; +use obnam::chunkstore::ChunkStore; +use obnam::label::Label; +use obnam::server::{ServerConfig, ServerConfigError}; +use serde::Serialize; use std::collections::HashMap; use std::default::Default; use std::net::{SocketAddr, ToSocketAddrs}; use std::path::{Path, PathBuf}; use std::sync::Arc; -use structopt::StructOpt; use 
tokio::sync::Mutex; use warp::http::StatusCode; +use warp::hyper::body::Bytes; use warp::Filter; -#[derive(Debug, StructOpt)] -#[structopt(name = "obnam2-server", about = "Backup server")] +#[derive(Debug, Parser)] +#[clap(name = "obnam2-server", about = "Backup server")] struct Opt { - #[structopt(parse(from_os_str))] config: PathBuf, } #[tokio::main] async fn main() -> anyhow::Result<()> { - pretty_env_logger::init(); + pretty_env_logger::init_custom_env("OBNAM_SERVER_LOG"); - let opt = Opt::from_args(); - let config = Config::read_config(&opt.config).unwrap(); + let opt = Opt::parse(); + let config = load_config(&opt.config)?; let addresses: Vec<SocketAddr> = config.address.to_socket_addrs()?.collect(); if addresses.is_empty() { error!("specified address is empty set: {:?}", addresses); eprintln!("ERROR: server address is empty: {:?}", addresses); - return Err(ConfigError::BadServerAddress.into()); + return Err(ServerConfigError::BadServerAddress.into()); } - let store = IndexedStore::new(&config.chunks)?; + let store = ChunkStore::local(&config.chunks)?; let store = Arc::new(Mutex::new(store)); let store = warp::any().map(move || Arc::clone(&store)); @@ -45,32 +46,32 @@ async fn main() -> anyhow::Result<()> { debug!("Configuration: {:#?}", config); let create = warp::post() + .and(warp::path("v1")) .and(warp::path("chunks")) + .and(warp::path::end()) .and(store.clone()) .and(warp::header("chunk-meta")) .and(warp::filters::body::bytes()) .and_then(create_chunk); let fetch = warp::get() + .and(warp::path("v1")) .and(warp::path("chunks")) .and(warp::path::param()) + .and(warp::path::end()) .and(store.clone()) .and_then(fetch_chunk); let search = warp::get() + .and(warp::path("v1")) .and(warp::path("chunks")) + .and(warp::path::end()) .and(warp::query::<HashMap<String, String>>()) .and(store.clone()) .and_then(search_chunks); - let delete = warp::delete() - .and(warp::path("chunks")) - .and(warp::path::param()) - .and(store.clone()) - .and_then(delete_chunk); - let log = warp::log("obnam"); - let webroot = create.or(fetch).or(search).or(delete).with(log); + let webroot = create.or(fetch).or(search).with(log); debug!("starting warp"); warp::serve(webroot) @@ -82,57 +83,22 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -#[derive(Debug, Deserialize, Clone)] -pub struct Config { - pub chunks: PathBuf, - pub address: String, - pub tls_key: PathBuf, - pub tls_cert: PathBuf, -} - -#[derive(Debug, thiserror::Error)] -enum ConfigError { - #[error("Directory for chunks {0} does not exist")] - ChunksDirNotFound(PathBuf), - - #[error("TLS certificate {0} does not exist")] - TlsCertNotFound(PathBuf), - - #[error("TLS key {0} does not exist")] - TlsKeyNotFound(PathBuf), - - #[error("server address can't be resolved")] - BadServerAddress, -} - -impl Config { - pub fn read_config(filename: &Path) -> anyhow::Result<Config> { - let config = std::fs::read_to_string(filename)?; - let config: Config = serde_yaml::from_str(&config)?; - config.check()?; - Ok(config) - } - - pub fn check(&self) -> anyhow::Result<()> { - if !self.chunks.exists() { - return Err(ConfigError::ChunksDirNotFound(self.chunks.clone()).into()); - } - if !self.tls_cert.exists() { - return Err(ConfigError::TlsCertNotFound(self.tls_cert.clone()).into()); - } - if !self.tls_key.exists() { - return Err(ConfigError::TlsKeyNotFound(self.tls_key.clone()).into()); - } - Ok(()) - } +fn load_config(filename: &Path) -> Result<ServerConfig, anyhow::Error> { + let config = ServerConfig::read_config(filename).with_context(|| { + format!( + 
"Couldn't read default configuration file {}", + filename.display() + ) + })?; + Ok(config) } pub async fn create_chunk( - store: Arc<Mutex<IndexedStore>>, + store: Arc<Mutex<ChunkStore>>, meta: String, data: Bytes, ) -> Result<impl warp::Reply, warp::Rejection> { - let mut store = store.lock().await; + let store = store.lock().await; let meta: ChunkMeta = match meta.parse() { Ok(s) => s, @@ -142,9 +108,7 @@ pub async fn create_chunk( } }; - let chunk = DataChunk::new(data.to_vec()); - - let id = match store.save(&meta, &chunk) { + let id = match store.put(data.to_vec(), &meta).await { Ok(id) => id, Err(e) => { error!("couldn't save: {}", e); @@ -152,17 +116,17 @@ pub async fn create_chunk( } }; - info!("created chunk {}: {:?}", id, meta); + info!("created chunk {}", id); Ok(ChunkResult::Created(id)) } pub async fn fetch_chunk( id: String, - store: Arc<Mutex<IndexedStore>>, + store: Arc<Mutex<ChunkStore>>, ) -> Result<impl warp::Reply, warp::Rejection> { let store = store.lock().await; let id: ChunkId = id.parse().unwrap(); - match store.load(&id) { + match store.get(&id).await { Ok((data, meta)) => { info!("found chunk {}: {:?}", id, meta); Ok(ChunkResult::Fetched(meta, data)) @@ -176,20 +140,23 @@ pub async fn fetch_chunk( pub async fn search_chunks( query: HashMap<String, String>, - store: Arc<Mutex<IndexedStore>>, + store: Arc<Mutex<ChunkStore>>, ) -> Result<impl warp::Reply, warp::Rejection> { let store = store.lock().await; let mut query = query.iter(); let found = if let Some((key, value)) = query.next() { - if query.next() != None { + if query.next().is_some() { error!("search has more than one key to search for"); return Ok(ChunkResult::BadRequest); } - if key == "generation" && value == "true" { - store.find_generations().expect("SQL lookup failed") - } else if key == "sha256" { - store.find_by_sha256(value).expect("SQL lookup failed") + if key == "label" { + let label = Label::deserialize(value).unwrap(); + let label = ChunkMeta::new(&label); + store + .find_by_label(&label) + .await + .expect("SQL lookup failed") } else { error!("unknown search key {:?}", key); return Ok(ChunkResult::BadRequest); @@ -201,7 +168,7 @@ pub async fn search_chunks( let mut hits = SearchHits::default(); for chunk_id in found { - let meta = match store.load_meta(&chunk_id) { + let (_, meta) = match store.get(&chunk_id).await { Ok(meta) => { info!("search found chunk {}", chunk_id); meta @@ -240,30 +207,10 @@ impl SearchHits { } } -pub async fn delete_chunk( - id: String, - store: Arc<Mutex<IndexedStore>>, -) -> Result<impl warp::Reply, warp::Rejection> { - let mut store = store.lock().await; - let id: ChunkId = id.parse().unwrap(); - - match store.remove(&id) { - Ok(_) => { - info!("chunk deleted: {}", id); - Ok(ChunkResult::Deleted) - } - Err(e) => { - error!("could not delete chunk {}: {:?}", id, e); - Ok(ChunkResult::NotFound) - } - } -} - enum ChunkResult { Created(ChunkId), - Fetched(ChunkMeta, DataChunk), + Fetched(ChunkMeta, Vec<u8>), Found(SearchHits), - Deleted, NotFound, BadRequest, InternalServerError, @@ -292,13 +239,12 @@ impl warp::Reply for ChunkResult { ); into_response( StatusCode::OK, - chunk.data(), + &chunk, "application/octet-stream", Some(headers), ) } ChunkResult::Found(hits) => json_response(StatusCode::OK, hits.to_json(), None), - ChunkResult::Deleted => status_response(StatusCode::OK), ChunkResult::BadRequest => status_response(StatusCode::BAD_REQUEST), ChunkResult::NotFound => status_response(StatusCode::NOT_FOUND), ChunkResult::InternalServerError => 
status_response(StatusCode::INTERNAL_SERVER_ERROR), diff --git a/src/bin/obnam.rs b/src/bin/obnam.rs index e9f30ca..240960b 100644 --- a/src/bin/obnam.rs +++ b/src/bin/obnam.rs @@ -1,100 +1,126 @@ +use clap::Parser; +use directories_next::ProjectDirs; use log::{debug, error, info, LevelFilter}; use log4rs::append::file::FileAppender; -use log4rs::config::{Appender, Config, Logger, Root}; -use obnam::client::ClientConfig; -use obnam::cmd::{backup, get_chunk, list, list_files, restore, show_generation}; +use log4rs::config::{Appender, Logger, Root}; +use obnam::cmd::backup::Backup; +use obnam::cmd::chunk::{DecryptChunk, EncryptChunk}; +use obnam::cmd::chunkify::Chunkify; +use obnam::cmd::gen_info::GenInfo; +use obnam::cmd::get_chunk::GetChunk; +use obnam::cmd::init::Init; +use obnam::cmd::inspect::Inspect; +use obnam::cmd::list::List; +use obnam::cmd::list_backup_versions::ListSchemaVersions; +use obnam::cmd::list_files::ListFiles; +use obnam::cmd::resolve::Resolve; +use obnam::cmd::restore::Restore; +use obnam::cmd::show_config::ShowConfig; +use obnam::cmd::show_gen::ShowGeneration; +use obnam::config::ClientConfig; +use obnam::performance::{Clock, Performance}; use std::path::{Path, PathBuf}; -use structopt::StructOpt; -const BUFFER_SIZE: usize = 1024 * 1024; +const QUALIFIER: &str = ""; +const ORG: &str = ""; +const APPLICATION: &str = "obnam"; -fn main() -> anyhow::Result<()> { - let opt = Opt::from_args(); - let config_file = match opt.config { - None => default_config(), - Some(ref path) => path.to_path_buf(), - }; - let config = ClientConfig::read_config(&config_file)?; - if let Some(ref log) = config.log { - setup_logging(&log)?; +fn main() { + let mut perf = Performance::default(); + perf.start(Clock::RunTime); + if let Err(err) = main_program(&mut perf) { + error!("{}", err); + eprintln!("ERROR: {}", err); + std::process::exit(1); } + perf.stop(Clock::RunTime); + perf.log(); +} + +fn main_program(perf: &mut Performance) -> anyhow::Result<()> { + let opt = Opt::parse(); + let config = ClientConfig::read(&config_filename(&opt))?; + setup_logging(&config.log)?; info!("client starts"); debug!("{:?}", opt); + debug!("configuration: {:#?}", config); - let result = match opt.cmd { - Command::Backup => backup(&config, BUFFER_SIZE), - Command::List => list(&config), - Command::ShowGeneration { gen_id } => show_generation(&config, &gen_id), - Command::ListFiles { gen_id } => list_files(&config, &gen_id), - Command::Restore { gen_id, to } => restore(&config, &gen_id, &to), - Command::GetChunk { chunk_id } => get_chunk(&config, &chunk_id), - }; - - if let Err(ref e) = result { - error!("{}", e); - eprintln!("ERROR: {}", e); - return result; - } + match opt.cmd { + Command::Init(x) => x.run(&config), + Command::ListBackupVersions(x) => x.run(&config), + Command::Backup(x) => x.run(&config, perf), + Command::Inspect(x) => x.run(&config), + Command::Chunkify(x) => x.run(&config), + Command::List(x) => x.run(&config), + Command::ShowGeneration(x) => x.run(&config), + Command::ListFiles(x) => x.run(&config), + Command::Resolve(x) => x.run(&config), + Command::Restore(x) => x.run(&config), + Command::GenInfo(x) => x.run(&config), + Command::GetChunk(x) => x.run(&config), + Command::Config(x) => x.run(&config), + Command::EncryptChunk(x) => x.run(&config), + Command::DecryptChunk(x) => x.run(&config), + }?; info!("client ends successfully"); Ok(()) } +fn setup_logging(filename: &Path) -> anyhow::Result<()> { + let logfile = FileAppender::builder().build(filename)?; + + let config = 
log4rs::Config::builder() + .appender(Appender::builder().build("obnam", Box::new(logfile))) + .logger(Logger::builder().build("obnam", LevelFilter::Debug)) + .build(Root::builder().appender("obnam").build(LevelFilter::Debug))?; + + log4rs::init_config(config)?; + + Ok(()) +} + +fn config_filename(opt: &Opt) -> PathBuf { + match opt.config { + None => default_config(), + Some(ref filename) => filename.to_path_buf(), + } +} + fn default_config() -> PathBuf { - if let Some(path) = dirs::config_dir() { - path.join("obnam").join("obnam.yaml") - } else if let Some(path) = dirs::home_dir() { - path.join(".config").join("obnam").join("obnam.yaml") + if let Some(dirs) = ProjectDirs::from(QUALIFIER, ORG, APPLICATION) { + dirs.config_dir().join("obnam.yaml") } else { - panic!("can't find config dir or home dir"); + panic!("can't figure out the configuration directory"); } } -#[derive(Debug, StructOpt)] -#[structopt(name = "obnam-backup", about = "Simplistic backup client")] +#[derive(Debug, Parser)] +#[clap(name = "obnam-backup", version, about = "Simplistic backup client")] struct Opt { - #[structopt(long, short, parse(from_os_str))] + #[clap(long, short)] config: Option<PathBuf>, - #[structopt(subcommand)] + #[clap(subcommand)] cmd: Command, } -#[derive(Debug, StructOpt)] +#[derive(Debug, Parser)] enum Command { - Backup, - List, - ListFiles { - #[structopt(default_value = "latest")] - gen_id: String, - }, - Restore { - #[structopt()] - gen_id: String, - - #[structopt(parse(from_os_str))] - to: PathBuf, - }, - ShowGeneration { - #[structopt(default_value = "latest")] - gen_id: String, - }, - GetChunk { - #[structopt()] - chunk_id: String, - }, -} - -fn setup_logging(filename: &Path) -> anyhow::Result<()> { - let logfile = FileAppender::builder().build(filename)?; - - let config = Config::builder() - .appender(Appender::builder().build("obnam", Box::new(logfile))) - .logger(Logger::builder().build("obnam", LevelFilter::Debug)) - .build(Root::builder().appender("obnam").build(LevelFilter::Debug))?; - - log4rs::init_config(config)?; - - Ok(()) + Init(Init), + Backup(Backup), + Inspect(Inspect), + Chunkify(Chunkify), + List(List), + ListBackupVersions(ListSchemaVersions), + ListFiles(ListFiles), + Restore(Restore), + GenInfo(GenInfo), + ShowGeneration(ShowGeneration), + Resolve(Resolve), + GetChunk(GetChunk), + Config(ShowConfig), + EncryptChunk(EncryptChunk), + DecryptChunk(DecryptChunk), } diff --git a/src/checksummer.rs b/src/checksummer.rs deleted file mode 100644 index 162c26b..0000000 --- a/src/checksummer.rs +++ /dev/null @@ -1,8 +0,0 @@ -use sha2::{Digest, Sha256}; - -pub fn sha256(data: &[u8]) -> String { - let mut hasher = Sha256::new(); - hasher.update(data); - let hash = hasher.finalize(); - format!("{:x}", hash) -} diff --git a/src/chunk.rs b/src/chunk.rs index 4917b60..a6abad3 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -1,60 +1,199 @@ +//! Chunks of data. + use crate::chunkid::ChunkId; +use crate::chunkmeta::ChunkMeta; +use crate::label::Label; use serde::{Deserialize, Serialize}; use std::default::Default; -/// Store an arbitrary chunk of data. -/// -/// The data is just arbitrary binary data. +/// An arbitrary chunk of arbitrary binary data. /// /// A chunk also contains its associated metadata, except its -/// identifier. -#[derive(Debug, Clone, Serialize, Deserialize)] +/// identifier, so that it's easy to keep the data and metadata +/// together. 
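Returning briefly to obnam-server.rs above: every route is now nested under a /v1 prefix with an explicit path::end(), so /v1/chunks and /v1/chunks/<id> cannot shadow each other. A minimal standalone sketch of that routing shape (trivial handler, illustrative port):

    use warp::Filter;

    #[tokio::main]
    async fn main() {
        // GET /v1/chunks/<id>, mirroring the fetch route above.
        let fetch = warp::get()
            .and(warp::path("v1"))
            .and(warp::path("chunks"))
            .and(warp::path::param::<String>())
            .and(warp::path::end())
            .map(|id: String| format!("would fetch chunk {}", id));
        warp::serve(fetch).run(([127, 0, 0, 1], 8888)).await;
    }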
The identifier is used to find the chunk, and it's +/// assigned by the server when the chunk is uploaded, so it's not +/// stored in the chunk itself. +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct DataChunk { data: Vec<u8>, + meta: ChunkMeta, } impl DataChunk { - /// Construct a new chunk. - pub fn new(data: Vec<u8>) -> Self { - Self { data } + /// Create a new chunk. + pub fn new(data: Vec<u8>, meta: ChunkMeta) -> Self { + Self { data, meta } } /// Return a chunk's data. pub fn data(&self) -> &[u8] { &self.data } + + /// Return a chunk's metadata. + pub fn meta(&self) -> &ChunkMeta { + &self.meta + } } +/// A chunk representing a backup generation. +/// +/// A generation chunk lists all the data chunks for the SQLite file +/// with the backup's metadata. It's different from a normal data +/// chunk so that we can do things that make no sense to a data chunk. +/// Generation chunks can be converted into or created from data +/// chunks, for uploading to or downloading from the server. #[derive(Default, Debug, Serialize, Deserialize)] pub struct GenerationChunk { chunk_ids: Vec<ChunkId>, } +/// All the errors that may be returned for `GenerationChunk` operations. +#[derive(Debug, thiserror::Error)] +pub enum GenerationChunkError { + /// Error converting text from UTF8. + #[error(transparent)] + Utf8Error(#[from] std::str::Utf8Error), + + /// Error parsing JSON as chunk metadata. + #[error("failed to parse JSON: {0}")] + JsonParse(serde_json::Error), + + /// Error generating JSON from chunk metadata. + #[error("failed to serialize to JSON: {0}")] + JsonGenerate(serde_json::Error), +} + impl GenerationChunk { + /// Create a new backup generation chunk from metadata chunk ids. pub fn new(chunk_ids: Vec<ChunkId>) -> Self { Self { chunk_ids } } - pub fn from_data_chunk(chunk: &DataChunk) -> anyhow::Result<Self> { + /// Create a new backup generation chunk from a data chunk. + pub fn from_data_chunk(chunk: &DataChunk) -> Result<Self, GenerationChunkError> { let data = chunk.data(); let data = std::str::from_utf8(data)?; - Ok(serde_json::from_str(data)?) + serde_json::from_str(data).map_err(GenerationChunkError::JsonParse) } + /// Does the generation chunk contain any metadata chunks? pub fn is_empty(&self) -> bool { self.chunk_ids.is_empty() } + /// How many metadata chunks does the generation chunk contain? pub fn len(&self) -> usize { self.chunk_ids.len() } + /// Return iterator over the metadata chunk identifiers. pub fn chunk_ids(&self) -> impl Iterator<Item = &ChunkId> { self.chunk_ids.iter() } - pub fn to_data_chunk(&self) -> anyhow::Result<DataChunk> { - let json = serde_json::to_string(self)?; - Ok(DataChunk::new(json.as_bytes().to_vec())) + /// Convert generation chunk to a data chunk. + pub fn to_data_chunk(&self) -> Result<DataChunk, GenerationChunkError> { + let json: String = + serde_json::to_string(self).map_err(GenerationChunkError::JsonGenerate)?; + let bytes = json.as_bytes().to_vec(); + let checksum = Label::sha256(&bytes); + let meta = ChunkMeta::new(&checksum); + Ok(DataChunk::new(bytes, meta)) + } +} + +/// A client trust root chunk. +/// +/// This chunk contains all per-client backup information. As long as +/// this chunk can be trusted, everything it links to can also be +/// trusted, thanks to cryptographic signatures.
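to_data_chunk() and from_data_chunk() above let a generation chunk travel as an ordinary labelled data chunk. A round-trip sketch using only APIs visible in the diff (the ids are hypothetical):

    use obnam::chunk::GenerationChunk;
    use obnam::chunkid::ChunkId;

    fn round_trip() -> anyhow::Result<()> {
        let ids = vec![ChunkId::recreate("id-1"), ChunkId::recreate("id-2")];
        let gen = GenerationChunk::new(ids);
        let data = gen.to_data_chunk()?; // JSON body plus SHA256 label
        let back = GenerationChunk::from_data_chunk(&data)?;
        assert_eq!(back.len(), 2);
        Ok(())
    }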
+#[derive(Debug, Serialize, Deserialize)] +pub struct ClientTrust { + client_name: String, + previous_version: Option<ChunkId>, + timestamp: String, + backups: Vec<ChunkId>, +} + +/// All the errors that may be returned for `ClientTrust` operations. +#[derive(Debug, thiserror::Error)] +pub enum ClientTrustError { + /// Error converting text from UTF8. + #[error(transparent)] + Utf8Error(#[from] std::str::Utf8Error), + + /// Error parsing JSON as chunk metadata. + #[error("failed to parse JSON: {0}")] + JsonParse(serde_json::Error), + + /// Error generating JSON from chunk metadata. + #[error("failed to serialize to JSON: {0}")] + JsonGenerate(serde_json::Error), +} + +impl ClientTrust { + /// Create a new ClientTrust object. + pub fn new( + name: &str, + previous_version: Option<ChunkId>, + timestamp: String, + backups: Vec<ChunkId>, + ) -> Self { + Self { + client_name: name.to_string(), + previous_version, + timestamp, + backups, + } + } + + /// Return client name. + pub fn client_name(&self) -> &str { + &self.client_name + } + + /// Return id of previous version, if any. + pub fn previous_version(&self) -> Option<ChunkId> { + self.previous_version.clone() + } + + /// Return timestamp. + pub fn timestamp(&self) -> &str { + &self.timestamp + } + + /// Return list of all backup generations known. + pub fn backups(&self) -> &[ChunkId] { + &self.backups + } + + /// Append a backup generation to the list. + pub fn append_backup(&mut self, id: &ChunkId) { + self.backups.push(id.clone()); + } + + /// Update for new upload. + /// + /// This needs to happen every time the chunk is updated so that + /// the timestamp gets updated. + pub fn finalize(&mut self, timestamp: String) { + self.timestamp = timestamp; + } + + /// Convert generation chunk to a data chunk. + pub fn to_data_chunk(&self) -> Result<DataChunk, ClientTrustError> { + let json: String = serde_json::to_string(self).map_err(ClientTrustError::JsonGenerate)?; + let bytes = json.as_bytes().to_vec(); + let checksum = Label::literal("client-trust"); + let meta = ChunkMeta::new(&checksum); + Ok(DataChunk::new(bytes, meta)) + } + + /// Create a new ClientTrust from a data chunk. + pub fn from_data_chunk(chunk: &DataChunk) -> Result<Self, ClientTrustError> { + let data = chunk.data(); + let data = std::str::from_utf8(data)?; + serde_json::from_str(data).map_err(ClientTrustError::JsonParse) } } diff --git a/src/chunker.rs b/src/chunker.rs index 145b1db..9883f89 100644 --- a/src/chunker.rs +++ b/src/chunker.rs @@ -1,30 +1,54 @@ -use crate::checksummer::sha256; +//! Split file data into chunks. + use crate::chunk::DataChunk; use crate::chunkmeta::ChunkMeta; +use crate::label::{Label, LabelChecksumKind}; use std::io::prelude::*; +use std::path::{Path, PathBuf}; -pub struct Chunker { +/// Iterator over chunks in a file. +pub struct FileChunks { chunk_size: usize, + kind: LabelChecksumKind, buf: Vec<u8>, + filename: PathBuf, handle: std::fs::File, } -impl Chunker { - pub fn new(chunk_size: usize, handle: std::fs::File) -> Self { - let mut buf = vec![]; - buf.resize(chunk_size, 0); +/// Possible errors from data chunking. +#[derive(Debug, thiserror::Error)] +pub enum ChunkerError { + /// Error reading from a file. + #[error("failed to read file {0}: {1}")] + FileRead(PathBuf, std::io::Error), +} + +impl FileChunks { + /// Create new iterator. 
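A sketch of the intended ClientTrust update cycle when a backup finishes (hypothetical helper; current_timestamp() is the function from backup_run.rs above):

    use obnam::backup_run::current_timestamp;
    use obnam::chunk::ClientTrust;
    use obnam::chunkid::ChunkId;

    fn update_trust(trust: &mut ClientTrust, new_backup: &ChunkId) -> anyhow::Result<()> {
        trust.append_backup(new_backup);
        // finalize() refreshes the timestamp before every upload.
        trust.finalize(current_timestamp());
        let chunk = trust.to_data_chunk()?; // ready to upload to the server
        let _ = chunk;
        Ok(())
    }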
+ pub fn new( + chunk_size: usize, + handle: std::fs::File, + filename: &Path, + kind: LabelChecksumKind, + ) -> Self { + let buf = vec![0; chunk_size]; Self { chunk_size, + kind, buf, handle, + filename: filename.to_path_buf(), } } - pub fn read_chunk(&mut self) -> anyhow::Result<Option<(ChunkMeta, DataChunk)>> { + fn read_chunk(&mut self) -> Result<Option<DataChunk>, ChunkerError> { let mut used = 0; loop { - let n = self.handle.read(&mut self.buf.as_mut_slice()[used..])?; + let n = self + .handle + .read(&mut self.buf.as_mut_slice()[used..]) + .map_err(|err| ChunkerError::FileRead(self.filename.to_path_buf(), err))?; used += n; if n == 0 || used == self.chunk_size { break; @@ -36,20 +60,24 @@ impl Chunker { } let buffer = &self.buf.as_slice()[..used]; - let hash = sha256(buffer); + let hash = match self.kind { + LabelChecksumKind::Blake2 => Label::blake2(buffer), + LabelChecksumKind::Sha256 => Label::sha256(buffer), + }; let meta = ChunkMeta::new(&hash); - let chunk = DataChunk::new(buffer.to_vec()); - Ok(Some((meta, chunk))) + let chunk = DataChunk::new(buffer.to_vec(), meta); + Ok(Some(chunk)) } } -impl Iterator for Chunker { - type Item = anyhow::Result<(ChunkMeta, DataChunk)>; +impl Iterator for FileChunks { + type Item = Result<DataChunk, ChunkerError>; - fn next(&mut self) -> Option<anyhow::Result<(ChunkMeta, DataChunk)>> { + /// Return the next chunk, if any, or an error. + fn next(&mut self) -> Option<Result<DataChunk, ChunkerError>> { match self.read_chunk() { Ok(None) => None, - Ok(Some((meta, chunk))) => Some(Ok((meta, chunk))), + Ok(Some(chunk)) => Some(Ok(chunk)), Err(e) => Some(Err(e)), } } diff --git a/src/chunkid.rs b/src/chunkid.rs index 3933d4b..50fc3d3 100644 --- a/src/chunkid.rs +++ b/src/chunkid.rs @@ -1,4 +1,9 @@ -use crate::checksummer::sha256; +//! The identifier for a chunk. +//! +//! Chunk identifiers are chosen by the server. Each chunk has a +//! unique identifier, which isn't based on the contents of the chunk. + +use crate::label::Label; use rusqlite::types::ToSqlOutput; use rusqlite::ToSql; use serde::{Deserialize, Serialize}; @@ -37,20 +42,24 @@ impl ChunkId { } } - pub fn from_str(s: &str) -> Self { + /// Re-construct an identifier from a previous value. + pub fn recreate(s: &str) -> Self { ChunkId { id: s.to_string() } } + /// Return the identifier as a slice of bytes. pub fn as_bytes(&self) -> &[u8] { self.id.as_bytes() } - pub fn sha256(&self) -> String { - sha256(self.id.as_bytes()) + /// Return the SHA256 checksum of the identifier. + pub fn sha256(&self) -> Label { + Label::sha256(self.id.as_bytes()) } } impl ToSql for ChunkId { + /// Format identifier for SQL. fn to_sql(&self) -> rusqlite::Result<ToSqlOutput> { Ok(ToSqlOutput::Owned(rusqlite::types::Value::Text( self.id.clone(), @@ -68,12 +77,14 @@ impl fmt::Display for ChunkId { } impl From<&String> for ChunkId { + /// Create a chunk identifier from a string. fn from(s: &String) -> Self { ChunkId { id: s.to_string() } } } impl From<&OsStr> for ChunkId { + /// Create a chunk identifier from an operating system string. fn from(s: &OsStr) -> Self { ChunkId { id: s.to_string_lossy().to_string(), @@ -84,8 +95,9 @@ impl From<&OsStr> for ChunkId { impl FromStr for ChunkId { type Err = (); + /// Create a chunk from a string. 
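FileChunks above turns an open file into an iterator of ready-to-upload chunks, each already carrying its label metadata. A usage sketch (the chunk size is illustrative; module paths assume the crate's public exports):

    use obnam::chunker::FileChunks;
    use obnam::label::LabelChecksumKind;
    use std::path::Path;

    fn count_chunks(path: &Path) -> anyhow::Result<usize> {
        let file = std::fs::File::open(path)?;
        let chunks = FileChunks::new(1024 * 1024, file, path, LabelChecksumKind::Sha256);
        let mut n = 0;
        for chunk in chunks {
            let chunk = chunk?; // ChunkerError if a read fails
            let _ = chunk.meta(); // label was computed during chunking
            n += 1;
        }
        Ok(n)
    }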
fn from_str(s: &str) -> Result<Self, Self::Err> { - Ok(ChunkId::from_str(s)) + Ok(ChunkId::recreate(s)) } } @@ -117,6 +129,6 @@ mod test { fn survives_round_trip() { let id = ChunkId::new(); let id_str = id.to_string(); - assert_eq!(id, ChunkId::from_str(&id_str)) + assert_eq!(id, ChunkId::recreate(&id_str)) } } diff --git a/src/chunkmeta.rs b/src/chunkmeta.rs index 37e2ed5..e2fa9b3 100644 --- a/src/chunkmeta.rs +++ b/src/chunkmeta.rs @@ -1,28 +1,21 @@ +//! Metadata about a chunk. + +use crate::label::Label; use serde::{Deserialize, Serialize}; use std::default::Default; use std::str::FromStr; /// Metadata about chunks. /// -/// We manage three bits of metadata about chunks, in addition to its -/// identifier: -/// -/// * for all chunks, a [SHA256][] checksum of the chunk content -/// -/// * for generation chunks, an indication that it is a generation -/// chunk, and a timestamp for when making the generation snapshot -/// ended -/// -/// There is no syntax or semantics imposed on the timestamp, but a -/// client should probably use [ISO 8601][] representation. +/// We keep a single piece of metadata about chunks, in addition to its +/// identifier: a label assigned by the client. Currently, this is a +/// [SHA256][] checksum of the chunk content. /// /// For HTTP, the metadata will be serialised as a JSON object, like this: /// /// ~~~json /// { -/// "sha256": "09ca7e4eaa6e8ae9c7d261167129184883644d07dfba7cbfbc4c8a2e08360d5b", -/// "generation": true, -/// "ended": "2020-09-17T08:17:13+03:00" +/// "label": "09ca7e4eaa6e8ae9c7d261167129184883644d07dfba7cbfbc4c8a2e08360d5b", /// } /// ~~~ /// @@ -35,55 +28,44 @@ use std::str::FromStr; /// /// [ISO 8601]: https://en.wikipedia.org/wiki/ISO_8601 /// [SHA256]: https://en.wikipedia.org/wiki/SHA-2 -#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] pub struct ChunkMeta { - sha256: String, - // The remaining fields are Options so that JSON parsing doesn't - // insist on them being there in the textual representation. - generation: Option<bool>, - ended: Option<String>, + label: String, } impl ChunkMeta { /// Create a new data chunk. /// /// Data chunks are not for generations. - pub fn new(sha256: &str) -> Self { + pub fn new(label: &Label) -> Self { ChunkMeta { - sha256: sha256.to_string(), - generation: None, - ended: None, + label: label.serialize(), } } - /// Create a new generation chunk. - pub fn new_generation(sha256: &str, ended: &str) -> Self { - ChunkMeta { - sha256: sha256.to_string(), - generation: Some(true), - ended: Some(ended.to_string()), - } - } - - /// Is this a generation chunk? - pub fn is_generation(&self) -> bool { - matches!(self.generation, Some(true)) - } - - /// When did this generation end? - pub fn ended(&self) -> Option<&str> { - self.ended.as_deref() + /// The label of the content of the chunk. + /// + /// The caller should not interpret the label in any way. It + /// happens to be a SHA256 checksum of the cleartext contents of + /// the chunk for now, but that _will_ change in the future. + pub fn label(&self) -> &str { + &self.label } - /// SHA256 checksum of the content of the chunk. - pub fn sha256(&self) -> &str { - &self.sha256 + /// Deserialize from a textual JSON representation. + pub fn from_json(json: &str) -> Result<Self, serde_json::Error> { + serde_json::from_str(json) } /// Serialize as JSON. pub fn to_json(&self) -> String { serde_json::to_string(self).unwrap() } + + /// Serialize as JSON, as a byte vector.
+ pub fn to_json_vec(&self) -> Vec<u8> { + self.to_json().as_bytes().to_vec() + } } impl FromStr for ChunkMeta { @@ -97,48 +79,54 @@ impl FromStr for ChunkMeta { #[cfg(test)] mod test { - use super::ChunkMeta; + use super::{ChunkMeta, Label}; #[test] fn new_creates_data_chunk() { - let meta = ChunkMeta::new("abcdef"); - assert!(!meta.is_generation()); - assert_eq!(meta.ended(), None); - assert_eq!(meta.sha256(), "abcdef"); + let sum = Label::sha256(b"abcdef"); + let meta = ChunkMeta::new(&sum); + assert_eq!(meta.label(), sum.serialize()); } #[test] fn new_generation_creates_generation_chunk() { - let meta = ChunkMeta::new_generation("abcdef", "2020-09-17T08:17:13+03:00"); - assert!(meta.is_generation()); - assert_eq!(meta.ended(), Some("2020-09-17T08:17:13+03:00")); - assert_eq!(meta.sha256(), "abcdef"); + let sum = Label::sha256(b"abcdef"); + let meta = ChunkMeta::new(&sum); + assert_eq!(meta.label(), sum.serialize()); } #[test] fn data_chunk_from_json() { - let meta: ChunkMeta = r#"{"sha256": "abcdef"}"#.parse().unwrap(); - assert!(!meta.is_generation()); - assert_eq!(meta.ended(), None); - assert_eq!(meta.sha256(), "abcdef"); + let meta: ChunkMeta = r#"{"label": "abcdef"}"#.parse().unwrap(); + assert_eq!(meta.label(), "abcdef"); } #[test] fn generation_chunk_from_json() { let meta: ChunkMeta = - r#"{"sha256": "abcdef", "generation": true, "ended": "2020-09-17T08:17:13+03:00"}"# + r#"{"label": "abcdef", "generation": true, "ended": "2020-09-17T08:17:13+03:00"}"# .parse() .unwrap(); - assert!(meta.is_generation()); - assert_eq!(meta.ended(), Some("2020-09-17T08:17:13+03:00")); - assert_eq!(meta.sha256(), "abcdef"); + + assert_eq!(meta.label(), "abcdef"); } #[test] - fn json_roundtrip() { - let meta = ChunkMeta::new_generation("abcdef", "2020-09-17T08:17:13+03:00"); + fn generation_json_roundtrip() { + let sum = Label::sha256(b"abcdef"); + let meta = ChunkMeta::new(&sum); let json = serde_json::to_string(&meta).unwrap(); let meta2 = serde_json::from_str(&json).unwrap(); assert_eq!(meta, meta2); } + + #[test] + fn data_json_roundtrip() { + let sum = Label::sha256(b"abcdef"); + let meta = ChunkMeta::new(&sum); + let json = meta.to_json_vec(); + let meta2 = serde_json::from_slice(&json).unwrap(); + assert_eq!(meta, meta2); + assert_eq!(meta.to_json_vec(), meta2.to_json_vec()); + } } diff --git a/src/chunkstore.rs b/src/chunkstore.rs new file mode 100644 index 0000000..4c8125c --- /dev/null +++ b/src/chunkstore.rs @@ -0,0 +1,307 @@ +//! Access local and remote chunk stores. +//! +//! A chunk store may be local and accessed via the file system, or +//! remote and accessed over HTTP. This module implements both. This +//! module only handles encrypted chunks. + +use crate::chunkid::ChunkId; +use crate::chunkmeta::ChunkMeta; +use crate::config::{ClientConfig, ClientConfigError}; +use crate::index::{Index, IndexError}; + +use log::{debug, error, info}; +use reqwest::header::HeaderMap; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use tokio::sync::Mutex; + +/// A chunk store. +/// +/// The store may be local or remote. +pub enum ChunkStore { + /// A local chunk store. + Local(LocalStore), + + /// A remote chunk store. + Remote(RemoteStore), +} + +impl ChunkStore { + /// Open a local chunk store. + pub fn local<P: AsRef<Path>>(path: P) -> Result<Self, StoreError> { + let store = LocalStore::new(path.as_ref())?; + Ok(Self::Local(store)) + } + + /// Open a remote chunk store. 
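The ChunkStore enum above gives callers a single type over both backends; which backend you get is decided once, via local() above or remote() just below. A sketch (the chunk directory path is illustrative):

    use obnam::chunkstore::{ChunkStore, StoreError};
    use obnam::config::ClientConfig;

    fn open_store(config: Option<&ClientConfig>) -> Result<ChunkStore, StoreError> {
        match config {
            // Server side: chunks live in a local directory.
            None => ChunkStore::local("/srv/obnam/chunks"),
            // Client side: chunks live behind the HTTP API.
            Some(config) => ChunkStore::remote(config),
        }
    }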
+    pub fn remote(config: &ClientConfig) -> Result<Self, StoreError> {
+        let store = RemoteStore::new(config)?;
+        Ok(Self::Remote(store))
+    }
+
+    /// Find chunks with a given label.
+    pub async fn find_by_label(&self, meta: &ChunkMeta) -> Result<Vec<ChunkId>, StoreError> {
+        match self {
+            Self::Local(store) => store.find_by_label(meta).await,
+            Self::Remote(store) => store.find_by_label(meta).await,
+        }
+    }
+
+    /// Store a chunk in the store.
+    ///
+    /// The store chooses an id for the chunk.
+    pub async fn put(&self, chunk: Vec<u8>, meta: &ChunkMeta) -> Result<ChunkId, StoreError> {
+        match self {
+            Self::Local(store) => store.put(chunk, meta).await,
+            Self::Remote(store) => store.put(chunk, meta).await,
+        }
+    }
+
+    /// Get a chunk given its id.
+    pub async fn get(&self, id: &ChunkId) -> Result<(Vec<u8>, ChunkMeta), StoreError> {
+        match self {
+            Self::Local(store) => store.get(id).await,
+            Self::Remote(store) => store.get(id).await,
+        }
+    }
+}
+
+/// A local chunk store.
+pub struct LocalStore {
+    path: PathBuf,
+    index: Mutex<Index>,
+}
+
+impl LocalStore {
+    fn new(path: &Path) -> Result<Self, StoreError> {
+        Ok(Self {
+            path: path.to_path_buf(),
+            index: Mutex::new(Index::new(path)?),
+        })
+    }
+
+    async fn find_by_label(&self, meta: &ChunkMeta) -> Result<Vec<ChunkId>, StoreError> {
+        self.index
+            .lock()
+            .await
+            .find_by_label(meta.label())
+            .map_err(StoreError::Index)
+    }
+
+    async fn put(&self, chunk: Vec<u8>, meta: &ChunkMeta) -> Result<ChunkId, StoreError> {
+        let id = ChunkId::new();
+        let (dir, filename) = self.filename(&id);
+
+        if !dir.exists() {
+            std::fs::create_dir_all(&dir).map_err(|err| StoreError::ChunkMkdir(dir, err))?;
+        }
+
+        std::fs::write(&filename, &chunk)
+            .map_err(|err| StoreError::WriteChunk(filename.clone(), err))?;
+        self.index
+            .lock()
+            .await
+            .insert_meta(id.clone(), meta.clone())
+            .map_err(StoreError::Index)?;
+        Ok(id)
+    }
+
+    async fn get(&self, id: &ChunkId) -> Result<(Vec<u8>, ChunkMeta), StoreError> {
+        let meta = self.index.lock().await.get_meta(id)?;
+
+        let (_, filename) = &self.filename(id);
+
+        let raw =
+            std::fs::read(filename).map_err(|err| StoreError::ReadChunk(filename.clone(), err))?;
+
+        Ok((raw, meta))
+    }
+
+    fn filename(&self, id: &ChunkId) -> (PathBuf, PathBuf) {
+        let bytes = id.as_bytes();
+        assert!(bytes.len() > 3);
+        let a = bytes[0];
+        let b = bytes[1];
+        let c = bytes[2];
+        let dir = self.path.join(format!("{}/{}/{}", a, b, c));
+        let filename = dir.join(format!("{}.data", id));
+        (dir, filename)
+    }
+}
+
+/// A remote chunk store.
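+///
+/// The server's chunk API lives under `/v1/chunks`. In outline, the
+/// methods below map to it like this:
+///
+/// ~~~text
+/// GET  {server_url}/v1/chunks?label=...   find chunk ids by label
+/// POST {server_url}/v1/chunks             upload a chunk, returns its id
+/// GET  {server_url}/v1/chunks/{id}        fetch a chunk and its metadata
+/// ~~~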
+pub struct RemoteStore { + client: reqwest::Client, + base_url: String, +} + +impl RemoteStore { + fn new(config: &ClientConfig) -> Result<Self, StoreError> { + info!("creating remote store with config: {:#?}", config); + + let client = reqwest::Client::builder() + .danger_accept_invalid_certs(!config.verify_tls_cert) + .build() + .map_err(StoreError::ReqwestError)?; + Ok(Self { + client, + base_url: config.server_url.to_string(), + }) + } + + async fn find_by_label(&self, meta: &ChunkMeta) -> Result<Vec<ChunkId>, StoreError> { + let body = match self.get_helper("", &[("label", meta.label())]).await { + Ok((_, body)) => body, + Err(err) => return Err(err), + }; + + let hits: HashMap<String, ChunkMeta> = + serde_json::from_slice(&body).map_err(StoreError::JsonParse)?; + let ids = hits.keys().map(|id| ChunkId::recreate(id)).collect(); + Ok(ids) + } + + async fn put(&self, chunk: Vec<u8>, meta: &ChunkMeta) -> Result<ChunkId, StoreError> { + let res = self + .client + .post(&self.chunks_url()) + .header("chunk-meta", meta.to_json()) + .body(chunk) + .send() + .await + .map_err(StoreError::ReqwestError)?; + let res: HashMap<String, String> = res.json().await.map_err(StoreError::ReqwestError)?; + debug!("upload_chunk: res={:?}", res); + let chunk_id = if let Some(chunk_id) = res.get("chunk_id") { + debug!("upload_chunk: id={}", chunk_id); + chunk_id.parse().unwrap() + } else { + return Err(StoreError::NoCreatedChunkId); + }; + info!("uploaded_chunk {}", chunk_id); + Ok(chunk_id) + } + + async fn get(&self, id: &ChunkId) -> Result<(Vec<u8>, ChunkMeta), StoreError> { + let (headers, body) = self.get_helper(&format!("/{}", id), &[]).await?; + let meta = self.get_chunk_meta_header(id, &headers)?; + Ok((body, meta)) + } + + fn base_url(&self) -> &str { + &self.base_url + } + + fn chunks_url(&self) -> String { + format!("{}/v1/chunks", self.base_url()) + } + + async fn get_helper( + &self, + path: &str, + query: &[(&str, &str)], + ) -> Result<(HeaderMap, Vec<u8>), StoreError> { + let url = format!("{}{}", &self.chunks_url(), path); + info!("GET {}", url); + + // Build HTTP request structure. + let req = self + .client + .get(&url) + .query(query) + .build() + .map_err(StoreError::ReqwestError)?; + + // Make HTTP request. + let res = self + .client + .execute(req) + .await + .map_err(StoreError::ReqwestError)?; + + // Did it work? + if res.status() != 200 { + return Err(StoreError::NotFound(path.to_string())); + } + + // Return headers and body. + let headers = res.headers().clone(); + let body = res.bytes().await.map_err(StoreError::ReqwestError)?; + let body = body.to_vec(); + Ok((headers, body)) + } + + fn get_chunk_meta_header( + &self, + chunk_id: &ChunkId, + headers: &HeaderMap, + ) -> Result<ChunkMeta, StoreError> { + let meta = headers.get("chunk-meta"); + + if meta.is_none() { + let err = StoreError::NoChunkMeta(chunk_id.clone()); + error!("fetching chunk {} failed: {}", chunk_id, err); + return Err(err); + } + + let meta = meta + .unwrap() + .to_str() + .map_err(StoreError::MetaHeaderToString)?; + let meta: ChunkMeta = serde_json::from_str(meta).map_err(StoreError::JsonParse)?; + + Ok(meta) + } +} + +/// Possible errors from using a ChunkStore. +#[derive(Debug, thiserror::Error)] +pub enum StoreError { + /// FIXME + #[error("FIXME")] + FIXME, + + /// Error from a chunk index. + #[error(transparent)] + Index(#[from] IndexError), + + /// An error from the HTTP library. + #[error("error from reqwest library: {0}")] + ReqwestError(reqwest::Error), + + /// Client configuration is wrong. 
+    #[error(transparent)]
+    ClientConfigError(#[from] ClientConfigError),
+
+    /// Server claims to not have an entity.
+    #[error("Server does not have {0}")]
+    NotFound(String),
+
+    /// Server didn't give us a chunk's metadata.
+    #[error("Server response did not have a 'chunk-meta' header for chunk {0}")]
+    NoChunkMeta(ChunkId),
+
+    /// An error with the `chunk-meta` header.
+    #[error("couldn't convert response chunk-meta header to string: {0}")]
+    MetaHeaderToString(reqwest::header::ToStrError),
+
+    /// Error parsing JSON.
+    #[error("failed to parse JSON: {0}")]
+    JsonParse(serde_json::Error),
+
+    /// An error creating chunk directory.
+    #[error("Failed to create chunk directory {0}")]
+    ChunkMkdir(PathBuf, #[source] std::io::Error),
+
+    /// An error writing a chunk file.
+    #[error("Failed to write chunk {0}")]
+    WriteChunk(PathBuf, #[source] std::io::Error),
+
+    /// An error reading a chunk file.
+    #[error("Failed to read chunk {0}")]
+    ReadChunk(PathBuf, #[source] std::io::Error),
+
+    /// No chunk id for uploaded chunk.
+    #[error("Server response claimed it had created a chunk, but lacked chunk id")]
+    NoCreatedChunkId,
+}
diff --git a/src/cipher.rs b/src/cipher.rs
new file mode 100644
index 0000000..21785b9
--- /dev/null
+++ b/src/cipher.rs
@@ -0,0 +1,249 @@
+//! Encryption cipher algorithms.
+
+use crate::chunk::DataChunk;
+use crate::chunkmeta::ChunkMeta;
+use crate::passwords::Passwords;
+
+use aes_gcm::aead::{generic_array::GenericArray, Aead, KeyInit, Payload};
+use aes_gcm::Aes256Gcm; // Or `Aes128Gcm`
+use rand::Rng;
+
+use std::str::FromStr;
+
+const CHUNK_V1: &[u8] = b"0001";
+
+/// An encrypted chunk.
+///
+/// This consists of encrypted ciphertext, and un-encrypted (or
+/// cleartext) additional associated data, which could be the metadata
+/// of the chunk, and be used to, for example, find chunks.
+///
+/// Encrypted chunks are the only chunks that can be uploaded to the
+/// server.
+pub struct EncryptedChunk {
+    ciphertext: Vec<u8>,
+    aad: Vec<u8>,
+}
+
+impl EncryptedChunk {
+    /// Create an encrypted chunk.
+    fn new(ciphertext: Vec<u8>, aad: Vec<u8>) -> Self {
+        Self { ciphertext, aad }
+    }
+
+    /// Return the encrypted data.
+    pub fn ciphertext(&self) -> &[u8] {
+        &self.ciphertext
+    }
+
+    /// Return the cleartext associated additional data.
+    pub fn aad(&self) -> &[u8] {
+        &self.aad
+    }
+}
+
+/// An engine for encrypting and decrypting chunks.
+pub struct CipherEngine {
+    cipher: Aes256Gcm,
+}
+
+impl CipherEngine {
+    /// Create a new cipher engine using cleartext passwords.
+    pub fn new(pass: &Passwords) -> Self {
+        let key = GenericArray::from_slice(pass.encryption_key());
+        Self {
+            cipher: Aes256Gcm::new(key),
+        }
+    }
+
+    /// Encrypt a chunk.
+    pub fn encrypt_chunk(&self, chunk: &DataChunk) -> Result<EncryptedChunk, CipherError> {
+        // Payload with metadata as associated data, to be encrypted.
+        //
+        // The metadata will be stored in cleartext after encryption.
+        let aad = chunk.meta().to_json_vec();
+        let payload = Payload {
+            msg: chunk.data(),
+            aad: &aad,
+        };
+
+        // Unique random nonce for each encryption.
+        let nonce = Nonce::new();
+        let nonce_arr = GenericArray::from_slice(nonce.as_bytes());
+
+        // Encrypt the sensitive part.
+        let ciphertext = self
+            .cipher
+            .encrypt(nonce_arr, payload)
+            .map_err(CipherError::EncryptError)?;
+
+        // Construct the blob to be stored on the server.
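+        //
+        // The layout, as assembled below: the 4-byte version marker
+        // ("0001"), then the 12-byte random nonce, then the AES-GCM
+        // ciphertext.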
+        let mut vec: Vec<u8> = vec![];
+        push_bytes(&mut vec, CHUNK_V1);
+        push_bytes(&mut vec, nonce.as_bytes());
+        push_bytes(&mut vec, &ciphertext);
+
+        Ok(EncryptedChunk::new(vec, aad))
+    }
+
+    /// Decrypt a chunk.
+    pub fn decrypt_chunk(&self, bytes: &[u8], meta: &[u8]) -> Result<DataChunk, CipherError> {
+        // Does encrypted chunk start with the right version?
+        if !bytes.starts_with(CHUNK_V1) {
+            return Err(CipherError::UnknownChunkVersion);
+        }
+        let version_len = CHUNK_V1.len();
+        let bytes = &bytes[version_len..];
+
+        let (nonce, ciphertext) = match bytes.get(..NONCE_SIZE) {
+            Some(nonce) => (GenericArray::from_slice(nonce), &bytes[NONCE_SIZE..]),
+            None => return Err(CipherError::NoNonce),
+        };
+
+        let payload = Payload {
+            msg: ciphertext,
+            aad: meta,
+        };
+
+        let payload = self
+            .cipher
+            .decrypt(nonce, payload)
+            .map_err(CipherError::DecryptError)?;
+        let payload = Payload::from(payload.as_slice());
+
+        let meta = std::str::from_utf8(meta)?;
+        let meta = ChunkMeta::from_str(meta)?;
+
+        let chunk = DataChunk::new(payload.msg.to_vec(), meta);
+
+        Ok(chunk)
+    }
+}
+
+fn push_bytes(vec: &mut Vec<u8>, bytes: &[u8]) {
+    for byte in bytes.iter() {
+        vec.push(*byte);
+    }
+}
+
+/// Possible errors when encrypting or decrypting chunks.
+#[derive(Debug, thiserror::Error)]
+pub enum CipherError {
+    /// Encryption failed.
+    #[error("failed to encrypt with AES-GCM: {0}")]
+    EncryptError(aes_gcm::Error),
+
+    /// The encrypted chunk has an unsupported version or is
+    /// corrupted.
+    #[error("encrypted chunk does not start with correct version")]
+    UnknownChunkVersion,
+
+    /// The encrypted chunk lacks a complete nonce value, and is
+    /// probably corrupted.
+    #[error("encrypted chunk does not have a complete nonce")]
+    NoNonce,
+
+    /// Decryption failed.
+    #[error("failed to decrypt with AES-GCM: {0}")]
+    DecryptError(aes_gcm::Error),
+
+    /// The decryption succeeded, but the data isn't valid YAML.
+    #[error("failed to parse decrypted data as a DataChunk: {0}")]
+    Parse(serde_yaml::Error),
+
+    /// Error parsing UTF8 data.
+    #[error(transparent)]
+    Utf8Error(#[from] std::str::Utf8Error),
+
+    /// Error parsing JSON data.
+ #[error("failed to parse JSON: {0}")] + JsonParse(#[from] serde_json::Error), +} + +const NONCE_SIZE: usize = 12; + +#[derive(Debug)] +struct Nonce { + nonce: Vec<u8>, +} + +impl Nonce { + fn from_bytes(bytes: &[u8]) -> Self { + assert_eq!(bytes.len(), NONCE_SIZE); + Self { + nonce: bytes.to_vec(), + } + } + + fn new() -> Self { + let mut bytes: Vec<u8> = vec![0; NONCE_SIZE]; + let mut rng = rand::thread_rng(); + for x in bytes.iter_mut() { + *x = rng.gen(); + } + Self::from_bytes(&bytes) + } + + fn as_bytes(&self) -> &[u8] { + &self.nonce + } +} + +#[cfg(test)] +mod test { + use crate::chunk::DataChunk; + use crate::chunkmeta::ChunkMeta; + use crate::cipher::{CipherEngine, CipherError, CHUNK_V1, NONCE_SIZE}; + use crate::label::Label; + use crate::passwords::Passwords; + + #[test] + fn metadata_as_aad() { + let sum = Label::sha256(b"dummy data"); + let meta = ChunkMeta::new(&sum); + let meta_as_aad = meta.to_json_vec(); + let chunk = DataChunk::new("hello".as_bytes().to_vec(), meta); + let pass = Passwords::new("secret"); + let cipher = CipherEngine::new(&pass); + let enc = cipher.encrypt_chunk(&chunk).unwrap(); + + assert_eq!(meta_as_aad, enc.aad()); + } + + #[test] + fn round_trip() { + let sum = Label::sha256(b"dummy data"); + let meta = ChunkMeta::new(&sum); + let chunk = DataChunk::new("hello".as_bytes().to_vec(), meta); + let pass = Passwords::new("secret"); + + let cipher = CipherEngine::new(&pass); + let enc = cipher.encrypt_chunk(&chunk).unwrap(); + + let bytes: Vec<u8> = enc.ciphertext().to_vec(); + let dec = cipher.decrypt_chunk(&bytes, enc.aad()).unwrap(); + assert_eq!(chunk, dec); + } + + #[test] + fn decrypt_errors_if_nonce_is_too_short() { + let pass = Passwords::new("our little test secret"); + let e = CipherEngine::new(&pass); + + // *Almost* a valid chunk header, except it's one byte too short + let bytes = { + let mut result = [0; CHUNK_V1.len() + NONCE_SIZE - 1]; + for (i, x) in CHUNK_V1.iter().enumerate() { + result[i] = *x; + } + result + }; + + let meta = [0; 0]; + + assert!(matches!( + e.decrypt_chunk(&bytes, &meta), + Err(CipherError::NoNonce) + )); + } +} diff --git a/src/client.rs b/src/client.rs index 515b8c9..a924052 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,271 +1,207 @@ -use crate::checksummer::sha256; -use crate::chunk::DataChunk; -use crate::chunk::GenerationChunk; -use crate::chunker::Chunker; +//! Client to the Obnam server HTTP API. 
+ +use crate::chunk::{ + ClientTrust, ClientTrustError, DataChunk, GenerationChunk, GenerationChunkError, +}; use crate::chunkid::ChunkId; use crate::chunkmeta::ChunkMeta; -use crate::error::ObnamError; -use crate::fsentry::{FilesystemEntry, FilesystemKind}; -use crate::generation::{FinishedGeneration, LocalGeneration}; +use crate::chunkstore::{ChunkStore, StoreError}; +use crate::cipher::{CipherEngine, CipherError}; +use crate::config::{ClientConfig, ClientConfigError}; +use crate::generation::{FinishedGeneration, GenId, LocalGeneration, LocalGenerationError}; use crate::genlist::GenerationList; +use crate::label::Label; -use anyhow::Context; -use chrono::{DateTime, Local}; -use log::{debug, error, info, trace}; -use reqwest::blocking::Client; -use serde::Deserialize; -use std::collections::HashMap; +use log::{error, info}; use std::fs::File; use std::io::prelude::*; use std::path::{Path, PathBuf}; -#[derive(Debug, Deserialize, Clone)] -pub struct ClientConfig { - pub server_url: String, - pub root: PathBuf, - pub log: Option<PathBuf>, -} - -impl ClientConfig { - pub fn read_config(filename: &Path) -> anyhow::Result<Self> { - trace!("read_config: filename={:?}", filename); - let config = std::fs::read_to_string(filename) - .with_context(|| format!("reading configuration file {}", filename.display()))?; - let config = serde_yaml::from_str(&config)?; - Ok(config) - } -} - +/// Possible errors when using the server API. #[derive(Debug, thiserror::Error)] pub enum ClientError { - #[error("Server successful response to creating chunk lacked chunk id")] + /// No chunk id for uploaded chunk. + #[error("Server response claimed it had created a chunk, but lacked chunk id")] NoCreatedChunkId, + /// Server claims to not have an entity. + #[error("Server does not have {0}")] + NotFound(String), + + /// Server does not have a chunk. #[error("Server does not have chunk {0}")] - ChunkNotFound(String), + ChunkNotFound(ChunkId), + /// Server does not have generation. #[error("Server does not have generation {0}")] - GenerationNotFound(String), -} + GenerationNotFound(ChunkId), -pub struct BackupClient { - client: Client, - base_url: String, -} + /// Server didn't give us a chunk's metadata. + #[error("Server response did not have a 'chunk-meta' header for chunk {0}")] + NoChunkMeta(ChunkId), -impl BackupClient { - pub fn new(base_url: &str) -> anyhow::Result<Self> { - let client = Client::builder() - .danger_accept_invalid_certs(true) - .build()?; - Ok(Self { - client, - base_url: base_url.to_string(), - }) - } + /// Chunk has wrong checksum and may be corrupted. + #[error("Wrong checksum for chunk {0}, got {1}, expected {2}")] + WrongChecksum(ChunkId, String, String), - pub fn upload_filesystem_entry( - &self, - e: &FilesystemEntry, - size: usize, - ) -> anyhow::Result<Vec<ChunkId>> { - info!("upload entry: {:?}", e); - let ids = match e.kind() { - FilesystemKind::Regular => self.read_file(e.pathbuf(), size)?, - FilesystemKind::Directory => vec![], - FilesystemKind::Symlink => vec![], - }; - Ok(ids) - } + /// Client configuration is wrong. 
+ #[error(transparent)] + ClientConfigError(#[from] ClientConfigError), - pub fn upload_generation(&self, filename: &Path, size: usize) -> anyhow::Result<ChunkId> { - info!("upload SQLite {}", filename.display()); - let ids = self.read_file(filename.to_path_buf(), size)?; - let gen = GenerationChunk::new(ids); - let data = gen.to_data_chunk()?; - let meta = ChunkMeta::new_generation(&sha256(data.data()), ¤t_timestamp()); - let gen_id = self.upload_gen_chunk(meta.clone(), gen)?; - info!("uploaded generation {}, meta {:?}", gen_id, meta); - Ok(gen_id) - } + /// An error encrypting or decrypting chunks. + #[error(transparent)] + CipherError(#[from] CipherError), - fn read_file(&self, filename: PathBuf, size: usize) -> anyhow::Result<Vec<ChunkId>> { - info!("upload file {}", filename.display()); - let file = std::fs::File::open(filename)?; - let chunker = Chunker::new(size, file); - let chunk_ids = self.upload_new_file_chunks(chunker)?; - Ok(chunk_ids) - } + /// An error regarding generation chunks. + #[error(transparent)] + GenerationChunkError(#[from] GenerationChunkError), - fn base_url(&self) -> &str { - &self.base_url - } + /// An error regarding client trust. + #[error(transparent)] + ClientTrust(#[from] ClientTrustError), - fn chunks_url(&self) -> String { - format!("{}/chunks", self.base_url()) - } + /// An error using a backup's local metadata. + #[error(transparent)] + LocalGenerationError(#[from] LocalGenerationError), - pub fn has_chunk(&self, meta: &ChunkMeta) -> anyhow::Result<Option<ChunkId>> { - trace!("has_chunk: url={:?}", self.base_url()); - let req = self - .client - .get(&self.chunks_url()) - .query(&[("sha256", meta.sha256())]) - .build()?; - - let res = self.client.execute(req)?; - debug!("has_chunk: status={}", res.status()); - let has = if res.status() != 200 { - debug!("has_chunk: error from server"); - None - } else { - let text = res.text()?; - debug!("has_chunk: text={:?}", text); - let hits: HashMap<String, ChunkMeta> = serde_json::from_str(&text)?; - debug!("has_chunk: hits={:?}", hits); - let mut iter = hits.iter(); - if let Some((chunk_id, _)) = iter.next() { - debug!("has_chunk: chunk_id={:?}", chunk_id); - Some(chunk_id.into()) - } else { - None - } - }; + /// An error with the `chunk-meta` header. + #[error("couldn't convert response chunk-meta header to string: {0}")] + MetaHeaderToString(reqwest::header::ToStrError), + + /// An error from the HTTP library. + #[error("error from reqwest library: {0}")] + ReqwestError(reqwest::Error), + + /// Couldn't look up a chunk via checksum. + #[error("lookup by chunk checksum failed: {0}")] + ChunkExists(reqwest::Error), + + /// Error parsing JSON. + #[error("failed to parse JSON: {0}")] + JsonParse(serde_json::Error), - info!("has_chunk result: {:?}", has); - Ok(has) + /// Error generating JSON. + #[error("failed to generate JSON: {0}")] + JsonGenerate(serde_json::Error), + + /// Error parsing YAML. + #[error("failed to parse YAML: {0}")] + YamlParse(serde_yaml::Error), + + /// Failed to open a file. + #[error("failed to open file {0}: {1}")] + FileOpen(PathBuf, std::io::Error), + + /// Failed to create a file. + #[error("failed to create file {0}: {1}")] + FileCreate(PathBuf, std::io::Error), + + /// Failed to write a file. + #[error("failed to write to file {0}: {1}")] + FileWrite(PathBuf, std::io::Error), + + /// Error from a chunk store. + #[error(transparent)] + ChunkStore(#[from] StoreError), +} + +/// Client for the Obnam server HTTP API. 
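+///
+/// A sketch of a typical round trip (error handling elided; the
+/// `config` and `chunk` values are assumed to come from the caller):
+///
+/// ~~~ignore
+/// let mut client = BackupClient::new(&config)?;
+/// let id = client.upload_chunk(chunk).await?;
+/// let fetched = client.fetch_chunk(&id).await?;
+/// ~~~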
+pub struct BackupClient { + store: ChunkStore, + cipher: CipherEngine, +} + +impl BackupClient { + /// Create a new backup client. + pub fn new(config: &ClientConfig) -> Result<Self, ClientError> { + info!("creating backup client with config: {:#?}", config); + let pass = config.passwords()?; + Ok(Self { + store: ChunkStore::remote(config)?, + cipher: CipherEngine::new(&pass), + }) } - pub fn upload_chunk(&self, meta: ChunkMeta, chunk: DataChunk) -> anyhow::Result<ChunkId> { - let res = self - .client - .post(&self.chunks_url()) - .header("chunk-meta", meta.to_json()) - .body(chunk.data().to_vec()) - .send()?; - debug!("upload_chunk: res={:?}", res); - let res: HashMap<String, String> = res.json()?; - let chunk_id = if let Some(chunk_id) = res.get("chunk_id") { - debug!("upload_chunk: id={}", chunk_id); - chunk_id.parse().unwrap() - } else { - return Err(ClientError::NoCreatedChunkId.into()); - }; - info!("uploaded_chunk {} meta {:?}", chunk_id, meta); - Ok(chunk_id) + /// Does the server have a chunk? + pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> { + let mut ids = self.store.find_by_label(meta).await?; + Ok(ids.pop()) } - pub fn upload_gen_chunk( - &self, - meta: ChunkMeta, - gen: GenerationChunk, - ) -> anyhow::Result<ChunkId> { - let res = self - .client - .post(&self.chunks_url()) - .header("chunk-meta", meta.to_json()) - .body(serde_json::to_string(&gen)?) - .send()?; - debug!("upload_chunk: res={:?}", res); - let res: HashMap<String, String> = res.json()?; - let chunk_id = if let Some(chunk_id) = res.get("chunk_id") { - debug!("upload_chunk: id={}", chunk_id); - chunk_id.parse().unwrap() - } else { - return Err(ClientError::NoCreatedChunkId.into()); - }; - info!("uploaded_generation chunk {}", chunk_id); - Ok(chunk_id) + /// Upload a data chunk to the server. + pub async fn upload_chunk(&mut self, chunk: DataChunk) -> Result<ChunkId, ClientError> { + let enc = self.cipher.encrypt_chunk(&chunk)?; + let data = enc.ciphertext().to_vec(); + let id = self.store.put(data, chunk.meta()).await?; + Ok(id) } - pub fn upload_new_file_chunks(&self, chunker: Chunker) -> anyhow::Result<Vec<ChunkId>> { - let mut chunk_ids = vec![]; - for item in chunker { - let (meta, chunk) = item?; - if let Some(chunk_id) = self.has_chunk(&meta)? { - chunk_ids.push(chunk_id.clone()); - info!("reusing existing chunk {}", chunk_id); + /// Get current client trust chunk from repository, if there is one. 
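+    ///
+    /// All chunks labelled as client trust are fetched, and the one
+    /// with the newest timestamp is returned.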
+ pub async fn get_client_trust(&self) -> Result<Option<ClientTrust>, ClientError> { + let ids = self.find_client_trusts().await?; + let mut latest: Option<ClientTrust> = None; + for id in ids { + let chunk = self.fetch_chunk(&id).await?; + let new = ClientTrust::from_data_chunk(&chunk)?; + if let Some(t) = &latest { + if new.timestamp() > t.timestamp() { + latest = Some(new); + } } else { - let chunk_id = self.upload_chunk(meta, chunk)?; - chunk_ids.push(chunk_id.clone()); - info!("created new chunk {}", chunk_id); + latest = Some(new); } } + Ok(latest) + } - Ok(chunk_ids) + async fn find_client_trusts(&self) -> Result<Vec<ChunkId>, ClientError> { + let label = Label::literal("client-trust"); + let meta = ChunkMeta::new(&label); + let ids = self.store.find_by_label(&meta).await?; + Ok(ids) } - pub fn list_generations(&self) -> anyhow::Result<GenerationList> { - let url = format!("{}?generation=true", &self.chunks_url()); - trace!("list_generations: url={:?}", url); - let req = self.client.get(&url).build()?; - let res = self.client.execute(req)?; - debug!("list_generations: status={}", res.status()); - let body = res.bytes()?; - debug!("list_generations: body={:?}", body); - let map: HashMap<String, ChunkMeta> = serde_yaml::from_slice(&body)?; - debug!("list_generations: map={:?}", map); - let finished = map + /// List backup generations known by the server. + pub fn list_generations(&self, trust: &ClientTrust) -> GenerationList { + let finished = trust + .backups() .iter() - .map(|(id, meta)| FinishedGeneration::new(id, meta.ended().map_or("", |s| s))) + .map(|id| FinishedGeneration::new(&format!("{}", id), "")) .collect(); - Ok(GenerationList::new(finished)) + GenerationList::new(finished) } - pub fn fetch_chunk(&self, chunk_id: &ChunkId) -> anyhow::Result<DataChunk> { - info!("fetch chunk {}", chunk_id); - - let url = format!("{}/{}", &self.chunks_url(), chunk_id); - let req = self.client.get(&url).build()?; - let res = self.client.execute(req)?; - if res.status() != 200 { - let err = ClientError::ChunkNotFound(chunk_id.to_string()); - error!("fetching chunk {} failed: {}", chunk_id, err); - return Err(err.into()); - } - - let headers = res.headers(); - let meta = headers.get("chunk-meta"); - if meta.is_none() { - let err = ObnamError::NoChunkMeta(chunk_id.clone()); - error!("fetching chunk {} failed: {}", chunk_id, err); - return Err(err.into()); - } - let meta = meta.unwrap().to_str()?; - debug!("fetching chunk {}: meta={:?}", chunk_id, meta); - let meta: ChunkMeta = serde_json::from_str(meta)?; - debug!("fetching chunk {}: meta={:?}", chunk_id, meta); - - let body = res.bytes()?; - let body = body.to_vec(); - let actual = sha256(&body); - if actual != meta.sha256() { - let err = - ObnamError::WrongChecksum(chunk_id.clone(), actual, meta.sha256().to_string()); - error!("fetching chunk {} failed: {}", chunk_id, err); - return Err(err.into()); - } - - let chunk: DataChunk = DataChunk::new(body); + /// Fetch a data chunk from the server, given the chunk identifier. 
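+    ///
+    /// The chunk is decrypted before it is returned; its metadata,
+    /// serialized as JSON, is used as the associated data for the
+    /// cipher.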
+    pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
+        let (body, meta) = self.store.get(chunk_id).await?;
+        let meta_bytes = meta.to_json_vec();
+        let chunk = self.cipher.decrypt_chunk(&body, &meta_bytes)?;
         Ok(chunk)
     }

-    fn fetch_generation_chunk(&self, gen_id: &str) -> anyhow::Result<GenerationChunk> {
-        let chunk_id = ChunkId::from_str(gen_id);
-        let chunk = self.fetch_chunk(&chunk_id)?;
+    async fn fetch_generation_chunk(&self, gen_id: &GenId) -> Result<GenerationChunk, ClientError> {
+        let chunk = self.fetch_chunk(gen_id.as_chunk_id()).await?;
         let gen = GenerationChunk::from_data_chunk(&chunk)?;
         Ok(gen)
     }

-    pub fn fetch_generation(&self, gen_id: &str, dbname: &Path) -> anyhow::Result<LocalGeneration> {
-        let gen = self.fetch_generation_chunk(gen_id)?;
+    /// Fetch a backup generation's metadata, given its identifier.
+    pub async fn fetch_generation(
+        &self,
+        gen_id: &GenId,
+        dbname: &Path,
+    ) -> Result<LocalGeneration, ClientError> {
+        let gen = self.fetch_generation_chunk(gen_id).await?;

         // Fetch the SQLite file, storing it in the named file.
-        let mut dbfile = File::create(&dbname)?;
+        let mut dbfile = File::create(dbname)
+            .map_err(|err| ClientError::FileCreate(dbname.to_path_buf(), err))?;
         for id in gen.chunk_ids() {
-            let chunk = self.fetch_chunk(id)?;
-            dbfile.write_all(chunk.data())?;
+            let chunk = self.fetch_chunk(id).await?;
+            dbfile
+                .write_all(chunk.data())
+                .map_err(|err| ClientError::FileWrite(dbname.to_path_buf(), err))?;
         }
         info!("downloaded generation to {}", dbname.display());
@@ -273,8 +209,3 @@ impl BackupClient {
         Ok(gen)
     }
 }
-
-fn current_timestamp() -> String {
-    let now: DateTime<Local> = Local::now();
-    format!("{}", now.format("%Y-%m-%d %H:%M:%S.%f %z"))
-}
diff --git a/src/cmd/backup.rs b/src/cmd/backup.rs
index da7298f..70e9eac 100644
--- a/src/cmd/backup.rs
+++ b/src/cmd/backup.rs
@@ -1,65 +1,137 @@
-use crate::backup_run::BackupRun;
-use crate::client::ClientConfig;
-use crate::fsiter::FsIterator;
-use crate::generation::NascentGeneration;
+//! The `backup` subcommand.
+
+use crate::backup_run::{current_timestamp, BackupRun};
+use crate::chunk::ClientTrust;
+use crate::client::BackupClient;
+use crate::config::ClientConfig;
+use crate::dbgen::{schema_version, FileId, DEFAULT_SCHEMA_MAJOR};
+use crate::error::ObnamError;
+use crate::generation::GenId;
+use crate::performance::{Clock, Performance};
+use crate::schema::VersionComponent;
+
+use clap::Parser;
 use log::info;
 use std::time::SystemTime;
-use tempfile::NamedTempFile;
-
-pub fn backup(config: &ClientConfig, buffer_size: usize) -> anyhow::Result<()> {
-    let runtime = SystemTime::now();
-
-    let run = BackupRun::new(config, buffer_size)?;
-
-    // Create a named temporary file. We don't meed the open file
-    // handle, so we discard that.
-    let oldname = {
-        let temp = NamedTempFile::new()?;
-        let (_, dbname) = temp.keep()?;
-        dbname
-    };
-
-    // Create a named temporary file. We don't meed the open file
-    // handle, so we discard that.
-    let newname = {
-        let temp = NamedTempFile::new()?;
-        let (_, dbname) = temp.keep()?;
-        dbname
-    };
-
-    let genlist = run.client().list_generations()?;
-    let file_count = {
-        let iter = FsIterator::new(&config.root);
-        let mut new = NascentGeneration::create(&newname)?;
-
-        match genlist.resolve("latest") {
-            None => {
-                info!("fresh backup without a previous generation");
-                new.insert_iter(iter.map(|entry| run.backup_file_initially(entry)))?;
+use tempfile::tempdir;
+use tokio::runtime::Runtime;
+
+/// Make a backup.
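+///
+/// A sketch of the command line this derives to (binary name
+/// assumed):
+///
+/// ~~~text
+/// obnam backup [--full] [--backup-version <VERSION>]
+/// ~~~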
+#[derive(Debug, Parser)] +pub struct Backup { + /// Force a full backup, instead of an incremental one. + #[clap(long)] + full: bool, + + /// Backup schema major version to use. + #[clap(long)] + backup_version: Option<VersionComponent>, +} + +impl Backup { + /// Run the command. + pub fn run(&self, config: &ClientConfig, perf: &mut Performance) -> Result<(), ObnamError> { + let rt = Runtime::new()?; + rt.block_on(self.run_async(config, perf)) + } + + async fn run_async( + &self, + config: &ClientConfig, + perf: &mut Performance, + ) -> Result<(), ObnamError> { + let runtime = SystemTime::now(); + + let major = self.backup_version.unwrap_or(DEFAULT_SCHEMA_MAJOR); + let schema = schema_version(major)?; + + let mut client = BackupClient::new(config)?; + let trust = client + .get_client_trust() + .await? + .or_else(|| Some(ClientTrust::new("FIXME", None, current_timestamp(), vec![]))) + .unwrap(); + let genlist = client.list_generations(&trust); + + let temp = tempdir()?; + let oldtemp = temp.path().join("old.db"); + let newtemp = temp.path().join("new.db"); + + let old_id = if self.full { + None + } else { + match genlist.resolve("latest") { + Err(_) => None, + Ok(old_id) => Some(old_id), } - Some(old) => { - info!("incremental backup based on {}", old); - let old = run.client().fetch_generation(&old, &oldname)?; - run.progress() - .files_in_previous_generation(old.file_count()? as u64); - new.insert_iter(iter.map(|entry| run.backup_file_incrementally(entry, &old)))?; + }; + + let (is_incremental, outcome) = if let Some(old_id) = old_id { + info!("incremental backup based on {}", old_id); + let mut run = BackupRun::incremental(config, &mut client)?; + let old = run.start(Some(&old_id), &oldtemp, perf).await?; + ( + true, + run.backup_roots(config, &old, &newtemp, schema, perf) + .await?, + ) + } else { + info!("fresh backup without a previous generation"); + let mut run = BackupRun::initial(config, &mut client)?; + let old = run.start(None, &oldtemp, perf).await?; + ( + false, + run.backup_roots(config, &old, &newtemp, schema, perf) + .await?, + ) + }; + + perf.start(Clock::GenerationUpload); + let mut trust = trust; + trust.append_backup(outcome.gen_id.as_chunk_id()); + trust.finalize(current_timestamp()); + let trust = trust.to_data_chunk()?; + let trust_id = client.upload_chunk(trust).await?; + perf.stop(Clock::GenerationUpload); + info!("uploaded new client-trust {}", trust_id); + + for w in outcome.warnings.iter() { + println!("warning: {}", w); + } + + if is_incremental && !outcome.new_cachedir_tags.is_empty() { + println!("New CACHEDIR.TAG files since the last backup:"); + for t in &outcome.new_cachedir_tags { + println!("- {:?}", t); } + println!("You can configure Obnam to ignore all such files by setting `exclude_cache_tag_directories` to `false`."); } - run.progress().finish(); - new.file_count() - }; - // Upload the SQLite file, i.e., the named temporary file, which - // still exists, since we persisted it above. 
- let gen_id = run.client().upload_generation(&newname, buffer_size)?; + report_stats( + &runtime, + outcome.files_count, + &outcome.gen_id, + outcome.warnings.len(), + )?; + + if is_incremental && !outcome.new_cachedir_tags.is_empty() { + Err(ObnamError::NewCachedirTagsFound) + } else { + Ok(()) + } + } +} + +fn report_stats( + runtime: &SystemTime, + file_count: FileId, + gen_id: &GenId, + num_warnings: usize, +) -> Result<(), ObnamError> { println!("status: OK"); + println!("warnings: {}", num_warnings); println!("duration: {}", runtime.elapsed()?.as_secs()); println!("file-count: {}", file_count); println!("generation-id: {}", gen_id); - - // Delete the temporary file.q - std::fs::remove_file(&newname)?; - std::fs::remove_file(&oldname)?; - Ok(()) } diff --git a/src/cmd/chunk.rs b/src/cmd/chunk.rs new file mode 100644 index 0000000..293de20 --- /dev/null +++ b/src/cmd/chunk.rs @@ -0,0 +1,70 @@ +//! The `encrypt-chunk` and `decrypt-chunk` subcommands. + +use crate::chunk::DataChunk; +use crate::chunkmeta::ChunkMeta; +use crate::cipher::CipherEngine; +use crate::config::ClientConfig; +use crate::error::ObnamError; +use clap::Parser; +use std::path::PathBuf; + +/// Encrypt a chunk. +#[derive(Debug, Parser)] +pub struct EncryptChunk { + /// The name of the file containing the cleartext chunk. + filename: PathBuf, + + /// Name of file where to write the encrypted chunk. + output: PathBuf, + + /// Chunk metadata as JSON. + json: String, +} + +impl EncryptChunk { + /// Run the command. + pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let pass = config.passwords()?; + let cipher = CipherEngine::new(&pass); + + let meta = ChunkMeta::from_json(&self.json)?; + + let cleartext = std::fs::read(&self.filename)?; + let chunk = DataChunk::new(cleartext, meta); + let encrypted = cipher.encrypt_chunk(&chunk)?; + + std::fs::write(&self.output, encrypted.ciphertext())?; + + Ok(()) + } +} + +/// Decrypt a chunk. +#[derive(Debug, Parser)] +pub struct DecryptChunk { + /// Name of file containing encrypted chunk. + filename: PathBuf, + + /// Name of file where to write the cleartext chunk. + output: PathBuf, + + /// Chunk metadata as JSON. + json: String, +} + +impl DecryptChunk { + /// Run the command. + pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let pass = config.passwords()?; + let cipher = CipherEngine::new(&pass); + + let meta = ChunkMeta::from_json(&self.json)?; + + let encrypted = std::fs::read(&self.filename)?; + let chunk = cipher.decrypt_chunk(&encrypted, &meta.to_json_vec())?; + + std::fs::write(&self.output, chunk.data())?; + + Ok(()) + } +} diff --git a/src/cmd/chunkify.rs b/src/cmd/chunkify.rs new file mode 100644 index 0000000..91cb0be --- /dev/null +++ b/src/cmd/chunkify.rs @@ -0,0 +1,110 @@ +//! The `chunkify` subcommand. + +use crate::config::ClientConfig; +use crate::engine::Engine; +use crate::error::ObnamError; +use crate::workqueue::WorkQueue; +use clap::Parser; +use serde::Serialize; +use sha2::{Digest, Sha256}; +use std::path::PathBuf; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, BufReader}; +use tokio::runtime::Runtime; +use tokio::sync::mpsc; + +// Size of queue with unprocessed chunks, and also queue of computed +// checksums. +const Q: usize = 8; + +/// Split files into chunks and show their metadata. +#[derive(Debug, Parser)] +pub struct Chunkify { + /// Names of files to split into chunks. + filenames: Vec<PathBuf>, +} + +impl Chunkify { + /// Run the command. 
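+    ///
+    /// Each named file is split into `chunk_size` chunks by a
+    /// spawned task; a shared work queue feeds the chunks to a
+    /// hashing engine, and the resulting checksums are printed as
+    /// JSON.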
+ pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let rt = Runtime::new()?; + rt.block_on(self.run_async(config)) + } + + async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let mut q = WorkQueue::new(Q); + for filename in self.filenames.iter() { + tokio::spawn(split_file( + filename.to_path_buf(), + config.chunk_size, + q.push(), + )); + } + q.close(); + + let mut summer = Engine::new(q, just_hash); + + let mut checksums = vec![]; + while let Some(sum) = summer.next().await { + checksums.push(sum); + } + + println!("{}", serde_json::to_string_pretty(&checksums)?); + + Ok(()) + } +} + +#[derive(Debug, Clone)] +struct Chunk { + filename: PathBuf, + offset: u64, + data: Vec<u8>, +} + +#[derive(Debug, Clone, Serialize)] +struct Checksum { + filename: PathBuf, + offset: u64, + pub len: u64, + checksum: String, +} + +async fn split_file(filename: PathBuf, chunk_size: usize, tx: mpsc::Sender<Chunk>) { + // println!("split_file {}", filename.display()); + let mut file = BufReader::new(File::open(&*filename).await.unwrap()); + + let mut offset = 0; + loop { + let mut data = vec![0; chunk_size]; + let n = file.read(&mut data).await.unwrap(); + if n == 0 { + break; + } + let data: Vec<u8> = data[..n].to_vec(); + + let chunk = Chunk { + filename: filename.clone(), + offset, + data, + }; + tx.send(chunk).await.unwrap(); + // println!("split_file sent chunk at offset {}", offset); + + offset += n as u64; + } + // println!("split_file EOF at {}", offset); +} + +fn just_hash(chunk: Chunk) -> Checksum { + let mut hasher = Sha256::new(); + hasher.update(&chunk.data); + let hash = hasher.finalize(); + let hash = format!("{:x}", hash); + Checksum { + filename: chunk.filename, + offset: chunk.offset, + len: chunk.data.len() as u64, + checksum: hash, + } +} diff --git a/src/cmd/gen_info.rs b/src/cmd/gen_info.rs new file mode 100644 index 0000000..901a0ae --- /dev/null +++ b/src/cmd/gen_info.rs @@ -0,0 +1,47 @@ +//! The `gen-info` subcommand. + +use crate::chunk::ClientTrust; +use crate::client::BackupClient; +use crate::config::ClientConfig; +use crate::error::ObnamError; +use clap::Parser; +use log::info; +use tempfile::NamedTempFile; +use tokio::runtime::Runtime; + +/// Show metadata for a generation. +#[derive(Debug, Parser)] +pub struct GenInfo { + /// Reference of the generation. + gen_ref: String, +} + +impl GenInfo { + /// Run the command. + pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let rt = Runtime::new()?; + rt.block_on(self.run_async(config)) + } + + async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let temp = NamedTempFile::new()?; + + let client = BackupClient::new(config)?; + + let trust = client + .get_client_trust() + .await? + .or_else(|| Some(ClientTrust::new("FIXME", None, "".to_string(), vec![]))) + .unwrap(); + + let genlist = client.list_generations(&trust); + let gen_id = genlist.resolve(&self.gen_ref)?; + info!("generation id is {}", gen_id.as_chunk_id()); + + let gen = client.fetch_generation(&gen_id, temp.path()).await?; + let meta = gen.meta()?; + println!("{}", serde_json::to_string_pretty(&meta)?); + + Ok(()) + } +} diff --git a/src/cmd/get_chunk.rs b/src/cmd/get_chunk.rs index bf653ff..1561492 100644 --- a/src/cmd/get_chunk.rs +++ b/src/cmd/get_chunk.rs @@ -1,15 +1,34 @@ +//! The `get-chunk` subcommand. 
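+//!
+//! Fetches a single chunk from the server by its identifier and
+//! writes the cleartext chunk data to stdout.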
+
 use crate::chunkid::ChunkId;
 use crate::client::BackupClient;
-use crate::client::ClientConfig;
+use crate::config::ClientConfig;
+use crate::error::ObnamError;
+use clap::Parser;
 use std::io::{stdout, Write};
+use tokio::runtime::Runtime;
+
+/// Fetch a chunk from the server.
+#[derive(Debug, Parser)]
+pub struct GetChunk {
+    /// Identifier of chunk to fetch.
+    chunk_id: String,
+}

-pub fn get_chunk(config: &ClientConfig, chunk_id: &str) -> anyhow::Result<()> {
-    let client = BackupClient::new(&config.server_url)?;
-    let chunk_id: ChunkId = chunk_id.parse().unwrap();
-    let chunk = client.fetch_chunk(&chunk_id)?;
+impl GetChunk {
+    /// Run the command.
+    pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+        let rt = Runtime::new()?;
+        rt.block_on(self.run_async(config))
+    }

-    let stdout = stdout();
-    let mut handle = stdout.lock();
-    handle.write_all(chunk.data())?;
-    Ok(())
+    async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+        let client = BackupClient::new(config)?;
+        let chunk_id: ChunkId = self.chunk_id.parse().unwrap();
+        let chunk = client.fetch_chunk(&chunk_id).await?;
+        let stdout = stdout();
+        let mut handle = stdout.lock();
+        handle.write_all(chunk.data())?;
+        Ok(())
+    }
 }
diff --git a/src/cmd/init.rs b/src/cmd/init.rs
new file mode 100644
index 0000000..5950fbb
--- /dev/null
+++ b/src/cmd/init.rs
@@ -0,0 +1,33 @@
+//! The `init` subcommand.
+
+use crate::config::ClientConfig;
+use crate::error::ObnamError;
+use crate::passwords::{passwords_filename, Passwords};
+use clap::Parser;
+
+const PROMPT: &str = "Obnam passphrase: ";
+
+/// Initialize client by setting passwords.
+#[derive(Debug, Parser)]
+pub struct Init {
+    /// Only for testing.
+    #[clap(long)]
+    insecure_passphrase: Option<String>,
+}
+
+impl Init {
+    /// Run the command.
+    pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+        let passphrase = match &self.insecure_passphrase {
+            Some(x) => x.to_string(),
+            None => rpassword::prompt_password(PROMPT).unwrap(),
+        };
+
+        let passwords = Passwords::new(&passphrase);
+        let filename = passwords_filename(&config.filename);
+        passwords
+            .save(&filename)
+            .map_err(|err| ObnamError::PasswordSave(filename, err))?;
+        Ok(())
+    }
+}
diff --git a/src/cmd/inspect.rs b/src/cmd/inspect.rs
new file mode 100644
index 0000000..3b41075
--- /dev/null
+++ b/src/cmd/inspect.rs
@@ -0,0 +1,46 @@
+//! The `inspect` subcommand.
+
+use crate::backup_run::current_timestamp;
+use crate::chunk::ClientTrust;
+use crate::client::BackupClient;
+use crate::config::ClientConfig;
+use crate::error::ObnamError;
+
+use clap::Parser;
+use log::info;
+use tempfile::NamedTempFile;
+use tokio::runtime::Runtime;
+
+/// Inspect a backup generation.
+#[derive(Debug, Parser)]
+pub struct Inspect {
+    /// Reference to generation to inspect.
+    gen_id: String,
+}
+
+impl Inspect {
+    /// Run the command.
+    pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+        let rt = Runtime::new()?;
+        rt.block_on(self.run_async(config))
+    }
+
+    async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+        let temp = NamedTempFile::new()?;
+        let client = BackupClient::new(config)?;
+        let trust = client
+            .get_client_trust()
+            .await?
+            .or_else(|| Some(ClientTrust::new("FIXME", None, current_timestamp(), vec![])))
+            .unwrap();
+        let genlist = client.list_generations(&trust);
+        let gen_id = genlist.resolve(&self.gen_id)?;
+        info!("generation id is {}", gen_id.as_chunk_id());
+
+        let gen = client.fetch_generation(&gen_id, temp.path()).await?;
+        let meta = gen.meta()?;
+        println!("schema_version: {}", meta.schema_version());
+
+        Ok(())
+    }
+}
diff --git a/src/cmd/list.rs b/src/cmd/list.rs
index 8766e34..8bc6978 100644
--- a/src/cmd/list.rs
+++ b/src/cmd/list.rs
@@ -1,12 +1,36 @@
-use crate::client::{BackupClient, ClientConfig};
+//! The `list` subcommand.

-pub fn list(config: &ClientConfig) -> anyhow::Result<()> {
-    let client = BackupClient::new(&config.server_url)?;
+use crate::chunk::ClientTrust;
+use crate::client::BackupClient;
+use crate::config::ClientConfig;
+use crate::error::ObnamError;
+use clap::Parser;
+use tokio::runtime::Runtime;

-    let generations = client.list_generations()?;
-    for finished in generations.iter() {
-        println!("{} {}", finished.id(), finished.ended());
+/// List generations on the server.
+#[derive(Debug, Parser)]
+pub struct List {}
+
+impl List {
+    /// Run the command.
+    pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+        let rt = Runtime::new()?;
+        rt.block_on(self.run_async(config))
     }

-    Ok(())
+    async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+        let client = BackupClient::new(config)?;
+        let trust = client
+            .get_client_trust()
+            .await?
+            .or_else(|| Some(ClientTrust::new("FIXME", None, "".to_string(), vec![])))
+            .unwrap();
+
+        let generations = client.list_generations(&trust);
+        for finished in generations.iter() {
+            println!("{} {}", finished.id(), finished.ended());
+        }
+
+        Ok(())
+    }
 }
diff --git a/src/cmd/list_backup_versions.rs b/src/cmd/list_backup_versions.rs
new file mode 100644
index 0000000..c78ccfc
--- /dev/null
+++ b/src/cmd/list_backup_versions.rs
@@ -0,0 +1,31 @@
+//! The `list-backup-versions` subcommand.
+
+use crate::config::ClientConfig;
+use crate::dbgen::{schema_version, DEFAULT_SCHEMA_MAJOR, SCHEMA_MAJORS};
+use crate::error::ObnamError;
+
+use clap::Parser;
+
+/// List supported backup schema versions.
+#[derive(Debug, Parser)]
+pub struct ListSchemaVersions {
+    /// List only the default version.
+    #[clap(long)]
+    default_only: bool,
+}
+
+impl ListSchemaVersions {
+    /// Run the command.
+    pub fn run(&self, _config: &ClientConfig) -> Result<(), ObnamError> {
+        if self.default_only {
+            let schema = schema_version(DEFAULT_SCHEMA_MAJOR)?;
+            println!("{}", schema);
+        } else {
+            for major in SCHEMA_MAJORS {
+                let schema = schema_version(*major)?;
+                println!("{}", schema);
+            }
+        }
+        Ok(())
+    }
+}
diff --git a/src/cmd/list_files.rs b/src/cmd/list_files.rs
index a69c3df..e8276cd 100644
--- a/src/cmd/list_files.rs
+++ b/src/cmd/list_files.rs
@@ -1,36 +1,51 @@
+//! The `list-files` subcommand.
+
 use crate::backup_reason::Reason;
+use crate::chunk::ClientTrust;
 use crate::client::BackupClient;
-use crate::client::ClientConfig;
+use crate::config::ClientConfig;
 use crate::error::ObnamError;
 use crate::fsentry::{FilesystemEntry, FilesystemKind};
+use clap::Parser;
 use tempfile::NamedTempFile;
+use tokio::runtime::Runtime;

-pub fn list_files(config: &ClientConfig, gen_ref: &str) -> anyhow::Result<()> {
-    // Create a named temporary file. We don't meed the open file
-    // handle, so we discard that.
-    let dbname = {
-        let temp = NamedTempFile::new()?;
-        let (_, dbname) = temp.keep()?;
-        dbname
-    };
+/// List files in a backup.
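+///
+/// A sketch of the command-line shape (binary name assumed; the
+/// generation reference defaults to `latest`):
+///
+/// ~~~text
+/// obnam list-files [GEN_ID]
+/// ~~~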
+#[derive(Debug, Parser)] +pub struct ListFiles { + /// Reference to backup to list files in. + #[clap(default_value = "latest")] + gen_id: String, +} + +impl ListFiles { + /// Run the command. + pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let rt = Runtime::new()?; + rt.block_on(self.run_async(config)) + } - let client = BackupClient::new(&config.server_url)?; + async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let temp = NamedTempFile::new()?; - let genlist = client.list_generations()?; - let gen_id: String = match genlist.resolve(gen_ref) { - None => return Err(ObnamError::UnknownGeneration(gen_ref.to_string()).into()), - Some(id) => id, - }; + let client = BackupClient::new(config)?; + let trust = client + .get_client_trust() + .await? + .or_else(|| Some(ClientTrust::new("FIXME", None, "".to_string(), vec![]))) + .unwrap(); - let gen = client.fetch_generation(&gen_id, &dbname)?; - for file in gen.files()? { - println!("{}", format_entry(&file.entry(), file.reason())); - } + let genlist = client.list_generations(&trust); + let gen_id = genlist.resolve(&self.gen_id)?; - // Delete the temporary file. - std::fs::remove_file(&dbname)?; + let gen = client.fetch_generation(&gen_id, temp.path()).await?; + for file in gen.files()?.iter()? { + let (_, entry, reason, _) = file?; + println!("{}", format_entry(&entry, reason)); + } - Ok(()) + Ok(()) + } } fn format_entry(e: &FilesystemEntry, reason: Reason) -> String { @@ -38,6 +53,8 @@ fn format_entry(e: &FilesystemEntry, reason: Reason) -> String { FilesystemKind::Regular => "-", FilesystemKind::Directory => "d", FilesystemKind::Symlink => "l", + FilesystemKind::Socket => "s", + FilesystemKind::Fifo => "p", }; format!("{} {} ({})", kind, e.pathbuf().display(), reason) } diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs index 8f08668..af7457b 100644 --- a/src/cmd/mod.rs +++ b/src/cmd/mod.rs @@ -1,17 +1,16 @@ -mod backup; -pub use backup::backup; - -mod list; -pub use list::list; - -mod list_files; -pub use list_files::list_files; - -pub mod restore; -pub use restore::restore; +//! Subcommand implementations. +pub mod backup; +pub mod chunk; +pub mod chunkify; +pub mod gen_info; pub mod get_chunk; -pub use get_chunk::get_chunk; - +pub mod init; +pub mod inspect; +pub mod list; +pub mod list_backup_versions; +pub mod list_files; +pub mod resolve; +pub mod restore; +pub mod show_config; pub mod show_gen; -pub use show_gen::show_generation; diff --git a/src/cmd/resolve.rs b/src/cmd/resolve.rs new file mode 100644 index 0000000..a7774d7 --- /dev/null +++ b/src/cmd/resolve.rs @@ -0,0 +1,44 @@ +//! The `resolve` subcommand. + +use crate::chunk::ClientTrust; +use crate::client::BackupClient; +use crate::config::ClientConfig; +use crate::error::ObnamError; +use clap::Parser; +use tokio::runtime::Runtime; + +/// Resolve a generation reference into a generation id. +#[derive(Debug, Parser)] +pub struct Resolve { + /// The generation reference. + generation: String, +} + +impl Resolve { + /// Run the command. + pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let rt = Runtime::new()?; + rt.block_on(self.run_async(config)) + } + + async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let client = BackupClient::new(config)?; + let trust = client + .get_client_trust() + .await? 
+ .or_else(|| Some(ClientTrust::new("FIXME", None, "".to_string(), vec![]))) + .unwrap(); + let generations = client.list_generations(&trust); + + match generations.resolve(&self.generation) { + Err(err) => { + return Err(err.into()); + } + Ok(gen_id) => { + println!("{}", gen_id.as_chunk_id()); + } + }; + + Ok(()) + } +} diff --git a/src/cmd/restore.rs b/src/cmd/restore.rs index d783a70..58caf61 100644 --- a/src/cmd/restore.rs +++ b/src/cmd/restore.rs @@ -1,101 +1,165 @@ -use crate::client::BackupClient; -use crate::client::ClientConfig; +//! The `restore` subcommand. + +use crate::backup_reason::Reason; +use crate::chunk::ClientTrust; +use crate::client::{BackupClient, ClientError}; +use crate::config::ClientConfig; +use crate::db::DatabaseError; +use crate::dbgen::FileId; use crate::error::ObnamError; use crate::fsentry::{FilesystemEntry, FilesystemKind}; -use crate::generation::LocalGeneration; +use crate::generation::{LocalGeneration, LocalGenerationError}; +use clap::Parser; use indicatif::{ProgressBar, ProgressStyle}; -use libc::{fchmod, futimens, timespec}; +use libc::{chmod, mkfifo, timespec, utimensat, AT_FDCWD, AT_SYMLINK_NOFOLLOW}; use log::{debug, error, info}; -use std::fs::File; +use std::ffi::CString; use std::io::prelude::*; use std::io::Error; +use std::os::unix::ffi::OsStrExt; use std::os::unix::fs::symlink; -use std::os::unix::io::AsRawFd; +use std::os::unix::net::UnixListener; +use std::path::StripPrefixError; use std::path::{Path, PathBuf}; -use structopt::StructOpt; use tempfile::NamedTempFile; +use tokio::runtime::Runtime; + +/// Restore a backup. +#[derive(Debug, Parser)] +pub struct Restore { + /// Reference to generation to restore. + gen_id: String, + + /// Path to directory where restored files are written. + to: PathBuf, +} + +impl Restore { + /// Run the command. + pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let rt = Runtime::new()?; + rt.block_on(self.run_async(config)) + } -pub fn restore(config: &ClientConfig, gen_ref: &str, to: &Path) -> anyhow::Result<()> { - // Create a named temporary file. We don't meed the open file - // handle, so we discard that. - let dbname = { + async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> { let temp = NamedTempFile::new()?; - let (_, dbname) = temp.keep()?; - dbname - }; - let client = BackupClient::new(&config.server_url)?; + let client = BackupClient::new(config)?; + let trust = client + .get_client_trust() + .await? + .or_else(|| Some(ClientTrust::new("FIXME", None, "".to_string(), vec![]))) + .unwrap(); - let genlist = client.list_generations()?; - let gen_id: String = match genlist.resolve(gen_ref) { - None => return Err(ObnamError::UnknownGeneration(gen_ref.to_string()).into()), - Some(id) => id, - }; - info!("generation id is {}", gen_id); + let genlist = client.list_generations(&trust); + let gen_id = genlist.resolve(&self.gen_id)?; + info!("generation id is {}", gen_id.as_chunk_id()); - let gen = client.fetch_generation(&gen_id, &dbname)?; - info!("restoring {} files", gen.file_count()?); - let progress = create_progress_bar(gen.file_count()?, true); - for file in gen.files()? { - restore_generation(&client, &gen, file.fileno(), file.entry(), &to, &progress)?; - } - for file in gen.files()? 
{
-        if file.entry().is_dir() {
-            restore_directory_metadata(file.entry(), &to)?;
+        let gen = client.fetch_generation(&gen_id, temp.path()).await?;
+        info!("restoring {} files", gen.file_count()?);
+        let progress = create_progress_bar(gen.file_count()?, true);
+        for file in gen.files()?.iter()? {
+            let (fileno, entry, reason, _) = file?;
+            match reason {
+                Reason::FileError => (),
+                _ => restore_generation(&client, &gen, fileno, &entry, &self.to, &progress).await?,
+            }
+        }
+        for file in gen.files()?.iter()? {
+            let (_, entry, _, _) = file?;
+            if entry.is_dir() {
+                restore_directory_metadata(&entry, &self.to)?;
+            }
         }
+        progress.finish();
+
+        Ok(())
     }
-    progress.finish();
+}

-    // Delete the temporary file.
-    std::fs::remove_file(&dbname)?;
+/// Possible errors from restoring.
+#[derive(Debug, thiserror::Error)]
+pub enum RestoreError {
+    /// An error using a Database.
+    #[error(transparent)]
+    Database(#[from] DatabaseError),

-    Ok(())
-}
+    /// Failed to create a named pipe.
+    #[error("Could not create named pipe (FIFO) {0}")]
+    NamedPipeCreationError(PathBuf),

-#[derive(Debug, StructOpt)]
-#[structopt(name = "obnam-backup", about = "Simplistic backup client")]
-struct Opt {
-    #[structopt(parse(from_os_str))]
-    config: PathBuf,
+    /// Error from HTTP client.
+    #[error(transparent)]
+    ClientError(#[from] ClientError),

-    #[structopt()]
-    gen_id: String,
+    /// Error from local generation.
+    #[error(transparent)]
+    LocalGenerationError(#[from] LocalGenerationError),

-    #[structopt(parse(from_os_str))]
-    dbname: PathBuf,
+    /// Error removing a prefix.
+    #[error(transparent)]
+    StripPrefixError(#[from] StripPrefixError),

-    #[structopt(parse(from_os_str))]
-    to: PathBuf,
+    /// Error creating a directory.
+    #[error("failed to create directory {0}: {1}")]
+    CreateDirs(PathBuf, std::io::Error),
+
+    /// Error creating a file.
+    #[error("failed to create file {0}: {1}")]
+    CreateFile(PathBuf, std::io::Error),
+
+    /// Error writing a file.
+    #[error("failed to write file {0}: {1}")]
+    WriteFile(PathBuf, std::io::Error),
+
+    /// Error creating a symbolic link.
+    #[error("failed to create symbolic link {0}: {1}")]
+    Symlink(PathBuf, std::io::Error),
+
+    /// Error creating a UNIX domain socket.
+    #[error("failed to create UNIX domain socket {0}: {1}")]
+    UnixBind(PathBuf, std::io::Error),
+
+    /// Error setting permissions.
+    #[error("failed to set permissions for {0}: {1}")]
+    Chmod(PathBuf, std::io::Error),
+
+    /// Error setting timestamp.
+ #[error("failed to set timestamp for {0}: {1}")] + SetTimestamp(PathBuf, std::io::Error), } -fn restore_generation( +async fn restore_generation( client: &BackupClient, gen: &LocalGeneration, - fileid: i64, + fileid: FileId, entry: &FilesystemEntry, to: &Path, progress: &ProgressBar, -) -> anyhow::Result<()> { +) -> Result<(), RestoreError> { info!("restoring {:?}", entry); - progress.set_message(&format!("{}", entry.pathbuf().display())); + progress.set_message(format!("{}", entry.pathbuf().display())); progress.inc(1); let to = restored_path(entry, to)?; match entry.kind() { - FilesystemKind::Regular => restore_regular(client, &gen, &to, fileid, &entry)?, + FilesystemKind::Regular => restore_regular(client, gen, &to, fileid, entry).await?, FilesystemKind::Directory => restore_directory(&to)?, - FilesystemKind::Symlink => restore_symlink(&to, &entry)?, + FilesystemKind::Symlink => restore_symlink(&to, entry)?, + FilesystemKind::Socket => restore_socket(&to, entry)?, + FilesystemKind::Fifo => restore_fifo(&to, entry)?, } Ok(()) } -fn restore_directory(path: &Path) -> anyhow::Result<()> { +fn restore_directory(path: &Path) -> Result<(), RestoreError> { debug!("restoring directory {}", path.display()); - std::fs::create_dir_all(path)?; + std::fs::create_dir_all(path) + .map_err(|err| RestoreError::CreateDirs(path.to_path_buf(), err))?; Ok(()) } -fn restore_directory_metadata(entry: &FilesystemEntry, to: &Path) -> anyhow::Result<()> { +fn restore_directory_metadata(entry: &FilesystemEntry, to: &Path) -> Result<(), RestoreError> { let to = restored_path(entry, to)?; match entry.kind() { FilesystemKind::Directory => restore_metadata(&to, entry)?, @@ -107,7 +171,7 @@ fn restore_directory_metadata(entry: &FilesystemEntry, to: &Path) -> anyhow::Res Ok(()) } -fn restored_path(entry: &FilesystemEntry, to: &Path) -> anyhow::Result<PathBuf> { +fn restored_path(entry: &FilesystemEntry, to: &Path) -> Result<PathBuf, RestoreError> { let path = &entry.pathbuf(); let path = if path.is_absolute() { path.strip_prefix("/")? @@ -117,22 +181,26 @@ fn restored_path(entry: &FilesystemEntry, to: &Path) -> anyhow::Result<PathBuf> Ok(to.join(path)) } -fn restore_regular( +async fn restore_regular( client: &BackupClient, gen: &LocalGeneration, path: &Path, - fileid: i64, + fileid: FileId, entry: &FilesystemEntry, -) -> anyhow::Result<()> { +) -> Result<(), RestoreError> { debug!("restoring regular {}", path.display()); let parent = path.parent().unwrap(); debug!(" mkdir {}", parent.display()); - std::fs::create_dir_all(parent)?; + std::fs::create_dir_all(parent) + .map_err(|err| RestoreError::CreateDirs(parent.to_path_buf(), err))?; { - let mut file = std::fs::File::create(path)?; - for chunkid in gen.chunkids(fileid)? { - let chunk = client.fetch_chunk(&chunkid)?; - file.write_all(chunk.data())?; + let mut file = std::fs::File::create(path) + .map_err(|err| RestoreError::CreateFile(path.to_path_buf(), err))?; + for chunkid in gen.chunkids(fileid)?.iter()? 
{
+            let chunkid = chunkid?;
+            let chunk = client.fetch_chunk(&chunkid).await?;
+            file.write_all(chunk.data())
+                .map_err(|err| RestoreError::WriteFile(path.to_path_buf(), err))?;
         }
         restore_metadata(path, entry)?;
     }
@@ -140,24 +208,44 @@ fn restore_regular(
     Ok(())
 }
 
-fn restore_symlink(path: &Path, entry: &FilesystemEntry) -> anyhow::Result<()> {
+fn restore_symlink(path: &Path, entry: &FilesystemEntry) -> Result<(), RestoreError> {
     debug!("restoring symlink {}", path.display());
     let parent = path.parent().unwrap();
     debug!(" mkdir {}", parent.display());
     if !parent.exists() {
-        std::fs::create_dir_all(parent)?;
-        {
-            symlink(path, entry.symlink_target().unwrap())?;
+        std::fs::create_dir_all(parent)
+            .map_err(|err| RestoreError::CreateDirs(parent.to_path_buf(), err))?;
+    }
+    symlink(entry.symlink_target().unwrap(), path)
+        .map_err(|err| RestoreError::Symlink(path.to_path_buf(), err))?;
+    restore_metadata(path, entry)?;
+    debug!("restored symlink {}", path.display());
+    Ok(())
+}
+
+fn restore_socket(path: &Path, entry: &FilesystemEntry) -> Result<(), RestoreError> {
+    debug!("creating Unix domain socket {:?}", path);
+    UnixListener::bind(path).map_err(|err| RestoreError::UnixBind(path.to_path_buf(), err))?;
+    restore_metadata(path, entry)?;
+    Ok(())
+}
+
+fn restore_fifo(path: &Path, entry: &FilesystemEntry) -> Result<(), RestoreError> {
+    debug!("creating fifo {:?}", path);
+    let filename = path_to_cstring(path);
+    match unsafe { mkfifo(filename.as_ptr(), 0) } {
+        -1 => {
+            return Err(RestoreError::NamedPipeCreationError(path.to_path_buf()));
         }
+        _ => restore_metadata(path, entry)?,
     }
-    debug!("restored regular {}", path.display());
     Ok(())
 }
 
-fn restore_metadata(path: &Path, entry: &FilesystemEntry) -> anyhow::Result<()> {
+fn restore_metadata(path: &Path, entry: &FilesystemEntry) -> Result<(), RestoreError> {
     debug!("restoring metadata for {}", entry.pathbuf().display());
-    let handle = File::open(path)?;
+    debug!("restoring metadata for {:?}", path);
 
     let atime = timespec {
         tv_sec: entry.atime(),
@@ -170,41 +258,58 @@ fn restore_metadata(path: &Path, entry: &FilesystemEntry) -> anyhow::Result<()>
     let times = [atime, mtime];
     let times: *const timespec = &times[0];
 
+    let pathbuf = path.to_path_buf();
+    let path = path_to_cstring(path);
+
     // We have to use unsafe here to be able to call the libc functions
     // below.
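+    //
+    // Note that the code below deliberately skips `chmod` for symlinks:
+    // changing the mode of a symlink would follow the link and change the
+    // pointed-at file instead. Timestamps are set with `utimensat` and
+    // `AT_SYMLINK_NOFOLLOW`, so they apply to the link itself.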
unsafe { - let fd = handle.as_raw_fd(); // FIXME: needs to NOT follow symlinks - - debug!("fchmod"); - if fchmod(fd, entry.mode()) == -1 { - let error = Error::last_os_error(); - error!("fchmod failed on {}", path.display()); - return Err(error.into()); + if entry.kind() != FilesystemKind::Symlink { + debug!("chmod {:?}", path); + if chmod(path.as_ptr(), entry.mode() as libc::mode_t) == -1 { + let error = Error::last_os_error(); + error!("chmod failed on {:?}", path); + return Err(RestoreError::Chmod(pathbuf, error)); + } + } else { + debug!( + "skipping chmod of a symlink because it'll attempt to change the pointed-at file" + ); } - debug!("futimens"); - if futimens(fd, times) == -1 { + debug!("utimens {:?}", path); + if utimensat(AT_FDCWD, path.as_ptr(), times, AT_SYMLINK_NOFOLLOW) == -1 { let error = Error::last_os_error(); - error!("futimens failed on {}", path.display()); - return Err(error.into()); + error!("utimensat failed on {:?}", path); + return Err(RestoreError::SetTimestamp(pathbuf, error)); } } Ok(()) } -fn create_progress_bar(file_count: i64, verbose: bool) -> ProgressBar { +fn path_to_cstring(path: &Path) -> CString { + let path = path.as_os_str(); + let path = path.as_bytes(); + CString::new(path).unwrap() +} + +fn create_progress_bar(file_count: FileId, verbose: bool) -> ProgressBar { let progress = if verbose { ProgressBar::new(file_count as u64) } else { ProgressBar::hidden() }; - let parts = vec![ + let parts = [ "{wide_bar}", "elapsed: {elapsed}", "files: {pos}/{len}", "current: {wide_msg}", "{spinner}", ]; - progress.set_style(ProgressStyle::default_bar().template(&parts.join("\n"))); + progress.set_style( + ProgressStyle::default_bar() + .template(&parts.join("\n")) + .expect("create indicatif ProgressStyle value"), + ); progress } diff --git a/src/cmd/show_config.rs b/src/cmd/show_config.rs new file mode 100644 index 0000000..8e0ce30 --- /dev/null +++ b/src/cmd/show_config.rs @@ -0,0 +1,17 @@ +//! The `show-config` subcommand. + +use crate::config::ClientConfig; +use crate::error::ObnamError; +use clap::Parser; + +/// Show actual client configuration. +#[derive(Debug, Parser)] +pub struct ShowConfig {} + +impl ShowConfig { + /// Run the command. + pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { + println!("{}", serde_json::to_string_pretty(config)?); + Ok(()) + } +} diff --git a/src/cmd/show_gen.rs b/src/cmd/show_gen.rs index d355389..95d3fd3 100644 --- a/src/cmd/show_gen.rs +++ b/src/cmd/show_gen.rs @@ -1,46 +1,101 @@ +//! The `show-generation` subcommand. + +use crate::chunk::ClientTrust; use crate::client::BackupClient; -use crate::client::ClientConfig; +use crate::config::ClientConfig; +use crate::db::DbInt; use crate::error::ObnamError; use crate::fsentry::FilesystemKind; +use crate::generation::GenId; +use clap::Parser; use indicatif::HumanBytes; +use serde::Serialize; use tempfile::NamedTempFile; +use tokio::runtime::Runtime; + +/// Show information about a generation. +#[derive(Debug, Parser)] +pub struct ShowGeneration { + /// Reference to the generation. Defaults to latest. + #[clap(default_value = "latest")] + gen_id: String, +} -pub fn show_generation(config: &ClientConfig, gen_ref: &str) -> anyhow::Result<()> { - // Create a named temporary file. We don't meed the open file - // handle, so we discard that. - let dbname = { +impl ShowGeneration { + /// Run the command. 
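+    ///
+    /// This creates a tokio runtime and blocks on the async
+    /// implementation of the command.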
+ pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let rt = Runtime::new()?; + rt.block_on(self.run_async(config)) + } + + async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> { let temp = NamedTempFile::new()?; - let (_, dbname) = temp.keep()?; - dbname - }; - - let client = BackupClient::new(&config.server_url)?; - - let genlist = client.list_generations()?; - let gen_id: String = match genlist.resolve(gen_ref) { - None => return Err(ObnamError::UnknownGeneration(gen_ref.to_string()).into()), - Some(id) => id, - }; - - let gen = client.fetch_generation(&gen_id, &dbname)?; - let files = gen.files()?; - - let total_bytes = files.iter().fold(0, |acc, file| { - let e = file.entry(); - if e.kind() == FilesystemKind::Regular { - acc + file.entry().len() - } else { - acc + let client = BackupClient::new(config)?; + let trust = client + .get_client_trust() + .await? + .or_else(|| Some(ClientTrust::new("FIXME", None, "".to_string(), vec![]))) + .unwrap(); + + let genlist = client.list_generations(&trust); + let gen_id = genlist.resolve(&self.gen_id)?; + let gen = client.fetch_generation(&gen_id, temp.path()).await?; + let mut files = gen.files()?; + let mut files = files.iter()?; + + let total_bytes = files.try_fold(0, |acc, file| { + file.map(|(_, e, _, _)| { + if e.kind() == FilesystemKind::Regular { + acc + e.len() + } else { + acc + } + }) + }); + let total_bytes = total_bytes?; + + let output = Output::new(gen_id) + .db_bytes(temp.path().metadata()?.len()) + .file_count(gen.file_count()?) + .file_bytes(total_bytes); + serde_json::to_writer_pretty(std::io::stdout(), &output)?; + + Ok(()) + } +} + +#[derive(Debug, Default, Serialize)] +struct Output { + generation_id: String, + file_count: DbInt, + file_bytes: String, + file_bytes_raw: u64, + db_bytes: String, + db_bytes_raw: u64, +} + +impl Output { + fn new(gen_id: GenId) -> Self { + Self { + generation_id: format!("{}", gen_id), + ..Self::default() } - }); + } - println!("generation-id: {}", gen_id); - println!("file-count: {}", gen.file_count()?); - println!("file-bytes: {}", HumanBytes(total_bytes)); - println!("file-bytes-raw: {}", total_bytes); + fn file_count(mut self, n: DbInt) -> Self { + self.file_count = n; + self + } - // Delete the temporary file. - std::fs::remove_file(&dbname)?; + fn file_bytes(mut self, n: u64) -> Self { + self.file_bytes_raw = n; + self.file_bytes = HumanBytes(n).to_string(); + self + } - Ok(()) + fn db_bytes(mut self, n: u64) -> Self { + self.db_bytes_raw = n; + self.db_bytes = HumanBytes(n).to_string(); + self + } } diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..5774aad --- /dev/null +++ b/src/config.rs @@ -0,0 +1,142 @@ +//! Client configuration. + +use crate::passwords::{passwords_filename, PasswordError, Passwords}; + +use bytesize::MIB; +use log::{error, trace}; +use serde::{Deserialize, Serialize}; +use std::path::{Path, PathBuf}; + +const DEFAULT_CHUNK_SIZE: usize = MIB as usize; +const DEVNULL: &str = "/dev/null"; + +#[derive(Debug, Deserialize, Clone)] +#[serde(deny_unknown_fields)] +struct TentativeClientConfig { + server_url: String, + verify_tls_cert: Option<bool>, + chunk_size: Option<usize>, + roots: Vec<PathBuf>, + log: Option<PathBuf>, + exclude_cache_tag_directories: Option<bool>, +} + +/// Configuration for the Obnam client. +#[derive(Debug, Serialize, Clone)] +pub struct ClientConfig { + /// Name of configuration file. + pub filename: PathBuf, + /// URL of Obnam server. 
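+    ///
+    /// The URL must use `https`; the configuration is rejected
+    /// otherwise.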
+    pub server_url: String,
+    /// Should server's TLS certificate be verified using CA
+    /// signatures? Set to false for self-signed certificates.
+    pub verify_tls_cert: bool,
+    /// Size of chunks when splitting files for backup.
+    pub chunk_size: usize,
+    /// Backup root directories.
+    pub roots: Vec<PathBuf>,
+    /// File where logs should be written.
+    pub log: PathBuf,
+    /// Should cache directories be excluded? Cache directories
+    /// contain a specially formatted CACHEDIR.TAG file.
+    pub exclude_cache_tag_directories: bool,
+}
+
+impl ClientConfig {
+    /// Read a client configuration from a file.
+    pub fn read(filename: &Path) -> Result<Self, ClientConfigError> {
+        trace!("read_config: filename={:?}", filename);
+        let config = std::fs::read_to_string(filename)
+            .map_err(|err| ClientConfigError::Read(filename.to_path_buf(), err))?;
+        let tentative: TentativeClientConfig = serde_yaml::from_str(&config)
+            .map_err(|err| ClientConfigError::YamlParse(filename.to_path_buf(), err))?;
+        let roots = tentative
+            .roots
+            .iter()
+            .map(|path| expand_tilde(path))
+            .collect();
+        let log = tentative
+            .log
+            .map(|path| expand_tilde(&path))
+            .unwrap_or_else(|| PathBuf::from(DEVNULL));
+        let exclude_cache_tag_directories = tentative.exclude_cache_tag_directories.unwrap_or(true);
+
+        let config = Self {
+            chunk_size: tentative.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
+            filename: filename.to_path_buf(),
+            roots,
+            server_url: tentative.server_url,
+            verify_tls_cert: tentative.verify_tls_cert.unwrap_or(false),
+            log,
+            exclude_cache_tag_directories,
+        };
+
+        config.check()?;
+        Ok(config)
+    }
+
+    fn check(&self) -> Result<(), ClientConfigError> {
+        if self.server_url.is_empty() {
+            return Err(ClientConfigError::ServerUrlIsEmpty);
+        }
+        if !self.server_url.starts_with("https://") {
+            return Err(ClientConfigError::NotHttps(self.server_url.to_string()));
+        }
+        if self.roots.is_empty() {
+            return Err(ClientConfigError::NoBackupRoot);
+        }
+        Ok(())
+    }
+
+    /// Read encryption passwords from a file.
+    ///
+    /// The password file is expected to be next to the configuration file.
+    pub fn passwords(&self) -> Result<Passwords, ClientConfigError> {
+        Passwords::load(&passwords_filename(&self.filename))
+            .map_err(ClientConfigError::PasswordsMissing)
+    }
+}
+
+/// Possible errors from configuration files.
+#[derive(Debug, thiserror::Error)]
+pub enum ClientConfigError {
+    /// The configuration specifies the server URL as an empty string.
+    #[error("server_url is empty")]
+    ServerUrlIsEmpty,
+
+    /// The configuration does not specify any backup root directories.
+    #[error("No backup roots in config; at least one is needed")]
+    NoBackupRoot,
+
+    /// The server URL is not an https: one.
+    #[error("server URL doesn't use https: {0}")]
+    NotHttps(String),
+
+    /// There are no passwords stored.
+    #[error("No passwords are set: you may need to run 'obnam init': {0}")]
+    PasswordsMissing(PasswordError),
+
+    /// Error reading a configuration file.
+    #[error("failed to read configuration file {0}: {1}")]
+    Read(PathBuf, std::io::Error),
+
+    /// Error parsing configuration file as YAML.
+    #[error("failed to parse configuration file {0} as YAML: {1}")]
+    YamlParse(PathBuf, serde_yaml::Error),
+}
+
+fn expand_tilde(path: &Path) -> PathBuf {
+    if path.starts_with("~/") {
+        if let Some(home) = std::env::var_os("HOME") {
+            let mut expanded = PathBuf::from(home);
+            for comp in path.components().skip(1) {
+                expanded.push(comp);
+            }
+            expanded
+        } else {
+            path.to_path_buf()
+        }
+    } else {
+        path.to_path_buf()
+    }
+}
diff --git a/src/db.rs b/src/db.rs
new file mode 100644
index 0000000..392134d
--- /dev/null
+++ b/src/db.rs
@@ -0,0 +1,640 @@
+//! A database abstraction around SQLite for Obnam.
+//!
+//! This abstraction provides the bare minimum that Obnam needs, while
+//! trying to be as performant as possible, especially for inserting
+//! rows. Only data types needed by Obnam are supported.
+//!
+//! Note that this abstraction is entirely synchronous. This is for
+//! simplicity, as SQLite only allows one write at a time.
+
+use crate::fsentry::FilesystemEntry;
+use rusqlite::{params, types::ToSqlOutput, CachedStatement, Connection, OpenFlags, Row, ToSql};
+use std::collections::HashSet;
+use std::convert::TryFrom;
+use std::path::{Path, PathBuf};
+
+/// A database.
+pub struct Database {
+    conn: Connection,
+}
+
+impl Database {
+    /// Create a new database file for an empty database.
+    ///
+    /// The database can be written to.
+    pub fn create<P: AsRef<Path>>(filename: P) -> Result<Self, DatabaseError> {
+        if filename.as_ref().exists() {
+            return Err(DatabaseError::Exists(filename.as_ref().to_path_buf()));
+        }
+        let flags = OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE;
+        let conn = Connection::open_with_flags(filename, flags)?;
+        conn.execute("BEGIN", params![])?;
+        Ok(Self { conn })
+    }
+
+    /// Open an existing database file in read-only mode.
+    pub fn open<P: AsRef<Path>>(filename: P) -> Result<Self, DatabaseError> {
+        let flags = OpenFlags::SQLITE_OPEN_READ_ONLY;
+        let conn = Connection::open_with_flags(filename, flags)?;
+        Ok(Self { conn })
+    }
+
+    /// Close an open database, committing any changes to disk.
+    pub fn close(self) -> Result<(), DatabaseError> {
+        self.conn.execute("COMMIT", params![])?;
+        self.conn
+            .close()
+            .map_err(|(_, err)| DatabaseError::Rusqlite(err))?;
+        Ok(())
+    }
+
+    /// Create a table in the database.
+    pub fn create_table(&self, table: &Table) -> Result<(), DatabaseError> {
+        let sql = sql_statement::create_table(table);
+        self.conn.execute(&sql, params![])?;
+        Ok(())
+    }
+
+    /// Create an index in the database.
+    pub fn create_index(
+        &self,
+        name: &str,
+        table: &Table,
+        column: &str,
+    ) -> Result<(), DatabaseError> {
+        let sql = sql_statement::create_index(name, table, column);
+        self.conn.execute(&sql, params![])?;
+        Ok(())
+    }
+
+    /// Insert a row in a table.
+    pub fn insert(&mut self, table: &Table, values: &[Value]) -> Result<(), DatabaseError> {
+        let mut stmt = self.conn.prepare_cached(table.insert())?;
+        assert!(table.has_columns(values));
+        // The ToSql trait implementation for Obnam values can't ever
+        // fail, so we don't handle the error case in the parameter
+        // creation below.
+        stmt.execute(rusqlite::params_from_iter(values.iter().map(|v| {
+            v.to_sql()
+                .expect("conversion of Obnam value to SQLite value failed unexpectedly")
+        })))?;
+        Ok(())
+    }
+
+    /// Return an iterator for all rows in a table.
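+    ///
+    /// A sketch of intended use, mirroring the test module at the end
+    /// of this file (the table `foo` with integer column `bar` is an
+    /// example only):
+    ///
+    /// ```ignore
+    /// fn get_bar(row: &rusqlite::Row) -> Result<DbInt, rusqlite::Error> {
+    ///     row.get("bar")
+    /// }
+    ///
+    /// let table = Table::new("foo").column(Column::int("bar")).build();
+    /// let mut rows = db.all_rows(&table, &get_bar)?;
+    /// for value in rows.iter()? {
+    ///     println!("bar = {}", value?);
+    /// }
+    /// ```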
+    pub fn all_rows<T>(
+        &self,
+        table: &Table,
+        rowfunc: &'static dyn Fn(&Row) -> Result<T, rusqlite::Error>,
+    ) -> Result<SqlResults<T>, DatabaseError> {
+        let sql = sql_statement::select_all_rows(table);
+        SqlResults::new(
+            &self.conn,
+            &sql,
+            None,
+            Box::new(|stmt, _| {
+                let iter = stmt.query_map(params![], |row| rowfunc(row))?;
+                let iter = iter.map(|x| match x {
+                    Ok(t) => Ok(t),
+                    Err(e) => Err(DatabaseError::Rusqlite(e)),
+                });
+                Ok(Box::new(iter))
+            }),
+        )
+    }
+
+    /// Return rows that have a given value in a given column.
+    ///
+    /// This is simplistic, but for Obnam, it provides all the SQL
+    /// SELECT ... WHERE that's needed, and there's no point in making
+    /// this more generic than is needed.
+    pub fn some_rows<T>(
+        &self,
+        table: &Table,
+        value: &Value,
+        rowfunc: &'static dyn Fn(&Row) -> Result<T, rusqlite::Error>,
+    ) -> Result<SqlResults<T>, DatabaseError> {
+        assert!(table.has_column(value));
+        let sql = sql_statement::select_some_rows(table, value.name());
+        SqlResults::new(
+            &self.conn,
+            &sql,
+            Some(OwnedValue::from(value)),
+            Box::new(|stmt, value| {
+                let iter = stmt.query_map(params![value], |row| rowfunc(row))?;
+                let iter = iter.map(|x| match x {
+                    Ok(t) => Ok(t),
+                    Err(e) => Err(DatabaseError::Rusqlite(e)),
+                });
+                Ok(Box::new(iter))
+            }),
+        )
+    }
+}
+
+/// Possible errors from a database.
+#[derive(Debug, thiserror::Error)]
+pub enum DatabaseError {
+    /// An error from the rusqlite crate.
+    #[error(transparent)]
+    Rusqlite(#[from] rusqlite::Error),
+
+    /// The database being created already exists.
+    #[error("Database {0} already exists")]
+    Exists(PathBuf),
+}
+
+// A pointer to a "fallible iterator" over values of type `T`, which is to say it's an iterator
+// over values of type `Result<T, DatabaseError>`. The iterator is only valid for the
+// lifetime 'stmt.
+//
+// The fact that it's a pointer (`Box<dyn ...>`) means we don't care what the actual type of
+// the iterator is, or who produces it.
+type SqlResultsIterator<'stmt, T> = Box<dyn Iterator<Item = Result<T, DatabaseError>> + 'stmt>;
+
+// A pointer to a function which, when called on a prepared SQLite statement, would create
+// a "fallible iterator" over values of type `ItemT`. (See above for an explanation of what
+// a "fallible iterator" is.)
+//
+// The iterator is only valid for the lifetime of the associated SQLite statement; we
+// call this lifetime 'stmt, and use it both on the reference and the returned iterator.
+//
+// Now we're in a pickle: all named lifetimes have to be declared _somewhere_, but we can't add
+// 'stmt to the signature of `CreateIterFn` because then we'll have to specify it when we
+// define the function. Obviously, at that point we won't yet have a `Statement`, and thus we
+// would have no idea what its lifetime is going to be. So we can't put the 'stmt lifetime into
+// the signature of `CreateIterFn`.
+//
+// That's what `for<'stmt>` is for. This is a so-called ["higher-rank trait bound"][hrtb], and
+// it enables us to say that a function is valid for *some* lifetime 'stmt that we pass into it
+// at the call site. It lets Rust continue to track lifetimes even though `CreateIterFn`
+// interferes by "hiding" the 'stmt lifetime from its signature.
+//
+// [hrtb]: https://doc.rust-lang.org/nomicon/hrtb.html
+type CreateIterFn<'conn, ItemT> = Box<
+    dyn for<'stmt> Fn(
+        &'stmt mut CachedStatement<'conn>,
+        &Option<OwnedValue>,
+    ) -> Result<SqlResultsIterator<'stmt, ItemT>, DatabaseError>,
+>;
+
+/// An iterator over rows from a query.
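+///
+/// Values of this type are returned by [`Database::all_rows`] and
+/// [`Database::some_rows`]; call [`SqlResults::iter`] to get the
+/// actual iterator over the rows.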
+pub struct SqlResults<'conn, ItemT> {
+    stmt: CachedStatement<'conn>,
+    value: Option<OwnedValue>,
+    create_iter: CreateIterFn<'conn, ItemT>,
+}
+
+impl<'conn, ItemT> SqlResults<'conn, ItemT> {
+    fn new(
+        conn: &'conn Connection,
+        statement: &str,
+        value: Option<OwnedValue>,
+        create_iter: CreateIterFn<'conn, ItemT>,
+    ) -> Result<Self, DatabaseError> {
+        let stmt = conn.prepare_cached(statement)?;
+        Ok(Self {
+            stmt,
+            value,
+            create_iter,
+        })
+    }
+
+    /// Create an iterator over results.
+    pub fn iter(&'_ mut self) -> Result<SqlResultsIterator<'_, ItemT>, DatabaseError> {
+        (self.create_iter)(&mut self.stmt, &self.value)
+    }
+}
+
+/// Describe a table in a database.
+pub struct Table {
+    table: String,
+    columns: Vec<Column>,
+    insert: Option<String>,
+    column_names: HashSet<String>,
+}
+
+impl Table {
+    /// Create a new table description without columns.
+    ///
+    /// The table description is not "built". You must add columns and
+    /// then call the [`build`] method.
+    pub fn new(table: &str) -> Self {
+        Self {
+            table: table.to_string(),
+            columns: vec![],
+            insert: None,
+            column_names: HashSet::new(),
+        }
+    }
+
+    /// Append a column.
+    pub fn column(mut self, column: Column) -> Self {
+        self.column_names.insert(column.name().to_string());
+        self.columns.push(column);
+        self
+    }
+
+    /// Finish building the table description.
+    pub fn build(mut self) -> Self {
+        assert!(self.insert.is_none());
+        self.insert = Some(sql_statement::insert(&self));
+        self
+    }
+
+    fn has_columns(&self, values: &[Value]) -> bool {
+        assert!(self.insert.is_some());
+        for v in values.iter() {
+            if !self.column_names.contains(v.name()) {
+                return false;
+            }
+        }
+        true
+    }
+
+    fn has_column(&self, value: &Value) -> bool {
+        assert!(self.insert.is_some());
+        self.column_names.contains(value.name())
+    }
+
+    fn insert(&self) -> &str {
+        assert!(self.insert.is_some());
+        self.insert.as_ref().unwrap()
+    }
+
+    /// What is the name of the table?
+    pub fn name(&self) -> &str {
+        &self.table
+    }
+
+    /// How many columns does the table have?
+    pub fn num_columns(&self) -> usize {
+        self.columns.len()
+    }
+
+    /// What are the names of the columns in the table?
+    pub fn column_names(&self) -> impl Iterator<Item = &str> {
+        self.columns.iter().map(|c| c.name())
+    }
+
+    /// Return SQL column definitions for the table.
+    pub fn column_definitions(&self) -> String {
+        let mut ret = String::new();
+        for c in self.columns.iter() {
+            if !ret.is_empty() {
+                ret.push(',');
+            }
+            ret.push_str(c.name());
+            ret.push(' ');
+            ret.push_str(c.typename());
+        }
+        ret
+    }
+}
+
+/// A column in a table description.
+pub enum Column {
+    /// An integer primary key.
+    PrimaryKey(String),
+    /// An integer.
+    Int(String),
+    /// A text string.
+    Text(String),
+    /// A binary string.
+    Blob(String),
+    /// A boolean.
+    Bool(String),
+}
+
+impl Column {
+    fn name(&self) -> &str {
+        match self {
+            Self::PrimaryKey(name) => name,
+            Self::Int(name) => name,
+            Self::Text(name) => name,
+            Self::Blob(name) => name,
+            Self::Bool(name) => name,
+        }
+    }
+
+    fn typename(&self) -> &str {
+        match self {
+            Self::PrimaryKey(_) => "INTEGER PRIMARY KEY",
+            Self::Int(_) => "INTEGER",
+            Self::Text(_) => "TEXT",
+            Self::Blob(_) => "BLOB",
+            Self::Bool(_) => "BOOLEAN",
+        }
+    }
+
+    /// Create an integer primary key column.
+    pub fn primary_key(name: &str) -> Self {
+        Self::PrimaryKey(name.to_string())
+    }
+
+    /// Create an integer column.
+    pub fn int(name: &str) -> Self {
+        Self::Int(name.to_string())
+    }
+
+    /// Create a text string column.
+ pub fn text(name: &str) -> Self { + Self::Text(name.to_string()) + } + + /// Create a binary string column. + pub fn blob(name: &str) -> Self { + Self::Blob(name.to_string()) + } + + /// Create a boolean column. + pub fn bool(name: &str) -> Self { + Self::Bool(name.to_string()) + } +} + +/// Type of plain integers that can be stored. +pub type DbInt = i64; + +/// A value in a named column. +#[derive(Debug)] +pub enum Value<'a> { + /// An integer primary key. + PrimaryKey(&'a str, DbInt), + /// An integer. + Int(&'a str, DbInt), + /// A text string. + Text(&'a str, &'a str), + /// A binary string. + Blob(&'a str, &'a [u8]), + /// A boolean. + Bool(&'a str, bool), +} + +impl<'a> Value<'a> { + /// What column should store this value? + pub fn name(&self) -> &str { + match self { + Self::PrimaryKey(name, _) => name, + Self::Int(name, _) => name, + Self::Text(name, _) => name, + Self::Blob(name, _) => name, + Self::Bool(name, _) => name, + } + } + + /// Create an integer primary key value. + pub fn primary_key(name: &'a str, value: DbInt) -> Self { + Self::PrimaryKey(name, value) + } + + /// Create an integer value. + pub fn int(name: &'a str, value: DbInt) -> Self { + Self::Int(name, value) + } + + /// Create a text string value. + pub fn text(name: &'a str, value: &'a str) -> Self { + Self::Text(name, value) + } + + /// Create a binary string value. + pub fn blob(name: &'a str, value: &'a [u8]) -> Self { + Self::Blob(name, value) + } + + /// Create a boolean value. + pub fn bool(name: &'a str, value: bool) -> Self { + Self::Bool(name, value) + } +} + +#[allow(clippy::useless_conversion)] +impl<'a> ToSql for Value<'a> { + // The trait defines to_sql to return a Result. However, for our + // particular case, to_sql can't ever fail. We only store values + // in types for which conversion always succeeds: integer, + // boolean, text, and blob. _For us_, the caller need never worry + // that the conversion fails, but we can't express that in the + // type system. + fn to_sql(&self) -> Result<rusqlite::types::ToSqlOutput, rusqlite::Error> { + use rusqlite::types::ValueRef; + let v = match self { + Self::PrimaryKey(_, v) => ValueRef::Integer( + i64::try_from(*v) + .map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?, + ), + Self::Int(_, v) => ValueRef::Integer( + i64::try_from(*v) + .map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?, + ), + Self::Bool(_, v) => ValueRef::Integer(i64::from(*v)), + Self::Text(_, v) => ValueRef::Text(v.as_ref()), + Self::Blob(_, v) => ValueRef::Blob(v), + }; + Ok(ToSqlOutput::Borrowed(v)) + } +} + +/// Like a Value, but owns the data. +pub enum OwnedValue { + /// An integer primary key. + PrimaryKey(String, DbInt), + /// An integer. + Int(String, DbInt), + /// A text string. + Text(String, String), + /// A binary string. + Blob(String, Vec<u8>), + /// A boolean. 
+ Bool(String, bool), +} + +impl From<&Value<'_>> for OwnedValue { + fn from(v: &Value) -> Self { + match *v { + Value::PrimaryKey(name, v) => Self::PrimaryKey(name.to_string(), v), + Value::Int(name, v) => Self::Int(name.to_string(), v), + Value::Text(name, v) => Self::Text(name.to_string(), v.to_string()), + Value::Blob(name, v) => Self::Blob(name.to_string(), v.to_vec()), + Value::Bool(name, v) => Self::Bool(name.to_string(), v), + } + } +} + +impl ToSql for OwnedValue { + #[allow(clippy::useless_conversion)] + fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput> { + use rusqlite::types::Value; + let v = match self { + Self::PrimaryKey(_, v) => Value::Integer( + i64::try_from(*v) + .map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?, + ), + Self::Int(_, v) => Value::Integer( + i64::try_from(*v) + .map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?, + ), + Self::Bool(_, v) => Value::Integer(i64::from(*v)), + Self::Text(_, v) => Value::Text(v.to_string()), + Self::Blob(_, v) => Value::Blob(v.to_vec()), + }; + Ok(ToSqlOutput::Owned(v)) + } +} + +impl rusqlite::types::ToSql for FilesystemEntry { + fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> { + let json = serde_json::to_string(self) + .map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?; + let json = rusqlite::types::Value::Text(json); + Ok(ToSqlOutput::Owned(json)) + } +} + +mod sql_statement { + use super::Table; + + pub fn create_table(table: &Table) -> String { + format!( + "CREATE TABLE {} ({})", + table.name(), + table.column_definitions() + ) + } + + pub fn create_index(name: &str, table: &Table, column: &str) -> String { + format!("CREATE INDEX {} ON {} ({})", name, table.name(), column,) + } + + pub fn insert(table: &Table) -> String { + format!( + "INSERT INTO {} ({}) VALUES ({})", + table.name(), + &column_names(table), + placeholders(table.column_names().count()) + ) + } + + pub fn select_all_rows(table: &Table) -> String { + format!("SELECT * FROM {}", table.name()) + } + + pub fn select_some_rows(table: &Table, column: &str) -> String { + format!("SELECT * FROM {} WHERE {} = ?", table.name(), column) + } + + fn column_names(table: &Table) -> String { + table.column_names().collect::<Vec<&str>>().join(",") + } + + fn placeholders(num_columns: usize) -> String { + let mut s = String::new(); + for _ in 0..num_columns { + if !s.is_empty() { + s.push(','); + } + s.push('?'); + } + s + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::path::Path; + use tempfile::tempdir; + + fn get_bar(row: &rusqlite::Row) -> Result<DbInt, rusqlite::Error> { + row.get("bar") + } + + fn table() -> Table { + Table::new("foo").column(Column::int("bar")).build() + } + + fn create_db(file: &Path) -> Database { + let table = table(); + let db = Database::create(file).unwrap(); + db.create_table(&table).unwrap(); + db + } + + fn open_db(file: &Path) -> Database { + Database::open(file).unwrap() + } + + fn insert(db: &mut Database, value: DbInt) { + let table = table(); + db.insert(&table, &[Value::int("bar", value)]).unwrap(); + } + + fn values(db: Database) -> Vec<DbInt> { + let table = table(); + let mut rows = db.all_rows(&table, &get_bar).unwrap(); + let iter = rows.iter().unwrap(); + let mut values = vec![]; + for x in iter { + values.push(x.unwrap()); + } + values + } + + #[test] + fn creates_db() { + let tmp = tempdir().unwrap(); + let filename = tmp.path().join("test.db"); + let db = Database::create(&filename).unwrap(); + db.close().unwrap(); + let _ = 
Database::open(&filename).unwrap();
+    }
+
+    #[test]
+    fn inserts_row() {
+        let tmp = tempdir().unwrap();
+        let filename = tmp.path().join("test.db");
+        let mut db = create_db(&filename);
+        insert(&mut db, 42);
+        db.close().unwrap();
+
+        let db = open_db(&filename);
+        let values = values(db);
+        assert_eq!(values, vec![42]);
+    }
+
+    #[test]
+    fn inserts_many_rows() {
+        const N: DbInt = 1000;
+
+        let tmp = tempdir().unwrap();
+        let filename = tmp.path().join("test.db");
+        let mut db = create_db(&filename);
+        for i in 0..N {
+            insert(&mut db, i);
+        }
+        db.close().unwrap();
+
+        let db = open_db(&filename);
+        let values = values(db);
+        assert_eq!(values.len() as DbInt, N);
+
+        let mut expected = vec![];
+        for i in 0..N {
+            expected.push(i);
+        }
+        assert_eq!(values, expected);
+    }
+
+    #[test]
+    fn round_trips_int_max() {
+        let tmp = tempdir().unwrap();
+        let filename = tmp.path().join("test.db");
+        let mut db = create_db(&filename);
+        insert(&mut db, DbInt::MAX);
+        db.close().unwrap();
+
+        let db = open_db(&filename);
+        let values = values(db);
+        assert_eq!(values, vec![DbInt::MAX]);
+    }
+}
diff --git a/src/dbgen.rs b/src/dbgen.rs
new file mode 100644
index 0000000..0053d4a
--- /dev/null
+++ b/src/dbgen.rs
@@ -0,0 +1,768 @@
+//! Database abstraction for generations.
+
+use crate::backup_reason::Reason;
+use crate::chunkid::ChunkId;
+use crate::db::{Column, Database, DatabaseError, DbInt, SqlResults, Table, Value};
+use crate::fsentry::FilesystemEntry;
+use crate::genmeta::{GenerationMeta, GenerationMetaError};
+use crate::label::LabelChecksumKind;
+use crate::schema::{SchemaVersion, VersionComponent};
+use log::error;
+use std::collections::HashMap;
+use std::os::unix::ffi::OsStrExt;
+use std::path::{Path, PathBuf};
+
+/// Return latest supported schema version for a supported major
+/// version.
+pub fn schema_version(major: VersionComponent) -> Result<SchemaVersion, GenerationDbError> {
+    match major {
+        0 => Ok(SchemaVersion::new(0, 0)),
+        1 => Ok(SchemaVersion::new(1, 0)),
+        _ => Err(GenerationDbError::Unsupported(major)),
+    }
+}
+
+/// Default database schema major version.
+pub const DEFAULT_SCHEMA_MAJOR: VersionComponent = V0_0::MAJOR;
+
+/// Major schema versions supported by this version of Obnam.
+pub const SCHEMA_MAJORS: &[VersionComponent] = &[0, 1];
+
+/// An integer identifier for a file in a generation.
+pub type FileId = DbInt;
+
+/// Possible errors from using generation databases.
+#[derive(Debug, thiserror::Error)]
+pub enum GenerationDbError {
+    /// Duplicate file names.
+    #[error("Generation has more than one file with the name {0}")]
+    TooManyFiles(PathBuf),
+
+    /// No 'meta' table in generation.
+    #[error("Generation does not have a 'meta' table")]
+    NoMeta,
+
+    /// Missing row from 'meta' table.
+    #[error("Generation 'meta' table does not have a row {0}")]
+    NoMetaKey(String),
+
+    /// Bad data in 'meta' table.
+    #[error("Generation 'meta' row {0} has badly formed integer: {1}")]
+    BadMetaInteger(String, std::num::ParseIntError),
+
+    /// A major schema version is unsupported.
+    #[error("Unsupported backup schema major version: {0}")]
+    Unsupported(VersionComponent),
+
+    /// Local generation uses a schema version that this version of
+    /// Obnam isn't compatible with.
+    #[error("Backup is not compatible with this version of Obnam: {0}.{1}")]
+    Incompatible(VersionComponent, VersionComponent),
+
+    /// Error from a database.
+    #[error(transparent)]
+    Database(#[from] DatabaseError),
+
+    /// Error from generation metadata.
+ #[error(transparent)] + GenerationMeta(#[from] GenerationMetaError), + + /// Error from JSON. + #[error(transparent)] + SerdeJsonError(#[from] serde_json::Error), + + /// Error from I/O. + #[error(transparent)] + IoError(#[from] std::io::Error), +} + +/// A database representing a backup generation. +pub struct GenerationDb { + variant: GenerationDbVariant, +} + +enum GenerationDbVariant { + V0_0(V0_0), + V1_0(V1_0), +} + +impl GenerationDb { + /// Create a new generation database in read/write mode. + pub fn create<P: AsRef<Path>>( + filename: P, + schema: SchemaVersion, + checksum_kind: LabelChecksumKind, + ) -> Result<Self, GenerationDbError> { + let meta_table = Self::meta_table(); + let variant = match schema.version() { + (V0_0::MAJOR, V0_0::MINOR) => { + GenerationDbVariant::V0_0(V0_0::create(filename, meta_table, checksum_kind)?) + } + (V1_0::MAJOR, V1_0::MINOR) => { + GenerationDbVariant::V1_0(V1_0::create(filename, meta_table, checksum_kind)?) + } + (major, minor) => return Err(GenerationDbError::Incompatible(major, minor)), + }; + Ok(Self { variant }) + } + + /// Open an existing generation database in read-only mode. + pub fn open<P: AsRef<Path>>(filename: P) -> Result<Self, GenerationDbError> { + let filename = filename.as_ref(); + let meta_table = Self::meta_table(); + let schema = { + let plain_db = Database::open(filename)?; + let rows = Self::meta_rows(&plain_db, &meta_table)?; + GenerationMeta::from(rows)?.schema_version() + }; + let variant = match schema.version() { + (V0_0::MAJOR, V0_0::MINOR) => { + GenerationDbVariant::V0_0(V0_0::open(filename, meta_table)?) + } + (V1_0::MAJOR, V1_0::MINOR) => { + GenerationDbVariant::V1_0(V1_0::open(filename, meta_table)?) + } + (major, minor) => return Err(GenerationDbError::Incompatible(major, minor)), + }; + Ok(Self { variant }) + } + + fn meta_table() -> Table { + Table::new("meta") + .column(Column::text("key")) + .column(Column::text("value")) + .build() + } + + fn meta_rows( + db: &Database, + table: &Table, + ) -> Result<HashMap<String, String>, GenerationDbError> { + let mut map = HashMap::new(); + let mut iter = db.all_rows(table, &row_to_kv)?; + for kv in iter.iter()? { + let (key, value) = kv?; + map.insert(key, value); + } + Ok(map) + } + + /// Close a database, commit any changes. + pub fn close(self) -> Result<(), GenerationDbError> { + match self.variant { + GenerationDbVariant::V0_0(v) => v.close(), + GenerationDbVariant::V1_0(v) => v.close(), + } + } + + /// Return contents of "meta" table as a HashMap. + pub fn meta(&self) -> Result<HashMap<String, String>, GenerationDbError> { + match &self.variant { + GenerationDbVariant::V0_0(v) => v.meta(), + GenerationDbVariant::V1_0(v) => v.meta(), + } + } + + /// Insert a file system entry into the database. + pub fn insert( + &mut self, + e: FilesystemEntry, + fileid: FileId, + ids: &[ChunkId], + reason: Reason, + is_cachedir_tag: bool, + ) -> Result<(), GenerationDbError> { + match &mut self.variant { + GenerationDbVariant::V0_0(v) => v.insert(e, fileid, ids, reason, is_cachedir_tag), + GenerationDbVariant::V1_0(v) => v.insert(e, fileid, ids, reason, is_cachedir_tag), + } + } + + /// Count number of file system entries. + pub fn file_count(&self) -> Result<FileId, GenerationDbError> { + match &self.variant { + GenerationDbVariant::V0_0(v) => v.file_count(), + GenerationDbVariant::V1_0(v) => v.file_count(), + } + } + + /// Does a path refer to a cache directory? 
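+    ///
+    /// This reports the `is_cachedir_tag` flag recorded for the file
+    /// when the backup was made, not the current state of the live
+    /// file system.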
+    pub fn is_cachedir_tag(&self, filename: &Path) -> Result<bool, GenerationDbError> {
+        match &self.variant {
+            GenerationDbVariant::V0_0(v) => v.is_cachedir_tag(filename),
+            GenerationDbVariant::V1_0(v) => v.is_cachedir_tag(filename),
+        }
+    }
+
+    /// Return all chunk ids for a given file in the database.
+    pub fn chunkids(&self, fileid: FileId) -> Result<SqlResults<ChunkId>, GenerationDbError> {
+        match &self.variant {
+            GenerationDbVariant::V0_0(v) => v.chunkids(fileid),
+            GenerationDbVariant::V1_0(v) => v.chunkids(fileid),
+        }
+    }
+
+    /// Return all file descriptions in the database.
+    pub fn files(
+        &self,
+    ) -> Result<SqlResults<(FileId, FilesystemEntry, Reason, bool)>, GenerationDbError> {
+        match &self.variant {
+            GenerationDbVariant::V0_0(v) => v.files(),
+            GenerationDbVariant::V1_0(v) => v.files(),
+        }
+    }
+
+    /// Get a file's information given its path.
+    pub fn get_file(&self, filename: &Path) -> Result<Option<FilesystemEntry>, GenerationDbError> {
+        match &self.variant {
+            GenerationDbVariant::V0_0(v) => v.get_file(filename),
+            GenerationDbVariant::V1_0(v) => v.get_file(filename),
+        }
+    }
+
+    /// Get a file's id in the database, given its path.
+    pub fn get_fileno(&self, filename: &Path) -> Result<Option<FileId>, GenerationDbError> {
+        match &self.variant {
+            GenerationDbVariant::V0_0(v) => v.get_fileno(filename),
+            GenerationDbVariant::V1_0(v) => v.get_fileno(filename),
+        }
+    }
+}
+
+struct V0_0 {
+    created: bool,
+    db: Database,
+    meta: Table,
+    files: Table,
+    chunks: Table,
+}
+
+impl V0_0 {
+    const MAJOR: VersionComponent = 0;
+    const MINOR: VersionComponent = 0;
+
+    /// Create a new generation database in read/write mode.
+    pub fn create<P: AsRef<Path>>(
+        filename: P,
+        meta: Table,
+        checksum_kind: LabelChecksumKind,
+    ) -> Result<Self, GenerationDbError> {
+        let db = Database::create(filename.as_ref())?;
+        let mut moi = Self::new(db, meta);
+        moi.created = true;
+        moi.create_tables(checksum_kind)?;
+        Ok(moi)
+    }
+
+    /// Open an existing generation database in read-only mode.
+    pub fn open<P: AsRef<Path>>(filename: P, meta: Table) -> Result<Self, GenerationDbError> {
+        let db = Database::open(filename.as_ref())?;
+        Ok(Self::new(db, meta))
+    }
+
+    fn new(db: Database, meta: Table) -> Self {
+        let files = Table::new("files")
+            .column(Column::primary_key("fileno"))
+            .column(Column::blob("filename"))
+            .column(Column::text("json"))
+            .column(Column::text("reason"))
+            .column(Column::bool("is_cachedir_tag"))
+            .build();
+        let chunks = Table::new("chunks")
+            .column(Column::int("fileno"))
+            .column(Column::text("chunkid"))
+            .build();
+
+        Self {
+            created: false,
+            db,
+            meta,
+            files,
+            chunks,
+        }
+    }
+
+    fn create_tables(&mut self, checksum_kind: LabelChecksumKind) -> Result<(), GenerationDbError> {
+        self.db.create_table(&self.meta)?;
+        self.db.create_table(&self.files)?;
+        self.db.create_table(&self.chunks)?;
+
+        self.db.insert(
+            &self.meta,
+            &[
+                Value::text("key", "schema_version_major"),
+                Value::text("value", &format!("{}", Self::MAJOR)),
+            ],
+        )?;
+        self.db.insert(
+            &self.meta,
+            &[
+                Value::text("key", "schema_version_minor"),
+                Value::text("value", &format!("{}", Self::MINOR)),
+            ],
+        )?;
+        self.db.insert(
+            &self.meta,
+            &[
+                Value::text("key", "checksum_kind"),
+                Value::text("value", checksum_kind.serialize()),
+            ],
+        )?;
+
+        Ok(())
+    }
+
+    /// Close a database, commit any changes.
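+    ///
+    /// For a newly created database, the indexes on file names and
+    /// file ids are built here, just before committing, so that they
+    /// don't slow down inserts while the backup is being made.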
+    pub fn close(self) -> Result<(), GenerationDbError> {
+        if self.created {
+            self.db
+                .create_index("filenames_idx", &self.files, "filename")?;
+            self.db.create_index("fileid_idx", &self.chunks, "fileno")?;
+        }
+        self.db.close().map_err(GenerationDbError::Database)
+    }
+
+    /// Return contents of "meta" table as a HashMap.
+    pub fn meta(&self) -> Result<HashMap<String, String>, GenerationDbError> {
+        let mut map = HashMap::new();
+        let mut iter = self.db.all_rows(&self.meta, &row_to_kv)?;
+        for kv in iter.iter()? {
+            let (key, value) = kv?;
+            map.insert(key, value);
+        }
+        Ok(map)
+    }
+
+    /// Insert a file system entry into the database.
+    pub fn insert(
+        &mut self,
+        e: FilesystemEntry,
+        fileid: FileId,
+        ids: &[ChunkId],
+        reason: Reason,
+        is_cachedir_tag: bool,
+    ) -> Result<(), GenerationDbError> {
+        let json = serde_json::to_string(&e)?;
+        self.db.insert(
+            &self.files,
+            &[
+                Value::primary_key("fileno", fileid),
+                Value::blob("filename", &path_into_blob(&e.pathbuf())),
+                Value::text("json", &json),
+                Value::text("reason", &format!("{}", reason)),
+                Value::bool("is_cachedir_tag", is_cachedir_tag),
+            ],
+        )?;
+        for id in ids {
+            self.db.insert(
+                &self.chunks,
+                &[
+                    Value::int("fileno", fileid),
+                    Value::text("chunkid", &format!("{}", id)),
+                ],
+            )?;
+        }
+        Ok(())
+    }
+
+    /// Count number of file system entries.
+    pub fn file_count(&self) -> Result<FileId, GenerationDbError> {
+        // FIXME: this needs to be done using "SELECT count(*) FROM
+        // files", but the Database abstraction doesn't support that
+        // yet.
+        let mut iter = self.db.all_rows(&self.files, &Self::row_to_entry)?;
+        let mut count = 0;
+        for _ in iter.iter()? {
+            count += 1;
+        }
+        Ok(count)
+    }
+
+    /// Does a path refer to a cache directory?
+    pub fn is_cachedir_tag(&self, filename: &Path) -> Result<bool, GenerationDbError> {
+        let filename_vec = path_into_blob(filename);
+        let value = Value::blob("filename", &filename_vec);
+        let mut rows = self
+            .db
+            .some_rows(&self.files, &value, &Self::row_to_entry)?;
+        let mut iter = rows.iter()?;
+
+        if let Some(row) = iter.next() {
+            // Make sure there's only one row for a given filename. A
+            // bug in a previous version, or a maliciously constructed
+            // generation, could result in there being more than one.
+            if iter.next().is_some() {
+                error!("too many files in file lookup");
+                Err(GenerationDbError::TooManyFiles(filename.to_path_buf()))
+            } else {
+                let (_, _, _, is_cachedir_tag) = row?;
+                Ok(is_cachedir_tag)
+            }
+        } else {
+            Ok(false)
+        }
+    }
+
+    /// Return all chunk ids for a given file in the database.
+    pub fn chunkids(&self, fileid: FileId) -> Result<SqlResults<ChunkId>, GenerationDbError> {
+        let fileid = Value::int("fileno", fileid);
+        Ok(self.db.some_rows(&self.chunks, &fileid, &row_to_chunkid)?)
+    }
+
+    /// Return all file descriptions in the database.
+    pub fn files(
+        &self,
+    ) -> Result<SqlResults<(FileId, FilesystemEntry, Reason, bool)>, GenerationDbError> {
+        Ok(self.db.all_rows(&self.files, &Self::row_to_fsentry)?)
+    }
+
+    /// Get a file's information given its path.
+    pub fn get_file(&self, filename: &Path) -> Result<Option<FilesystemEntry>, GenerationDbError> {
+        match self.get_file_and_fileno(filename)? {
+            None => Ok(None),
+            Some((_, e, _)) => Ok(Some(e)),
+        }
+    }
+
+    /// Get a file's id in the database, given its path.
+    pub fn get_fileno(&self, filename: &Path) -> Result<Option<FileId>, GenerationDbError> {
+        match self.get_file_and_fileno(filename)?
{ + None => Ok(None), + Some((id, _, _)) => Ok(Some(id)), + } + } + + fn get_file_and_fileno( + &self, + filename: &Path, + ) -> Result<Option<(FileId, FilesystemEntry, String)>, GenerationDbError> { + let filename_bytes = path_into_blob(filename); + let value = Value::blob("filename", &filename_bytes); + let mut rows = self + .db + .some_rows(&self.files, &value, &Self::row_to_entry)?; + let mut iter = rows.iter()?; + + if let Some(row) = iter.next() { + // Make sure there's only one row for a given filename. A + // bug in a previous version, or a maliciously constructed + // generation, could result in there being more than one. + if iter.next().is_some() { + error!("too many files in file lookup"); + Err(GenerationDbError::TooManyFiles(filename.to_path_buf())) + } else { + let (fileid, ref json, ref reason, _) = row?; + let entry = serde_json::from_str(json)?; + Ok(Some((fileid, entry, reason.to_string()))) + } + } else { + Ok(None) + } + } + + fn row_to_entry(row: &rusqlite::Row) -> rusqlite::Result<(FileId, String, String, bool)> { + let fileno: FileId = row.get("fileno")?; + let json: String = row.get("json")?; + let reason: String = row.get("reason")?; + let is_cachedir_tag: bool = row.get("is_cachedir_tag")?; + Ok((fileno, json, reason, is_cachedir_tag)) + } + + fn row_to_fsentry( + row: &rusqlite::Row, + ) -> rusqlite::Result<(FileId, FilesystemEntry, Reason, bool)> { + let fileno: FileId = row.get("fileno")?; + let json: String = row.get("json")?; + let entry = serde_json::from_str(&json).map_err(|err| { + rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(err)) + })?; + let reason: String = row.get("reason")?; + let reason = Reason::from(&reason); + let is_cachedir_tag: bool = row.get("is_cachedir_tag")?; + Ok((fileno, entry, reason, is_cachedir_tag)) + } +} + +struct V1_0 { + created: bool, + db: Database, + meta: Table, + files: Table, + chunks: Table, +} + +impl V1_0 { + const MAJOR: VersionComponent = 1; + const MINOR: VersionComponent = 0; + + /// Create a new generation database in read/write mode. + pub fn create<P: AsRef<Path>>( + filename: P, + meta: Table, + checksum_kind: LabelChecksumKind, + ) -> Result<Self, GenerationDbError> { + let db = Database::create(filename.as_ref())?; + let mut moi = Self::new(db, meta); + moi.created = true; + moi.create_tables(checksum_kind)?; + Ok(moi) + } + + /// Open an existing generation database in read-only mode. 
+    pub fn open<P: AsRef<Path>>(filename: P, meta: Table) -> Result<Self, GenerationDbError> {
+        let db = Database::open(filename.as_ref())?;
+        Ok(Self::new(db, meta))
+    }
+
+    fn new(db: Database, meta: Table) -> Self {
+        let files = Table::new("files")
+            .column(Column::primary_key("fileid"))
+            .column(Column::blob("filename"))
+            .column(Column::text("json"))
+            .column(Column::text("reason"))
+            .column(Column::bool("is_cachedir_tag"))
+            .build();
+        let chunks = Table::new("chunks")
+            .column(Column::int("fileid"))
+            .column(Column::text("chunkid"))
+            .build();
+
+        Self {
+            created: false,
+            db,
+            meta,
+            files,
+            chunks,
+        }
+    }
+
+    fn create_tables(&mut self, checksum_kind: LabelChecksumKind) -> Result<(), GenerationDbError> {
+        self.db.create_table(&self.meta)?;
+        self.db.create_table(&self.files)?;
+        self.db.create_table(&self.chunks)?;
+
+        self.db.insert(
+            &self.meta,
+            &[
+                Value::text("key", "schema_version_major"),
+                Value::text("value", &format!("{}", Self::MAJOR)),
+            ],
+        )?;
+        self.db.insert(
+            &self.meta,
+            &[
+                Value::text("key", "schema_version_minor"),
+                Value::text("value", &format!("{}", Self::MINOR)),
+            ],
+        )?;
+        self.db.insert(
+            &self.meta,
+            &[
+                Value::text("key", "checksum_kind"),
+                Value::text("value", checksum_kind.serialize()),
+            ],
+        )?;
+
+        Ok(())
+    }
+
+    /// Close a database, commit any changes.
+    pub fn close(self) -> Result<(), GenerationDbError> {
+        if self.created {
+            self.db
+                .create_index("filenames_idx", &self.files, "filename")?;
+            self.db.create_index("fileid_idx", &self.chunks, "fileid")?;
+        }
+        self.db.close().map_err(GenerationDbError::Database)
+    }
+
+    /// Return contents of "meta" table as a HashMap.
+    pub fn meta(&self) -> Result<HashMap<String, String>, GenerationDbError> {
+        let mut map = HashMap::new();
+        let mut iter = self.db.all_rows(&self.meta, &row_to_kv)?;
+        for kv in iter.iter()? {
+            let (key, value) = kv?;
+            map.insert(key, value);
+        }
+        Ok(map)
+    }
+
+    /// Insert a file system entry into the database.
+    pub fn insert(
+        &mut self,
+        e: FilesystemEntry,
+        fileid: FileId,
+        ids: &[ChunkId],
+        reason: Reason,
+        is_cachedir_tag: bool,
+    ) -> Result<(), GenerationDbError> {
+        let json = serde_json::to_string(&e)?;
+        self.db.insert(
+            &self.files,
+            &[
+                Value::primary_key("fileid", fileid),
+                Value::blob("filename", &path_into_blob(&e.pathbuf())),
+                Value::text("json", &json),
+                Value::text("reason", &format!("{}", reason)),
+                Value::bool("is_cachedir_tag", is_cachedir_tag),
+            ],
+        )?;
+        for id in ids {
+            self.db.insert(
+                &self.chunks,
+                &[
+                    Value::int("fileid", fileid),
+                    Value::text("chunkid", &format!("{}", id)),
+                ],
+            )?;
+        }
+        Ok(())
+    }
+
+    /// Count number of file system entries.
+    pub fn file_count(&self) -> Result<FileId, GenerationDbError> {
+        // FIXME: this needs to be done using "SELECT count(*) FROM
+        // files", but the Database abstraction doesn't support that
+        // yet.
+        let mut iter = self.db.all_rows(&self.files, &Self::row_to_entry)?;
+        let mut count = 0;
+        for _ in iter.iter()? {
+            count += 1;
+        }
+        Ok(count)
+    }
+
+    /// Does a path refer to a cache directory?
+    pub fn is_cachedir_tag(&self, filename: &Path) -> Result<bool, GenerationDbError> {
+        let filename_vec = path_into_blob(filename);
+        let value = Value::blob("filename", &filename_vec);
+        let mut rows = self
+            .db
+            .some_rows(&self.files, &value, &Self::row_to_entry)?;
+        let mut iter = rows.iter()?;
+
+        if let Some(row) = iter.next() {
+            // Make sure there's only one row for a given filename. A
+            // bug in a previous version, or a maliciously constructed
+            // generation, could result in there being more than one.
+            if iter.next().is_some() {
+                error!("too many files in file lookup");
+                Err(GenerationDbError::TooManyFiles(filename.to_path_buf()))
+            } else {
+                let (_, _, _, is_cachedir_tag) = row?;
+                Ok(is_cachedir_tag)
+            }
+        } else {
+            Ok(false)
+        }
+    }
+
+    /// Return all chunk ids for a given file in the database.
+    pub fn chunkids(&self, fileid: FileId) -> Result<SqlResults<ChunkId>, GenerationDbError> {
+        let fileid = Value::int("fileid", fileid);
+        Ok(self.db.some_rows(&self.chunks, &fileid, &row_to_chunkid)?)
+    }
+
+    /// Return all file descriptions in the database.
+    pub fn files(
+        &self,
+    ) -> Result<SqlResults<(FileId, FilesystemEntry, Reason, bool)>, GenerationDbError> {
+        Ok(self.db.all_rows(&self.files, &Self::row_to_fsentry)?)
+    }
+
+    /// Get a file's information given its path.
+    pub fn get_file(&self, filename: &Path) -> Result<Option<FilesystemEntry>, GenerationDbError> {
+        match self.get_file_and_fileno(filename)? {
+            None => Ok(None),
+            Some((_, e, _)) => Ok(Some(e)),
+        }
+    }
+
+    /// Get a file's id in the database, given its path.
+    pub fn get_fileno(&self, filename: &Path) -> Result<Option<FileId>, GenerationDbError> {
+        match self.get_file_and_fileno(filename)? {
+            None => Ok(None),
+            Some((id, _, _)) => Ok(Some(id)),
+        }
+    }
+
+    fn get_file_and_fileno(
+        &self,
+        filename: &Path,
+    ) -> Result<Option<(FileId, FilesystemEntry, String)>, GenerationDbError> {
+        let filename_bytes = path_into_blob(filename);
+        let value = Value::blob("filename", &filename_bytes);
+        let mut rows = self
+            .db
+            .some_rows(&self.files, &value, &Self::row_to_entry)?;
+        let mut iter = rows.iter()?;
+
+        if let Some(row) = iter.next() {
+            // Make sure there's only one row for a given filename. A
+            // bug in a previous version, or a maliciously constructed
+            // generation, could result in there being more than one.
+ if iter.next().is_some() { + error!("too many files in file lookup"); + Err(GenerationDbError::TooManyFiles(filename.to_path_buf())) + } else { + let (fileid, ref json, ref reason, _) = row?; + let entry = serde_json::from_str(json)?; + Ok(Some((fileid, entry, reason.to_string()))) + } + } else { + Ok(None) + } + } + + fn row_to_entry(row: &rusqlite::Row) -> rusqlite::Result<(FileId, String, String, bool)> { + let fileno: FileId = row.get("fileid")?; + let json: String = row.get("json")?; + let reason: String = row.get("reason")?; + let is_cachedir_tag: bool = row.get("is_cachedir_tag")?; + Ok((fileno, json, reason, is_cachedir_tag)) + } + + fn row_to_fsentry( + row: &rusqlite::Row, + ) -> rusqlite::Result<(FileId, FilesystemEntry, Reason, bool)> { + let fileno: FileId = row.get("fileid")?; + let json: String = row.get("json")?; + let entry = serde_json::from_str(&json).map_err(|err| { + rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(err)) + })?; + let reason: String = row.get("reason")?; + let reason = Reason::from(&reason); + let is_cachedir_tag: bool = row.get("is_cachedir_tag")?; + Ok((fileno, entry, reason, is_cachedir_tag)) + } +} + +fn row_to_kv(row: &rusqlite::Row) -> rusqlite::Result<(String, String)> { + let k = row.get("key")?; + let v = row.get("value")?; + Ok((k, v)) +} + +fn path_into_blob(path: &Path) -> Vec<u8> { + path.as_os_str().as_bytes().to_vec() +} + +fn row_to_chunkid(row: &rusqlite::Row) -> rusqlite::Result<ChunkId> { + let chunkid: String = row.get("chunkid")?; + let chunkid = ChunkId::recreate(&chunkid); + Ok(chunkid) +} + +#[cfg(test)] +mod test { + use super::Database; + use tempfile::tempdir; + + #[test] + fn opens_previously_created_db() { + let dir = tempdir().unwrap(); + let filename = dir.path().join("test.db"); + Database::create(&filename).unwrap(); + assert!(Database::open(&filename).is_ok()); + } +} diff --git a/src/engine.rs b/src/engine.rs new file mode 100644 index 0000000..d35281b --- /dev/null +++ b/src/engine.rs @@ -0,0 +1,123 @@ +//! Engine for doing CPU heavy work in the background. + +use crate::workqueue::WorkQueue; +use futures::stream::{FuturesOrdered, StreamExt}; +use tokio::select; +use tokio::sync::mpsc; + +/// Do heavy work in the background. +/// +/// An engine takes items of work from a work queue, and does the work +/// in the background, using `tokio` blocking tasks. The background +/// work can be CPU intensive or block on I/O. The number of active +/// concurrent tasks is limited to the size of the queue. +/// +/// The actual work is done in a function or closure passed in as a +/// parameter to the engine. The worker function is called with a work +/// item as an argument, in a thread dedicated for that worker +/// function. +/// +/// The need to move work items between threads puts some restrictions +/// on the types used as work items. +pub struct Engine<T> { + rx: mpsc::Receiver<T>, +} + +impl<T: Send + 'static> Engine<T> { + /// Create a new engine. + /// + /// Each engine gets work from a queue, and calls the same worker + /// function for each item of work. The results are put into + /// another, internal queue. + pub fn new<S, F>(queue: WorkQueue<S>, func: F) -> Self + where + F: Send + Copy + 'static + Fn(S) -> T, + S: Send + 'static, + { + let size = queue.size(); + let (tx, rx) = mpsc::channel(size); + tokio::spawn(manage_workers(queue, size, tx, func)); + Self { rx } + } + + /// Get the oldest result of the worker function, if any. 
+ /// + /// This will block until there is a result, or it's known that no + /// more results will be forthcoming. + pub async fn next(&mut self) -> Option<T> { + self.rx.recv().await + } +} + +// This is a normal (non-blocking) background task that retrieves work +// items, launches blocking background tasks for work to be done, and +// waits on those tasks. Care is taken to not launch too many worker +// tasks. +async fn manage_workers<S, T, F>( + mut queue: WorkQueue<S>, + queue_size: usize, + tx: mpsc::Sender<T>, + func: F, +) where + F: Send + 'static + Copy + Fn(S) -> T, + S: Send + 'static, + T: Send + 'static, +{ + let mut workers = FuturesOrdered::new(); + + 'processing: loop { + // Wait for first of various concurrent things to finish. + select! { + biased; + + // Get work to be done. + maybe_work = queue.next() => { + if let Some(work) = maybe_work { + // We got a work item. Launch background task to + // work on it. + let tx = tx.clone(); + workers.push_back(do_work(work, tx, func)); + + // If queue is full, wait for at least one + // background task to finish. + while workers.len() >= queue_size { + workers.next().await; + } + } else { + // Finished with the input queue. Nothing more to do. + break 'processing; + } + } + + // Wait for background task to finish, if there are any + // background tasks currently running. + _ = workers.next(), if !workers.is_empty() => { + // nothing to do here + } + } + } + + while workers.next().await.is_some() { + // Finish the remaining work items. + } +} + +// Work on a work item. +// +// This launches a `tokio` blocking background task, and waits for it +// to finish. The caller spawns a normal (non-blocking) async task for +// this function, so it's OK for this function to wait on the task it +// launches. +async fn do_work<S, T, F>(item: S, tx: mpsc::Sender<T>, func: F) +where + F: Send + 'static + Fn(S) -> T, + S: Send + 'static, + T: Send + 'static, +{ + let result = tokio::task::spawn_blocking(move || func(item)) + .await + .unwrap(); + if let Err(err) = tx.send(result).await { + panic!("failed to send result to channel: {}", err); + } +} diff --git a/src/error.rs b/src/error.rs index d368763..928f258 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,25 +1,99 @@ -use crate::chunkid::ChunkId; +//! Errors from Obnam client. + +use crate::backup_run::BackupError; +use crate::chunk::ClientTrustError; +use crate::cipher::CipherError; +use crate::client::ClientError; +use crate::cmd::restore::RestoreError; +use crate::config::ClientConfigError; +use crate::db::DatabaseError; +use crate::dbgen::GenerationDbError; +use crate::generation::{LocalGenerationError, NascentError}; +use crate::genlist::GenerationListError; +use crate::label::LabelError; +use crate::passwords::PasswordError; use std::path::PathBuf; -use thiserror::Error; +use std::time::SystemTimeError; +use tempfile::PersistError; -/// Define all the kinds of errors any part of this crate can return. -#[derive(Debug, Error)] +/// Define all the kinds of errors that functions corresponding to +/// subcommands of the main program can return. +/// +/// This collects all kinds of errors the Obnam client may get, for +/// convenience. +#[derive(Debug, thiserror::Error)] pub enum ObnamError { - #[error("Can't find backup '{0}'")] - UnknownGeneration(String), + /// Error from chunk labels. + #[error(transparent)] + Label(#[from] LabelError), + + /// Error listing generations on server. 
+ #[error(transparent)] + GenerationListError(#[from] GenerationListError), + + /// Error about client trust chunks. + #[error(transparent)] + ClientTrust(#[from] ClientTrustError), + + /// Error saving passwords. + #[error("couldn't save passwords to {0}: {1}")] + PasswordSave(PathBuf, PasswordError), + + /// Error using server HTTP API. + #[error(transparent)] + ClientError(#[from] ClientError), + + /// Error in client configuration. + #[error(transparent)] + ClientConfigError(#[from] ClientConfigError), + + /// Error making a backup. + #[error(transparent)] + BackupError(#[from] BackupError), + + /// Error making a new backup generation. + #[error(transparent)] + NascentError(#[from] NascentError), + + /// Error encrypting or decrypting. + #[error(transparent)] + CipherError(#[from] CipherError), + + /// Error using local copy of existing backup generation. + #[error(transparent)] + LocalGenerationError(#[from] LocalGenerationError), + + /// Error from generation database. + #[error(transparent)] + GenerationDb(#[from] GenerationDbError), + + /// Error using a Database. + #[error(transparent)] + Database(#[from] DatabaseError), + + /// Error restoring a backup. + #[error(transparent)] + RestoreError(#[from] RestoreError), - #[error("Generation has more than one file with the name {0}")] - TooManyFiles(PathBuf), + /// Error making temporary file persistent. + #[error(transparent)] + PersistError(#[from] PersistError), - #[error("Server response did not have a 'chunk-meta' header for chunk {0}")] - NoChunkMeta(ChunkId), + /// Error doing I/O. + #[error(transparent)] + IoError(#[from] std::io::Error), - #[error("Wrong checksum for chunk {0}, got {1}, expected {2}")] - WrongChecksum(ChunkId, String, String), + /// Error reading system clock. + #[error(transparent)] + SystemTimeError(#[from] SystemTimeError), - #[error("Chunk is missing: {0}")] - MissingChunk(ChunkId), + /// Error regarding JSON. + #[error(transparent)] + SerdeJsonError(#[from] serde_json::Error), - #[error("Chunk is in store too many times: {0}")] - DuplicateChunk(ChunkId), + /// Unexpected cache directories found. + #[error( + "found CACHEDIR.TAG files that aren't present in the previous backup, might be an attack" + )] + NewCachedirTagsFound, } diff --git a/src/fsentry.rs b/src/fsentry.rs index eae11b4..f31d6b5 100644 --- a/src/fsentry.rs +++ b/src/fsentry.rs @@ -1,10 +1,20 @@ +//! An entry in the file system. + +use log::{debug, error}; use serde::{Deserialize, Serialize}; use std::ffi::OsString; use std::fs::read_link; use std::fs::{FileType, Metadata}; -use std::os::linux::fs::MetadataExt; use std::os::unix::ffi::OsStringExt; +use std::os::unix::fs::FileTypeExt; use std::path::{Path, PathBuf}; +use users::{Groups, Users, UsersCache}; + +#[cfg(target_os = "linux")] +use std::os::linux::fs::MetadataExt; + +#[cfg(target_os = "macos")] +use std::os::macos::fs::MetadataExt; /// A file system entry. /// @@ -35,80 +45,245 @@ pub struct FilesystemEntry { // The target of a symbolic link, if any. symlink_target: Option<PathBuf>, + + // User and group owning the file. We store them as both the + // numeric id and the textual name corresponding to the numeric id + // at the time of the backup. + uid: u32, + gid: u32, + user: String, + group: String, +} + +/// Possible errors related to file system entries. +#[derive(Debug, thiserror::Error)] +pub enum FsEntryError { + /// File kind numeric representation is unknown. + #[error("Unknown file kind {0}")] + UnknownFileKindCode(u8), + + /// Failed to read a symbolic link's target. 
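// Editor's sketch (not part of the patch): what the #[error(transparent)] and
// #[from] attributes in these enums buy. thiserror derives a From impl for
// each #[from] variant, so `?` converts the inner error automatically, and
// `transparent` forwards Display to the wrapped error. Self-contained toy
// types, for illustration only:
#[derive(Debug, thiserror::Error)]
enum InnerError {
    #[error("something failed: {0}")]
    Failed(String),
}

#[derive(Debug, thiserror::Error)]
enum OuterError {
    #[error(transparent)]
    Inner(#[from] InnerError),
}

fn inner_op() -> Result<(), InnerError> {
    Err(InnerError::Failed("demo".into()))
}

fn outer_op() -> Result<(), OuterError> {
    inner_op()?; // InnerError converts to OuterError via the derived From
    Ok(())
}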
+ #[error("failed to read symbolic link target {0}: {1}")] + ReadLink(PathBuf, std::io::Error), } #[allow(clippy::len_without_is_empty)] impl FilesystemEntry { - pub fn from_metadata(path: &Path, meta: &Metadata) -> anyhow::Result<Self> { + /// Create an `FsEntry` from a file's metadata. + pub fn from_metadata( + path: &Path, + meta: &Metadata, + cache: &mut UsersCache, + ) -> Result<Self, FsEntryError> { let kind = FilesystemKind::from_file_type(meta.file_type()); - Ok(Self { - path: path.to_path_buf().into_os_string().into_vec(), - kind: FilesystemKind::from_file_type(meta.file_type()), - len: meta.len(), - mode: meta.st_mode(), - mtime: meta.st_mtime(), - mtime_ns: meta.st_mtime_nsec(), - atime: meta.st_atime(), - atime_ns: meta.st_atime_nsec(), - symlink_target: if kind == FilesystemKind::Symlink { - Some(read_link(path)?) - } else { - None - }, - }) + Ok(EntryBuilder::new(kind) + .path(path.to_path_buf()) + .len(meta.len()) + .mode(meta.st_mode()) + .mtime(meta.st_mtime(), meta.st_mtime_nsec()) + .atime(meta.st_atime(), meta.st_atime_nsec()) + .user(meta.st_uid(), cache)? + .group(meta.st_uid(), cache)? + .symlink_target()? + .build()) } + /// Return the kind of file the entry refers to. pub fn kind(&self) -> FilesystemKind { self.kind } + /// Return full path to the entry. pub fn pathbuf(&self) -> PathBuf { let path = self.path.clone(); PathBuf::from(OsString::from_vec(path)) } + /// Return number of bytes for the entity represented by the entry. pub fn len(&self) -> u64 { self.len } + /// Return the entry's mode bits. pub fn mode(&self) -> u32 { self.mode } + /// Return the entry's access time, whole seconds. pub fn atime(&self) -> i64 { self.atime } + /// Return the entry's access time, nanoseconds since the last full second. pub fn atime_ns(&self) -> i64 { self.atime_ns } + /// Return the entry's modification time, whole seconds. pub fn mtime(&self) -> i64 { self.mtime } + /// Return the entry's modification time, nanoseconds since the last full second. pub fn mtime_ns(&self) -> i64 { self.mtime_ns } + /// Does the entry represent a directory? pub fn is_dir(&self) -> bool { self.kind() == FilesystemKind::Directory } + /// Return target of the symlink the entry represents. pub fn symlink_target(&self) -> Option<PathBuf> { self.symlink_target.clone() } } +#[derive(Debug)] +pub(crate) struct EntryBuilder { + kind: FilesystemKind, + path: PathBuf, + len: u64, + + // 16 bits should be enough for a Unix mode_t. + // https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/sys_stat.h.html + // However, it's 32 bits on Linux, so that's what we store. + mode: u32, + + // Linux can store file system time stamps in nanosecond + // resolution. We store them as two 64-bit integers. + mtime: i64, + mtime_ns: i64, + atime: i64, + atime_ns: i64, + + // The target of a symbolic link, if any. + symlink_target: Option<PathBuf>, + + // User and group owning the file. We store them as both the + // numeric id and the textual name corresponding to the numeric id + // at the time of the backup. 
+ uid: u32, + gid: u32, + user: String, + group: String, +} + +impl EntryBuilder { + pub(crate) fn new(kind: FilesystemKind) -> Self { + Self { + kind, + path: PathBuf::new(), + len: 0, + mode: 0, + mtime: 0, + mtime_ns: 0, + atime: 0, + atime_ns: 0, + symlink_target: None, + uid: 0, + user: "".to_string(), + gid: 0, + group: "".to_string(), + } + } + + pub(crate) fn build(self) -> FilesystemEntry { + FilesystemEntry { + kind: self.kind, + path: self.path.into_os_string().into_vec(), + len: self.len, + mode: self.mode, + mtime: self.mtime, + mtime_ns: self.mtime_ns, + atime: self.atime, + atime_ns: self.atime_ns, + symlink_target: self.symlink_target, + uid: self.uid, + user: self.user, + gid: self.gid, + group: self.group, + } + } + + pub(crate) fn path(mut self, path: PathBuf) -> Self { + self.path = path; + self + } + + pub(crate) fn len(mut self, len: u64) -> Self { + self.len = len; + self + } + + pub(crate) fn mode(mut self, mode: u32) -> Self { + self.mode = mode; + self + } + + pub(crate) fn mtime(mut self, secs: i64, nsec: i64) -> Self { + self.mtime = secs; + self.mtime_ns = nsec; + self + } + + pub(crate) fn atime(mut self, secs: i64, nsec: i64) -> Self { + self.atime = secs; + self.atime_ns = nsec; + self + } + + pub(crate) fn symlink_target(mut self) -> Result<Self, FsEntryError> { + self.symlink_target = if self.kind == FilesystemKind::Symlink { + debug!("reading symlink target for {:?}", self.path); + let target = read_link(&self.path) + .map_err(|err| FsEntryError::ReadLink(self.path.clone(), err))?; + Some(target) + } else { + None + }; + Ok(self) + } + + pub(crate) fn user(mut self, uid: u32, cache: &mut UsersCache) -> Result<Self, FsEntryError> { + self.uid = uid; + self.user = if let Some(user) = cache.get_user_by_uid(uid) { + user.name().to_string_lossy().to_string() + } else { + "".to_string() + }; + Ok(self) + } + + pub(crate) fn group(mut self, gid: u32, cache: &mut UsersCache) -> Result<Self, FsEntryError> { + self.gid = gid; + self.group = if let Some(group) = cache.get_group_by_gid(gid) { + group.name().to_string_lossy().to_string() + } else { + "".to_string() + }; + Ok(self) + } +} + /// Different types of file system entries. -#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize)] pub enum FilesystemKind { + /// Regular file, including a hard link to one. Regular, + /// A directory. Directory, + /// A symbolic link. Symlink, + /// A UNIX domain socket. + Socket, + /// A UNIX named pipe. + Fifo, } impl FilesystemKind { + /// Create a kind from a file type. pub fn from_file_type(file_type: FileType) -> Self { if file_type.is_file() { FilesystemKind::Regular @@ -116,35 +291,39 @@ impl FilesystemKind { FilesystemKind::Directory } else if file_type.is_symlink() { FilesystemKind::Symlink + } else if file_type.is_socket() { + FilesystemKind::Socket + } else if file_type.is_fifo() { + FilesystemKind::Fifo } else { panic!("unknown file type {:?}", file_type); } } + /// Represent a kind as a numeric code. pub fn as_code(&self) -> u8 { match self { FilesystemKind::Regular => 0, FilesystemKind::Directory => 1, FilesystemKind::Symlink => 2, + FilesystemKind::Socket => 3, + FilesystemKind::Fifo => 4, } } - pub fn from_code(code: u8) -> anyhow::Result<Self> { + /// Create a kind from a numeric code. 
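// Editor's sketch (not part of the patch): the builder above in use, with
// literal values instead of a Metadata struct. This mirrors what
// from_metadata() does; since EntryBuilder is pub(crate), this only works
// from inside the crate (e.g. in a test in this module). The values are
// made up for illustration.
use std::path::PathBuf;

fn example_entry() -> FilesystemEntry {
    EntryBuilder::new(FilesystemKind::Regular)
        .path(PathBuf::from("/etc/hostname"))
        .len(42)
        .mode(0o644)
        .mtime(1_600_000_000, 0)
        .atime(1_600_000_000, 0)
        .build() // uid/gid default to 0 and user/group to "" unless set
}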
+ pub fn from_code(code: u8) -> Result<Self, FsEntryError> { match code { 0 => Ok(FilesystemKind::Regular), 1 => Ok(FilesystemKind::Directory), 2 => Ok(FilesystemKind::Symlink), - _ => Err(Error::UnknownFileKindCode(code).into()), + 3 => Ok(FilesystemKind::Socket), + 4 => Ok(FilesystemKind::Fifo), + _ => Err(FsEntryError::UnknownFileKindCode(code)), } } } -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("unknown file kind code {0}")] - UnknownFileKindCode(u8), -} - #[cfg(test)] mod test { use super::FilesystemKind; @@ -153,6 +332,9 @@ mod test { fn file_kind_regular_round_trips() { one_file_kind_round_trip(FilesystemKind::Regular); one_file_kind_round_trip(FilesystemKind::Directory); + one_file_kind_round_trip(FilesystemKind::Symlink); + one_file_kind_round_trip(FilesystemKind::Socket); + one_file_kind_round_trip(FilesystemKind::Fifo); } fn one_file_kind_round_trip(kind: FilesystemKind) { diff --git a/src/fsiter.rs b/src/fsiter.rs index a40ad34..ef2886d 100644 --- a/src/fsiter.rs +++ b/src/fsiter.rs @@ -1,37 +1,157 @@ -use crate::fsentry::FilesystemEntry; -use log::info; -use std::path::Path; -use walkdir::{IntoIter, WalkDir}; +//! Iterate over directory tree. + +use crate::fsentry::{FilesystemEntry, FsEntryError}; +use log::warn; +use std::path::{Path, PathBuf}; +use users::UsersCache; +use walkdir::{DirEntry, IntoIter, WalkDir}; + +/// Filesystem entry along with additional info about it. +pub struct AnnotatedFsEntry { + /// The file system entry being annotated. + pub inner: FilesystemEntry, + /// Is `entry` a valid CACHEDIR.TAG? + pub is_cachedir_tag: bool, +} /// Iterator over file system entries in a directory tree. pub struct FsIterator { - iter: IntoIter, + iter: SkipCachedirs, +} + +/// Possible errors from iterating over a directory tree. +#[derive(Debug, thiserror::Error)] +pub enum FsIterError { + /// Error from the walkdir crate. + #[error("walkdir failed: {0}")] + WalkDir(walkdir::Error), + + /// Error reading a file's metadata. + #[error("failed to get file system metadata for {0}: {1}")] + Metadata(PathBuf, std::io::Error), + + /// Error related to file system entries. + #[error(transparent)] + FsEntryError(#[from] FsEntryError), } impl FsIterator { - pub fn new(root: &Path) -> Self { + /// Create a new iterator. + pub fn new(root: &Path, exclude_cache_tag_directories: bool) -> Self { Self { - iter: WalkDir::new(root).into_iter(), + iter: SkipCachedirs::new( + WalkDir::new(root).into_iter(), + exclude_cache_tag_directories, + ), } } } impl Iterator for FsIterator { - type Item = Result<FilesystemEntry, anyhow::Error>; + type Item = Result<AnnotatedFsEntry, FsIterError>; fn next(&mut self) -> Option<Self::Item> { - match self.iter.next() { - None => None, - Some(Ok(entry)) => { - info!("found {}", entry.path().display()); - Some(new_entry(&entry)) - } - Some(Err(err)) => Some(Err(err.into())), + self.iter.next() + } +} + +/// Cachedir-aware adaptor for WalkDir: it skips the contents of dirs that contain CACHEDIR.TAG, +/// but still yields entries for the dir and the tag themselves. +struct SkipCachedirs { + cache: UsersCache, + iter: IntoIter, + exclude_cache_tag_directories: bool, + // This is the last tag we've found. `next()` will yield it before asking `iter` for more + // entries. 
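// Editor's note (not part of the patch): CACHEDIR.TAG is the convention from
// https://bford.info/cachedir/ -- a cache directory is marked by a regular
// file named CACHEDIR.TAG whose content starts with the signature line that
// try_enqueue_cachedir_tag() below checks for. A minimal sketch of creating
// such a tag, e.g. for a test:
fn mark_as_cache(dir: &std::path::Path) -> std::io::Result<()> {
    std::fs::write(
        dir.join("CACHEDIR.TAG"),
        "Signature: 8a477f597d28d172789f06886806bc55\n",
    )
}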
+ cachedir_tag: Option<Result<AnnotatedFsEntry, FsIterError>>, +} + +impl SkipCachedirs { + fn new(iter: IntoIter, exclude_cache_tag_directories: bool) -> Self { + Self { + cache: UsersCache::new(), + iter, + exclude_cache_tag_directories, + cachedir_tag: None, + } + } + + fn try_enqueue_cachedir_tag(&mut self, entry: &DirEntry) { + if !self.exclude_cache_tag_directories { + return; + } + + // If this entry is not a directory, it means we already processed its + // parent dir and decided that it's not cached. + if !entry.file_type().is_dir() { + return; + } + + let mut tag_path = entry.path().to_owned(); + tag_path.push("CACHEDIR.TAG"); + + // Tags are required to be regular files -- not even symlinks are allowed. + if !tag_path.is_file() { + return; + }; + + const CACHEDIR_TAG: &[u8] = b"Signature: 8a477f597d28d172789f06886806bc55"; + let mut content = [0u8; CACHEDIR_TAG.len()]; + + let mut file = if let Ok(file) = std::fs::File::open(&tag_path) { + file + } else { + return; + }; + + use std::io::Read; + match file.read_exact(&mut content) { + Ok(_) => (), + // If we can't read the tag file, proceed as if's not there + Err(_) => return, + } + + if content == CACHEDIR_TAG { + self.iter.skip_current_dir(); + self.cachedir_tag = Some(new_entry(&tag_path, true, &mut self.cache)); } } } -fn new_entry(e: &walkdir::DirEntry) -> anyhow::Result<FilesystemEntry> { - let meta = e.metadata()?; - let entry = FilesystemEntry::from_metadata(e.path(), &meta)?; - Ok(entry) +impl Iterator for SkipCachedirs { + type Item = Result<AnnotatedFsEntry, FsIterError>; + + fn next(&mut self) -> Option<Self::Item> { + self.cachedir_tag.take().or_else(|| { + let next = self.iter.next(); + match next { + None => None, + Some(Err(err)) => Some(Err(FsIterError::WalkDir(err))), + Some(Ok(entry)) => { + self.try_enqueue_cachedir_tag(&entry); + Some(new_entry(entry.path(), false, &mut self.cache)) + } + } + }) + } +} + +fn new_entry( + path: &Path, + is_cachedir_tag: bool, + cache: &mut UsersCache, +) -> Result<AnnotatedFsEntry, FsIterError> { + let meta = std::fs::symlink_metadata(path); + let meta = match meta { + Ok(meta) => meta, + Err(err) => { + warn!("failed to get metadata for {}: {}", path.display(), err); + return Err(FsIterError::Metadata(path.to_path_buf(), err)); + } + }; + let entry = FilesystemEntry::from_metadata(path, &meta, cache)?; + let annotated = AnnotatedFsEntry { + inner: entry, + is_cachedir_tag, + }; + Ok(annotated) } diff --git a/src/generation.rs b/src/generation.rs index 8a15363..477edc0 100644 --- a/src/generation.rs +++ b/src/generation.rs @@ -1,11 +1,43 @@ +//! Backup generations of various kinds. + use crate::backup_reason::Reason; use crate::chunkid::ChunkId; +use crate::db::{DatabaseError, SqlResults}; +use crate::dbgen::{FileId, GenerationDb, GenerationDbError}; use crate::fsentry::FilesystemEntry; -use rusqlite::Connection; -use std::path::Path; +use crate::genmeta::{GenerationMeta, GenerationMetaError}; +use crate::label::LabelChecksumKind; +use crate::schema::{SchemaVersion, VersionComponent}; +use serde::Serialize; +use std::fmt; +use std::path::{Path, PathBuf}; + +/// An identifier for a generation. +#[derive(Debug, Clone, Serialize)] +pub struct GenId { + id: ChunkId, +} + +impl GenId { + /// Create a generation identifier from a chunk identifier. + pub fn from_chunk_id(id: ChunkId) -> Self { + Self { id } + } -/// An identifier for a file in a generation. -type FileId = i64; + /// Convert a generation identifier into a chunk identifier. 
+ pub fn as_chunk_id(&self) -> &ChunkId { + &self.id + } +} + +impl fmt::Display for GenId { + /// Format an identifier for display. + /// + /// The output can be parsed to re-created an identical identifier. + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.id) + } +} /// A nascent backup generation. /// @@ -13,90 +45,103 @@ type FileId = i64; /// finished yet, and it's not actually on the server until the upload /// of its generation chunk. pub struct NascentGeneration { - conn: Connection, + db: GenerationDb, fileno: FileId, } +/// Possible errors from nascent backup generations. +#[derive(Debug, thiserror::Error)] +pub enum NascentError { + /// Error backing up a backup root. + #[error("Could not back up a backup root directory: {0}: {1}")] + BackupRootFailed(PathBuf, crate::fsiter::FsIterError), + + /// Error using a local generation. + #[error(transparent)] + LocalGenerationError(#[from] LocalGenerationError), + + /// Error from a GenerationDb. + #[error(transparent)] + GenerationDb(#[from] GenerationDbError), + + /// Error from an SQL transaction. + #[error("SQL transaction error: {0}")] + Transaction(rusqlite::Error), + + /// Error from committing an SQL transaction. + #[error("SQL commit error: {0}")] + Commit(rusqlite::Error), + + /// Error creating a temporary file. + #[error("Failed to create temporary file: {0}")] + TempFile(#[from] std::io::Error), +} + impl NascentGeneration { - pub fn create<P>(filename: P) -> anyhow::Result<Self> + /// Create a new nascent generation. + pub fn create<P>( + filename: P, + schema: SchemaVersion, + checksum_kind: LabelChecksumKind, + ) -> Result<Self, NascentError> where P: AsRef<Path>, { - let conn = sql::create_db(filename.as_ref())?; - Ok(Self { conn, fileno: 0 }) + let db = GenerationDb::create(filename.as_ref(), schema, checksum_kind)?; + Ok(Self { db, fileno: 0 }) + } + + /// Commit any changes, and close the database. + pub fn close(self) -> Result<(), NascentError> { + self.db.close().map_err(NascentError::GenerationDb) } + /// How many files are there now in the nascent generation? pub fn file_count(&self) -> FileId { self.fileno } + /// Insert a new file system entry into a nascent generation. pub fn insert( &mut self, e: FilesystemEntry, ids: &[ChunkId], reason: Reason, - ) -> anyhow::Result<()> { - let t = self.conn.transaction()?; + is_cachedir_tag: bool, + ) -> Result<(), NascentError> { self.fileno += 1; - sql::insert_one(&t, e, self.fileno, ids, reason)?; - t.commit()?; + self.db + .insert(e, self.fileno, ids, reason, is_cachedir_tag)?; Ok(()) } - - pub fn insert_iter<'a>( - &mut self, - entries: impl Iterator<Item = anyhow::Result<(FilesystemEntry, Vec<ChunkId>, Reason)>>, - ) -> anyhow::Result<()> { - let t = self.conn.transaction()?; - for r in entries { - let (e, ids, reason) = r?; - self.fileno += 1; - sql::insert_one(&t, e, self.fileno, &ids[..], reason)?; - } - t.commit()?; - Ok(()) - } -} - -#[cfg(test)] -mod test { - use super::NascentGeneration; - use tempfile::NamedTempFile; - - #[test] - fn empty() { - let filename = NamedTempFile::new().unwrap().path().to_path_buf(); - { - let mut _gen = NascentGeneration::create(&filename).unwrap(); - // _gen is dropped here; the connection is close; the file - // should not be removed. - } - assert!(filename.exists()); - } } -/// A finished generation. +/// A finished generation on the server. /// -/// A generation is finished when it's on the server. It can be restored. +/// A generation is finished when it's on the server. 
It can be +/// fetched so it can be used as a [`LocalGeneration`]. #[derive(Debug, Clone)] pub struct FinishedGeneration { - id: ChunkId, + id: GenId, ended: String, } impl FinishedGeneration { + /// Create a new finished generation. pub fn new(id: &str, ended: &str) -> Self { - let id = id.parse().unwrap(); // this never fails + let id = GenId::from_chunk_id(id.parse().unwrap()); // this never fails Self { id, ended: ended.to_string(), } } - pub fn id(&self) -> ChunkId { - self.id.clone() + /// Get the generation's identifier. + pub fn id(&self) -> &GenId { + &self.id } + /// When was generation finished? pub fn ended(&self) -> &str { &self.ended } @@ -107,9 +152,51 @@ impl FinishedGeneration { /// This is for querying an existing generation, and other read-only /// operations. pub struct LocalGeneration { - conn: Connection, + db: GenerationDb, +} + +/// Possible errors from using local generations. +#[derive(Debug, thiserror::Error)] +pub enum LocalGenerationError { + /// Duplicate file names. + #[error("Generation has more than one file with the name {0}")] + TooManyFiles(PathBuf), + + /// No 'meta' table in generation. + #[error("Generation does not have a 'meta' table")] + NoMeta, + + /// Local generation uses a schema version that this version of + /// Obnam isn't compatible with. + #[error("Backup is not compatible with this version of Obnam: {0}.{1}")] + Incompatible(VersionComponent, VersionComponent), + + /// Error from generation metadata. + #[error(transparent)] + GenerationMeta(#[from] GenerationMetaError), + + /// Error from SQL. + #[error(transparent)] + RusqliteError(#[from] rusqlite::Error), + + /// Error from a GenerationDb. + #[error(transparent)] + GenerationDb(#[from] GenerationDbError), + + /// Error from a Database. + #[error(transparent)] + Database(#[from] DatabaseError), + + /// Error from JSON. + #[error(transparent)] + SerdeJsonError(#[from] serde_json::Error), + + /// Error from I/O. + #[error(transparent)] + IoError(#[from] std::io::Error), } +/// A backed up file in a local generation. pub struct BackedUpFile { fileno: FileId, entry: FilesystemEntry, @@ -117,8 +204,8 @@ pub struct BackedUpFile { } impl BackedUpFile { - pub fn new(fileno: FileId, entry: FilesystemEntry, reason: &str) -> Self { - let reason = Reason::from_str(reason); + /// Create a new `BackedUpFile`. + pub fn new(fileno: FileId, entry: FilesystemEntry, reason: Reason) -> Self { Self { fileno, entry, @@ -126,179 +213,204 @@ impl BackedUpFile { } } + /// Return id for file in its local generation. pub fn fileno(&self) -> FileId { self.fileno } + /// Return file system entry for file. pub fn entry(&self) -> &FilesystemEntry { &self.entry } + /// Return reason why file is in its local generation. pub fn reason(&self) -> Reason { self.reason } } impl LocalGeneration { - pub fn open<P>(filename: P) -> anyhow::Result<Self> + fn new(db: GenerationDb) -> Self { + Self { db } + } + + /// Open a local file as a local generation. + pub fn open<P>(filename: P) -> Result<Self, LocalGenerationError> where P: AsRef<Path>, { - let conn = sql::open_db(filename.as_ref())?; - Ok(Self { conn }) + let db = GenerationDb::open(filename.as_ref())?; + let gen = Self::new(db); + Ok(gen) } - pub fn file_count(&self) -> anyhow::Result<i64> { - Ok(sql::file_count(&self.conn)?) + /// Return generation metadata for local generation. 
+ pub fn meta(&self) -> Result<GenerationMeta, LocalGenerationError> { + let map = self.db.meta()?; + GenerationMeta::from(map).map_err(LocalGenerationError::GenerationMeta) } - pub fn files(&self) -> anyhow::Result<Vec<BackedUpFile>> { - Ok(sql::files(&self.conn)?) + /// How many files are there in the local generation? + pub fn file_count(&self) -> Result<FileId, LocalGenerationError> { + Ok(self.db.file_count()?) } - pub fn chunkids(&self, fileno: FileId) -> anyhow::Result<Vec<ChunkId>> { - Ok(sql::chunkids(&self.conn, fileno)?) + /// Return all files in the local generation. + pub fn files( + &self, + ) -> Result<SqlResults<(FileId, FilesystemEntry, Reason, bool)>, LocalGenerationError> { + self.db.files().map_err(LocalGenerationError::GenerationDb) } - pub fn get_file(&self, filename: &Path) -> anyhow::Result<Option<FilesystemEntry>> { - Ok(sql::get_file(&self.conn, filename)?) + /// Return ids for all chunks in local generation. + pub fn chunkids(&self, fileid: FileId) -> Result<SqlResults<ChunkId>, LocalGenerationError> { + self.db + .chunkids(fileid) + .map_err(LocalGenerationError::GenerationDb) } - pub fn get_fileno(&self, filename: &Path) -> anyhow::Result<Option<FileId>> { - Ok(sql::get_fileno(&self.conn, filename)?) + /// Return entry for a file, given its pathname. + pub fn get_file( + &self, + filename: &Path, + ) -> Result<Option<FilesystemEntry>, LocalGenerationError> { + self.db + .get_file(filename) + .map_err(LocalGenerationError::GenerationDb) } -} -mod sql { - use super::BackedUpFile; - use super::FileId; - use crate::backup_reason::Reason; - use crate::chunkid::ChunkId; - use crate::error::ObnamError; - use crate::fsentry::FilesystemEntry; - use rusqlite::{params, Connection, OpenFlags, Row, Transaction}; - use std::os::unix::ffi::OsStrExt; - use std::path::Path; - - pub fn create_db(filename: &Path) -> anyhow::Result<Connection> { - let flags = OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE; - let conn = Connection::open_with_flags(filename, flags)?; - conn.execute( - "CREATE TABLE files (fileno INTEGER PRIMARY KEY, filename BLOB, json TEXT, reason TEXT)", - params![], - )?; - conn.execute( - "CREATE TABLE chunks (fileno INTEGER, chunkid TEXT)", - params![], - )?; - conn.execute("CREATE INDEX filenames ON files (filename)", params![])?; - conn.execute("CREATE INDEX filenos ON chunks (fileno)", params![])?; - conn.pragma_update(None, "journal_mode", &"WAL")?; - Ok(conn) + /// Get the id in the local generation of a file, given its pathname. + pub fn get_fileno(&self, filename: &Path) -> Result<Option<FileId>, LocalGenerationError> { + self.db + .get_fileno(filename) + .map_err(LocalGenerationError::GenerationDb) } - pub fn open_db(filename: &Path) -> anyhow::Result<Connection> { - let flags = OpenFlags::SQLITE_OPEN_READ_WRITE; - let conn = Connection::open_with_flags(filename, flags)?; - conn.pragma_update(None, "journal_mode", &"WAL")?; - Ok(conn) - } - - pub fn insert_one( - t: &Transaction, - e: FilesystemEntry, - fileno: FileId, - ids: &[ChunkId], - reason: Reason, - ) -> anyhow::Result<()> { - let json = serde_json::to_string(&e)?; - t.execute( - "INSERT INTO files (fileno, filename, json, reason) VALUES (?1, ?2, ?3, ?4)", - params![fileno, path_into_blob(&e.pathbuf()), &json, reason,], - )?; - for id in ids { - t.execute( - "INSERT INTO chunks (fileno, chunkid) VALUES (?1, ?2)", - params![fileno, id], - )?; - } - Ok(()) + /// Does a pathname refer to a cache directory? 
+ pub fn is_cachedir_tag(&self, filename: &Path) -> Result<bool, LocalGenerationError> { + self.db + .is_cachedir_tag(filename) + .map_err(LocalGenerationError::GenerationDb) } +} - fn path_into_blob(path: &Path) -> Vec<u8> { - path.as_os_str().as_bytes().to_vec() - } +#[cfg(test)] +mod test { + use super::{LabelChecksumKind, LocalGeneration, NascentGeneration, Reason, SchemaVersion}; + use crate::fsentry::EntryBuilder; + use crate::fsentry::FilesystemKind; + use std::path::PathBuf; + use tempfile::{tempdir, NamedTempFile}; - pub fn row_to_entry(row: &Row) -> rusqlite::Result<(FileId, String, String)> { - let fileno: FileId = row.get(row.column_index("fileno")?)?; - let json: String = row.get(row.column_index("json")?)?; - let reason: String = row.get(row.column_index("reason")?)?; - Ok((fileno, json, reason)) - } + #[test] + fn round_trips_u64_max() { + let tmp = tempdir().unwrap(); + let filename = tmp.path().join("test.db"); + let path = PathBuf::from("/"); + let schema = SchemaVersion::new(0, 0); + { + let e = EntryBuilder::new(FilesystemKind::Directory) + .path(path.clone()) + .len(u64::MAX) + .build(); + let mut gen = + NascentGeneration::create(&filename, schema, LabelChecksumKind::Sha256).unwrap(); + gen.insert(e, &[], Reason::IsNew, false).unwrap(); + gen.close().unwrap(); + } - pub fn file_count(conn: &Connection) -> anyhow::Result<FileId> { - let mut stmt = conn.prepare("SELECT count(*) FROM files")?; - let mut iter = stmt.query_map(params![], |row| row.get(0))?; - let count = iter.next().expect("SQL count result (1)"); - let count = count?; - Ok(count) + let db = LocalGeneration::open(&filename).unwrap(); + let e = db.get_file(&path).unwrap().unwrap(); + assert_eq!(e.len(), u64::MAX); } - pub fn files(conn: &Connection) -> anyhow::Result<Vec<BackedUpFile>> { - let mut stmt = conn.prepare("SELECT * FROM files")?; - let iter = stmt.query_map(params![], |row| row_to_entry(row))?; - let mut files = vec![]; - for x in iter { - let (fileno, json, reason) = x?; - let entry = serde_json::from_str(&json)?; - files.push(BackedUpFile::new(fileno, entry, &reason)); + #[test] + fn empty() { + let filename = NamedTempFile::new().unwrap().path().to_path_buf(); + let schema = SchemaVersion::new(0, 0); + { + let mut _gen = + NascentGeneration::create(&filename, schema, LabelChecksumKind::Sha256).unwrap(); + // _gen is dropped here; the connection is close; the file + // should not be removed. } - Ok(files) + assert!(filename.exists()); } - pub fn chunkids(conn: &Connection, fileno: FileId) -> anyhow::Result<Vec<ChunkId>> { - let mut stmt = conn.prepare("SELECT chunkid FROM chunks WHERE fileno = ?1")?; - let iter = stmt.query_map(params![fileno], |row| Ok(row.get(0)?))?; - let mut ids: Vec<ChunkId> = vec![]; - for x in iter { - let fileno: String = x?; - ids.push(ChunkId::from(&fileno)); + // FIXME: This is way too complicated a test function. It should + // be simplified, possibly by re-thinking the abstractions of the + // code it calls. 
+ #[test] + fn remembers_cachedir_tags() { + use crate::{ + backup_reason::Reason, backup_run::FsEntryBackupOutcome, fsentry::FilesystemEntry, + }; + use std::{fs::metadata, path::Path}; + + // Create a `Metadata` structure to pass to other functions (we don't care about the + // contents) + let src_file = NamedTempFile::new().unwrap(); + let metadata = metadata(src_file.path()).unwrap(); + + let dbfile = NamedTempFile::new().unwrap().path().to_path_buf(); + + let nontag_path1 = Path::new("/nontag1"); + let nontag_path2 = Path::new("/dir/nontag2"); + let tag_path1 = Path::new("/a_tag"); + let tag_path2 = Path::new("/another_dir/a_tag"); + + let schema = SchemaVersion::new(0, 0); + let mut gen = + NascentGeneration::create(&dbfile, schema, LabelChecksumKind::Sha256).unwrap(); + let mut cache = users::UsersCache::new(); + + gen.insert( + FilesystemEntry::from_metadata(nontag_path1, &metadata, &mut cache).unwrap(), + &[], + Reason::IsNew, + false, + ) + .unwrap(); + gen.insert( + FilesystemEntry::from_metadata(tag_path1, &metadata, &mut cache).unwrap(), + &[], + Reason::IsNew, + true, + ) + .unwrap(); + + let entries = vec![ + FsEntryBackupOutcome { + entry: FilesystemEntry::from_metadata(nontag_path2, &metadata, &mut cache).unwrap(), + ids: vec![], + reason: Reason::IsNew, + is_cachedir_tag: false, + }, + FsEntryBackupOutcome { + entry: FilesystemEntry::from_metadata(tag_path2, &metadata, &mut cache).unwrap(), + ids: vec![], + reason: Reason::IsNew, + is_cachedir_tag: true, + }, + ]; + + for o in entries { + gen.insert(o.entry, &o.ids, o.reason, o.is_cachedir_tag) + .unwrap(); } - Ok(ids) - } - pub fn get_file(conn: &Connection, filename: &Path) -> anyhow::Result<Option<FilesystemEntry>> { - match get_file_and_fileno(conn, filename)? { - None => Ok(None), - Some((_, e, _)) => Ok(Some(e)), - } - } + gen.close().unwrap(); - pub fn get_fileno(conn: &Connection, filename: &Path) -> anyhow::Result<Option<FileId>> { - match get_file_and_fileno(conn, filename)? { - None => Ok(None), - Some((id, _, _)) => Ok(Some(id)), - } - } + let gen = LocalGeneration::open(dbfile).unwrap(); + assert!(!gen.is_cachedir_tag(nontag_path1).unwrap()); + assert!(!gen.is_cachedir_tag(nontag_path2).unwrap()); + assert!(gen.is_cachedir_tag(tag_path1).unwrap()); + assert!(gen.is_cachedir_tag(tag_path2).unwrap()); - fn get_file_and_fileno( - conn: &Connection, - filename: &Path, - ) -> anyhow::Result<Option<(FileId, FilesystemEntry, String)>> { - let mut stmt = conn.prepare("SELECT * FROM files WHERE filename = ?1")?; - let mut iter = - stmt.query_map(params![path_into_blob(filename)], |row| row_to_entry(row))?; - match iter.next() { - None => Ok(None), - Some(Err(e)) => Err(e.into()), - Some(Ok((fileno, json, reason))) => { - let entry = serde_json::from_str(&json)?; - if iter.next() == None { - Ok(Some((fileno, entry, reason))) - } else { - Err(ObnamError::TooManyFiles(filename.to_path_buf()).into()) - } - } - } + // Nonexistent files are not cachedir tags + assert!(!gen.is_cachedir_tag(Path::new("/hello/world")).unwrap()); + assert!(!gen + .is_cachedir_tag(Path::new("/different path/to/another file.txt")) + .unwrap()); } } diff --git a/src/genlist.rs b/src/genlist.rs index 10c614e..3a0d81a 100644 --- a/src/genlist.rs +++ b/src/genlist.rs @@ -1,33 +1,51 @@ +//! A list of generations on the server. + use crate::chunkid::ChunkId; -use crate::generation::FinishedGeneration; +use crate::generation::{FinishedGeneration, GenId}; +/// A list of generations on the server. 
pub struct GenerationList {
    list: Vec<FinishedGeneration>,
}

+/// Possible errors from listing generations.
+#[derive(Debug, thiserror::Error)]
+pub enum GenerationListError {
+    /// Server doesn't know about a generation.
+    #[error("Unknown generation: {0}")]
+    UnknownGeneration(ChunkId),
+}
+
impl GenerationList {
+    /// Create a new list of generations.
    pub fn new(gens: Vec<FinishedGeneration>) -> Self {
-        let mut list = gens.clone();
+        let mut list = gens;
        list.sort_by_cached_key(|gen| gen.ended().to_string());
        Self { list }
    }

+    /// Return an iterator over the generations.
    pub fn iter(&self) -> impl Iterator<Item = &FinishedGeneration> {
        self.list.iter()
    }

-    pub fn resolve(&self, genref: &str) -> Option<String> {
+    /// Resolve a symbolic name of a generation into its identifier.
+    ///
+    /// For example, "latest" refers to the latest backup, but needs
+    /// to be resolved into an actual, immutable id to actually be
+    /// restored.
+    pub fn resolve(&self, genref: &str) -> Result<GenId, GenerationListError> {
        let gen = if self.list.is_empty() {
            None
        } else if genref == "latest" {
            let i = self.list.len() - 1;
            Some(self.list[i].clone())
        } else {
-            let genref: ChunkId = genref.parse().unwrap();
+            let genref = GenId::from_chunk_id(genref.parse().unwrap());
            let hits: Vec<FinishedGeneration> = self
                .iter()
-                .filter(|gen| gen.id() == genref)
-                .map(|gen| gen.clone())
+                .filter(|gen| gen.id().as_chunk_id() == genref.as_chunk_id())
+                .cloned()
                .collect();
            if hits.len() == 1 {
                Some(hits[0].clone())
@@ -36,8 +54,10 @@ impl GenerationList {
            }
        };
        match gen {
-            None => None,
-            Some(gen) => Some(gen.id().to_string()),
+            None => Err(GenerationListError::UnknownGeneration(ChunkId::recreate(
+                genref,
+            ))),
+            Some(gen) => Ok(gen.id().clone()),
        }
    }
}
diff --git a/src/genmeta.rs b/src/genmeta.rs
new file mode 100644
index 0000000..d5b14a3
--- /dev/null
+++ b/src/genmeta.rs
@@ -0,0 +1,62 @@
+//! Backup generations metadata.
+
+use crate::schema::{SchemaVersion, VersionComponent};
+use serde::Serialize;
+use std::collections::HashMap;
+
+/// Metadata about the local generation.
+#[derive(Debug, Serialize)]
+pub struct GenerationMeta {
+    schema_version: SchemaVersion,
+    extras: HashMap<String, String>,
+}
+
+impl GenerationMeta {
+    /// Create from a hash map.
+    pub fn from(mut map: HashMap<String, String>) -> Result<Self, GenerationMetaError> {
+        let major: VersionComponent = metaint(&mut map, "schema_version_major")?;
+        let minor: VersionComponent = metaint(&mut map, "schema_version_minor")?;
+        Ok(Self {
+            schema_version: SchemaVersion::new(major, minor),
+            extras: map,
+        })
+    }
+
+    /// Return schema version of local generation.
+    pub fn schema_version(&self) -> SchemaVersion {
+        self.schema_version
+    }
+
+    /// Get a value corresponding to a key in the meta table.
+    pub fn get(&self, key: &str) -> Option<&String> {
+        self.extras.get(key)
+    }
+}
+
+fn metastr(map: &mut HashMap<String, String>, key: &str) -> Result<String, GenerationMetaError> {
+    if let Some(v) = map.remove(key) {
+        Ok(v)
+    } else {
+        Err(GenerationMetaError::NoMetaKey(key.to_string()))
+    }
+}
+
+fn metaint(map: &mut HashMap<String, String>, key: &str) -> Result<u32, GenerationMetaError> {
+    let v = metastr(map, key)?;
+    let v = v
+        .parse()
+        .map_err(|err| GenerationMetaError::BadMetaInteger(key.to_string(), err))?;
+    Ok(v)
+}
+
+/// Possible errors from getting generation metadata.
+#[derive(Debug, thiserror::Error)]
+pub enum GenerationMetaError {
+    /// Missing row in the 'meta' table.
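// Editor's sketch (not part of the patch): how GenerationMeta::from above
// consumes the 'meta' table. The two schema_version rows are required;
// everything else lands in extras. Values are made up for illustration.
use std::collections::HashMap;

fn demo_meta() {
    let mut map = HashMap::new();
    map.insert("schema_version_major".to_string(), "0".to_string());
    map.insert("schema_version_minor".to_string(), "0".to_string());
    map.insert("extra".to_string(), "anything".to_string());
    let meta = GenerationMeta::from(map).unwrap();
    assert_eq!(meta.schema_version().version(), (0, 0));
    assert_eq!(meta.get("extra"), Some(&"anything".to_string()));
}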
+ #[error("Generation 'meta' table does not have a row {0}")] + NoMetaKey(String), + + /// Bad data in 'meta' table. + #[error("Generation 'meta' row {0} has badly formed integer: {1}")] + BadMetaInteger(String, std::num::ParseIntError), +} diff --git a/src/index.rs b/src/index.rs index d527839..42f1a95 100644 --- a/src/index.rs +++ b/src/index.rs @@ -1,65 +1,81 @@ +//! An on-disk index of chunks for the server. + use crate::chunkid::ChunkId; use crate::chunkmeta::ChunkMeta; +use crate::label::Label; use rusqlite::Connection; -use std::collections::HashMap; -use std::path::{Path, PathBuf}; +use std::path::Path; -/// A chunk index. +/// A chunk index stored on the disk. /// /// A chunk index lets the server quickly find chunks based on a -/// string key/value pair, or whether they are generations. +/// string key/value pair. #[derive(Debug)] pub struct Index { - filename: PathBuf, conn: Connection, - map: HashMap<(String, String), Vec<ChunkId>>, - generations: Vec<ChunkId>, - metas: HashMap<ChunkId, ChunkMeta>, +} + +/// All the errors that may be returned for `Index`. +#[derive(Debug, thiserror::Error)] +pub enum IndexError { + /// Index does not have a chunk. + #[error("The repository index does not have chunk {0}")] + MissingChunk(ChunkId), + + /// Index has chunk more than once. + #[error("The repository index duplicates chunk {0}")] + DuplicateChunk(ChunkId), + + /// An error from SQLite. + #[error(transparent)] + SqlError(#[from] rusqlite::Error), } impl Index { - pub fn new<P: AsRef<Path>>(dirname: P) -> anyhow::Result<Self> { + /// Create a new index. + pub fn new<P: AsRef<Path>>(dirname: P) -> Result<Self, IndexError> { let filename = dirname.as_ref().join("meta.db"); let conn = if filename.exists() { sql::open_db(&filename)? } else { sql::create_db(&filename)? }; - Ok(Self { - filename, - conn, - map: HashMap::new(), - generations: vec![], - metas: HashMap::new(), - }) + Ok(Self { conn }) } - pub fn insert_meta(&mut self, id: ChunkId, meta: ChunkMeta) -> anyhow::Result<()> { + /// Insert metadata for a new chunk into index. + pub fn insert_meta(&mut self, id: ChunkId, meta: ChunkMeta) -> Result<(), IndexError> { let t = self.conn.transaction()?; sql::insert(&t, &id, &meta)?; t.commit()?; Ok(()) } - pub fn get_meta(&self, id: &ChunkId) -> anyhow::Result<ChunkMeta> { + /// Look up metadata for a chunk, given its id. + pub fn get_meta(&self, id: &ChunkId) -> Result<ChunkMeta, IndexError> { sql::lookup(&self.conn, id) } - pub fn remove_meta(&mut self, id: &ChunkId) -> anyhow::Result<()> { + /// Remove a chunk's metadata. + pub fn remove_meta(&mut self, id: &ChunkId) -> Result<(), IndexError> { sql::remove(&self.conn, id) } - pub fn find_by_sha256(&self, sha256: &str) -> anyhow::Result<Vec<ChunkId>> { - sql::find_by_256(&self.conn, sha256) + /// Find chunks with a client-assigned label. + pub fn find_by_label(&self, label: &str) -> Result<Vec<ChunkId>, IndexError> { + sql::find_by_label(&self.conn, label) } - pub fn find_generations(&self) -> anyhow::Result<Vec<ChunkId>> { - sql::find_generations(&self.conn) + /// Find all chunks. 
+ pub fn all_chunks(&self) -> Result<Vec<ChunkId>, IndexError> { + sql::find_chunk_ids(&self.conn) } } #[cfg(test)] mod test { + use super::Label; + use super::{ChunkId, ChunkMeta, Index}; use std::path::Path; use tempfile::tempdir; @@ -71,140 +87,113 @@ mod test { #[test] fn remembers_inserted() { let id: ChunkId = "id001".parse().unwrap(); - let meta = ChunkMeta::new("abc"); + let sum = Label::sha256(b"abc"); + let meta = ChunkMeta::new(&sum); let dir = tempdir().unwrap(); let mut idx = new_index(dir.path()); idx.insert_meta(id.clone(), meta.clone()).unwrap(); assert_eq!(idx.get_meta(&id).unwrap(), meta); - let ids = idx.find_by_sha256("abc").unwrap(); + let ids = idx.find_by_label(&sum.serialize()).unwrap(); assert_eq!(ids, vec![id]); } #[test] fn does_not_find_uninserted() { let id: ChunkId = "id001".parse().unwrap(); - let meta = ChunkMeta::new("abc"); + let sum = Label::sha256(b"abc"); + let meta = ChunkMeta::new(&sum); let dir = tempdir().unwrap(); let mut idx = new_index(dir.path()); idx.insert_meta(id, meta).unwrap(); - assert_eq!(idx.find_by_sha256("def").unwrap().len(), 0) + assert_eq!(idx.find_by_label("def").unwrap().len(), 0) } #[test] fn removes_inserted() { let id: ChunkId = "id001".parse().unwrap(); - let meta = ChunkMeta::new("abc"); + let sum = Label::sha256(b"abc"); + let meta = ChunkMeta::new(&sum); let dir = tempdir().unwrap(); let mut idx = new_index(dir.path()); idx.insert_meta(id.clone(), meta).unwrap(); idx.remove_meta(&id).unwrap(); - let ids: Vec<ChunkId> = idx.find_by_sha256("abc").unwrap(); + let ids: Vec<ChunkId> = idx.find_by_label(&sum.serialize()).unwrap(); assert_eq!(ids, vec![]); } - - #[test] - fn has_no_generations_initially() { - let dir = tempdir().unwrap(); - let idx = new_index(dir.path()); - assert_eq!(idx.find_generations().unwrap(), vec![]); - } - - #[test] - fn remembers_generation() { - let id: ChunkId = "id001".parse().unwrap(); - let meta = ChunkMeta::new_generation("abc", "timestamp"); - let dir = tempdir().unwrap(); - let mut idx = new_index(dir.path()); - idx.insert_meta(id.clone(), meta.clone()).unwrap(); - assert_eq!(idx.find_generations().unwrap(), vec![id]); - } - - #[test] - fn removes_generation() { - let id: ChunkId = "id001".parse().unwrap(); - let meta = ChunkMeta::new_generation("abc", "timestamp"); - let dir = tempdir().unwrap(); - let mut idx = new_index(dir.path()); - idx.insert_meta(id.clone(), meta.clone()).unwrap(); - idx.remove_meta(&id).unwrap(); - assert_eq!(idx.find_generations().unwrap(), vec![]); - } } mod sql { + use super::{IndexError, Label}; use crate::chunkid::ChunkId; use crate::chunkmeta::ChunkMeta; - use crate::error::ObnamError; use log::error; use rusqlite::{params, Connection, OpenFlags, Row, Transaction}; use std::path::Path; - pub fn create_db(filename: &Path) -> anyhow::Result<Connection> { + /// Create a database in a file. 
+ pub fn create_db(filename: &Path) -> Result<Connection, IndexError> { let flags = OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE; let conn = Connection::open_with_flags(filename, flags)?; conn.execute( - "CREATE TABLE chunks (id TEXT PRIMARY KEY, sha256 TEXT, generation INT, ended TEXT)", + "CREATE TABLE chunks (id TEXT PRIMARY KEY, label TEXT)", params![], )?; - conn.execute("CREATE INDEX sha256_idx ON chunks (sha256)", params![])?; - conn.execute( - "CREATE INDEX generation_idx ON chunks (generation)", - params![], - )?; - conn.pragma_update(None, "journal_mode", &"WAL")?; + conn.execute("CREATE INDEX label_idx ON chunks (label)", params![])?; + conn.pragma_update(None, "journal_mode", "WAL")?; Ok(conn) } - pub fn open_db(filename: &Path) -> anyhow::Result<Connection> { + /// Open an existing database in a file. + pub fn open_db(filename: &Path) -> Result<Connection, IndexError> { let flags = OpenFlags::SQLITE_OPEN_READ_WRITE; let conn = Connection::open_with_flags(filename, flags)?; - conn.pragma_update(None, "journal_mode", &"WAL")?; + conn.pragma_update(None, "journal_mode", "WAL")?; Ok(conn) } - pub fn insert(t: &Transaction, chunkid: &ChunkId, meta: &ChunkMeta) -> anyhow::Result<()> { + /// Insert a new chunk's metadata into database. + pub fn insert(t: &Transaction, chunkid: &ChunkId, meta: &ChunkMeta) -> Result<(), IndexError> { let chunkid = format!("{}", chunkid); - let sha256 = meta.sha256(); - let generation = if meta.is_generation() { 1 } else { 0 }; - let ended = meta.ended(); + let label = meta.label(); t.execute( - "INSERT INTO chunks (id, sha256, generation, ended) VALUES (?1, ?2, ?3, ?4)", - params![chunkid, sha256, generation, ended], + "INSERT INTO chunks (id, label) VALUES (?1, ?2)", + params![chunkid, label], )?; Ok(()) } - pub fn remove(conn: &Connection, chunkid: &ChunkId) -> anyhow::Result<()> { + /// Remove a chunk's metadata from the database. + pub fn remove(conn: &Connection, chunkid: &ChunkId) -> Result<(), IndexError> { conn.execute("DELETE FROM chunks WHERE id IS ?1", params![chunkid])?; Ok(()) } - pub fn lookup(conn: &Connection, id: &ChunkId) -> anyhow::Result<ChunkMeta> { + /// Look up a chunk using its id. + pub fn lookup(conn: &Connection, id: &ChunkId) -> Result<ChunkMeta, IndexError> { let mut stmt = conn.prepare("SELECT * FROM chunks WHERE id IS ?1")?; - let iter = stmt.query_map(params![id], |row| row_to_meta(row))?; + let iter = stmt.query_map(params![id], row_to_meta)?; let mut metas: Vec<ChunkMeta> = vec![]; for meta in iter { let meta = meta?; if metas.is_empty() { - eprintln!("lookup: meta={:?}", meta); metas.push(meta); } else { - let err = ObnamError::DuplicateChunk(id.clone()); + let err = IndexError::DuplicateChunk(id.clone()); error!("{}", err); - return Err(err.into()); + return Err(err); } } - if metas.len() == 0 { - eprintln!("lookup: no hits"); - return Err(ObnamError::MissingChunk(id.clone()).into()); + if metas.is_empty() { + return Err(IndexError::MissingChunk(id.clone())); } let r = metas[0].clone(); Ok(r) } - pub fn find_by_256(conn: &Connection, sha256: &str) -> anyhow::Result<Vec<ChunkId>> { - let mut stmt = conn.prepare("SELECT id FROM chunks WHERE sha256 IS ?1")?; - let iter = stmt.query_map(params![sha256], |row| row_to_id(row))?; + /// Find chunks with a given checksum. 
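// Editor's sketch (not part of the patch): the Index API from this file end
// to end, mirroring the tests above. Labels are stored in serialized form,
// which is why find_by_label takes the serialized string.
fn index_demo(dir: &std::path::Path) -> Result<(), IndexError> {
    let mut idx = Index::new(dir)?;
    let id: ChunkId = "id001".parse().unwrap();
    let label = Label::sha256(b"some chunk data");
    idx.insert_meta(id.clone(), ChunkMeta::new(&label))?;
    assert_eq!(idx.find_by_label(&label.serialize())?, vec![id]);
    Ok(())
}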
+ pub fn find_by_label(conn: &Connection, label: &str) -> Result<Vec<ChunkId>, IndexError> { + let mut stmt = conn.prepare("SELECT id FROM chunks WHERE label IS ?1")?; + let iter = stmt.query_map(params![label], row_to_id)?; let mut ids = vec![]; for x in iter { let x = x?; @@ -213,9 +202,10 @@ mod sql { Ok(ids) } - pub fn find_generations(conn: &Connection) -> anyhow::Result<Vec<ChunkId>> { - let mut stmt = conn.prepare("SELECT id FROM chunks WHERE generation IS 1")?; - let iter = stmt.query_map(params![], |row| row_to_id(row))?; + /// Find ids of all chunks. + pub fn find_chunk_ids(conn: &Connection) -> Result<Vec<ChunkId>, IndexError> { + let mut stmt = conn.prepare("SELECT id FROM chunks")?; + let iter = stmt.query_map(params![], row_to_id)?; let mut ids = vec![]; for x in iter { let x = x?; @@ -224,20 +214,14 @@ mod sql { Ok(ids) } - pub fn row_to_meta(row: &Row) -> rusqlite::Result<ChunkMeta> { - let sha256: String = row.get(row.column_index("sha256")?)?; - let generation: i32 = row.get(row.column_index("generation")?)?; - let meta = if generation == 0 { - ChunkMeta::new(&sha256) - } else { - let ended: String = row.get(row.column_index("ended")?)?; - ChunkMeta::new_generation(&sha256, &ended) - }; - Ok(meta) + fn row_to_meta(row: &Row) -> rusqlite::Result<ChunkMeta> { + let hash: String = row.get("label")?; + let sha256 = Label::deserialize(&hash).expect("deserialize checksum from database"); + Ok(ChunkMeta::new(&sha256)) } - pub fn row_to_id(row: &Row) -> rusqlite::Result<ChunkId> { - let id: String = row.get(row.column_index("id")?)?; - Ok(ChunkId::from_str(&id)) + fn row_to_id(row: &Row) -> rusqlite::Result<ChunkId> { + let id: String = row.get("id")?; + Ok(ChunkId::recreate(&id)) } } diff --git a/src/indexedstore.rs b/src/indexedstore.rs deleted file mode 100644 index 0366013..0000000 --- a/src/indexedstore.rs +++ /dev/null @@ -1,57 +0,0 @@ -use crate::chunk::DataChunk; -use crate::chunkid::ChunkId; -use crate::chunkmeta::ChunkMeta; -use crate::index::Index; -use crate::store::Store; -use std::path::Path; - -/// A store for chunks and their metadata. -/// -/// This combines Store and Index into one interface to make it easier -/// to handle the server side storage of chunks. 
-pub struct IndexedStore {
-    store: Store,
-    index: Index,
-}
-
-impl IndexedStore {
-    pub fn new(dirname: &Path) -> anyhow::Result<Self> {
-        let store = Store::new(dirname);
-        let index = Index::new(dirname)?;
-        Ok(Self { store, index })
-    }
-
-    pub fn save(&mut self, meta: &ChunkMeta, chunk: &DataChunk) -> anyhow::Result<ChunkId> {
-        let id = ChunkId::new();
-        self.store.save(&id, meta, chunk)?;
-        self.insert_meta(&id, meta)?;
-        Ok(id)
-    }
-
-    fn insert_meta(&mut self, id: &ChunkId, meta: &ChunkMeta) -> anyhow::Result<()> {
-        self.index.insert_meta(id.clone(), meta.clone())?;
-        Ok(())
-    }
-
-    pub fn load(&self, id: &ChunkId) -> anyhow::Result<(DataChunk, ChunkMeta)> {
-        Ok((self.store.load(id)?, self.load_meta(id)?))
-    }
-
-    pub fn load_meta(&self, id: &ChunkId) -> anyhow::Result<ChunkMeta> {
-        self.index.get_meta(id)
-    }
-
-    pub fn find_by_sha256(&self, sha256: &str) -> anyhow::Result<Vec<ChunkId>> {
-        self.index.find_by_sha256(sha256)
-    }
-
-    pub fn find_generations(&self) -> anyhow::Result<Vec<ChunkId>> {
-        self.index.find_generations()
-    }
-
-    pub fn remove(&mut self, id: &ChunkId) -> anyhow::Result<()> {
-        self.index.remove_meta(id).unwrap();
-        self.store.delete(id)?;
-        Ok(())
-    }
-}
diff --git a/src/label.rs b/src/label.rs
new file mode 100644
index 0000000..19d270a
--- /dev/null
+++ b/src/label.rs
@@ -0,0 +1,138 @@
+//! A chunk label.
+//!
+//! De-duplication of backed up data in Obnam relies on cryptographic
+//! checksums. They are implemented in this module. Note that Obnam
+//! does not aim to make these algorithms configurable, so only a very
+//! small number of carefully chosen algorithms are supported here.
+
+use blake2::Blake2s256;
+use sha2::{Digest, Sha256};
+
+const LITERAL: char = '0';
+const SHA256: char = '1';
+const BLAKE2: char = '2';
+
+/// A checksum of some data.
+#[derive(Debug, Clone)]
+pub enum Label {
+    /// An arbitrary, literal string.
+    Literal(String),
+
+    /// A SHA256 checksum.
+    Sha256(String),
+
+    /// A BLAKE2s checksum.
+    Blake2(String),
+}
+
+impl Label {
+    /// Construct a literal string.
+    pub fn literal(s: &str) -> Self {
+        Self::Literal(s.to_string())
+    }
+
+    /// Compute a SHA256 checksum for a block of data.
+    pub fn sha256(data: &[u8]) -> Self {
+        let mut hasher = Sha256::new();
+        hasher.update(data);
+        let hash = hasher.finalize();
+        Self::Sha256(format!("{:x}", hash))
+    }
+
+    /// Compute a BLAKE2s checksum for a block of data.
+    pub fn blake2(data: &[u8]) -> Self {
+        let mut hasher = Blake2s256::new();
+        hasher.update(data);
+        let hash = hasher.finalize();
+        Self::Blake2(format!("{:x}", hash))
+    }
+
+    /// Serialize a label into a string representation.
+    pub fn serialize(&self) -> String {
+        match self {
+            Self::Literal(s) => format!("{}{}", LITERAL, s),
+            Self::Sha256(hash) => format!("{}{}", SHA256, hash),
+            Self::Blake2(hash) => format!("{}{}", BLAKE2, hash),
+        }
+    }
+
+    /// De-serialize a label from its string representation.
+    pub fn deserialize(s: &str) -> Result<Self, LabelError> {
+        if s.starts_with(LITERAL) {
+            Ok(Self::Literal(s[1..].to_string()))
+        } else if s.starts_with(SHA256) {
+            Ok(Self::Sha256(s[1..].to_string()))
+        } else if s.starts_with(BLAKE2) {
+            Ok(Self::Blake2(s[1..].to_string()))
+        } else {
+            Err(LabelError::UnknownType(s.to_string()))
+        }
+    }
+}
+
+/// Kinds of checksum labels.
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub enum LabelChecksumKind {
+    /// Use a Blake2 checksum.
+    Blake2,
+
+    /// Use a SHA256 checksum.
+    Sha256,
+}
+
+impl LabelChecksumKind {
+    /// Parse a string into a label checksum kind.
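// Editor's note (not part of the patch): a serialized label is the one-char
// type prefix defined above followed by the payload, so round trips are
// cheap string operations. For example:
fn label_demo() {
    let lit = Label::literal("abc");
    assert_eq!(lit.serialize(), "0abc"); // '0' is the literal prefix

    let sum = Label::sha256(b"");
    assert!(sum.serialize().starts_with('1')); // '1' is the SHA256 prefix

    // Round trip: deserialize then serialize yields the same string.
    let back = Label::deserialize(&sum.serialize()).unwrap();
    assert_eq!(back.serialize(), sum.serialize());
}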
+ pub fn from(s: &str) -> Result<Self, LabelError> { + if s == "sha256" { + Ok(Self::Sha256) + } else if s == "blake2" { + Ok(Self::Blake2) + } else { + Err(LabelError::UnknownType(s.to_string())) + } + } + + /// Serialize a checksum kind into a string. + pub fn serialize(self) -> &'static str { + match self { + Self::Sha256 => "sha256", + Self::Blake2 => "blake2", + } + } +} + +/// Possible errors from dealing with chunk labels. +#[derive(Debug, thiserror::Error)] +pub enum LabelError { + /// Serialized label didn't start with a known type prefix. + #[error("Unknown label: {0:?}")] + UnknownType(String), +} + +#[cfg(test)] +mod test { + use super::{Label, LabelChecksumKind}; + + #[test] + fn roundtrip_literal() { + let label = Label::literal("dummy data"); + let serialized = label.serialize(); + let de = Label::deserialize(&serialized).unwrap(); + let seri2 = de.serialize(); + assert_eq!(serialized, seri2); + } + + #[test] + fn roundtrip_sha256() { + let label = Label::sha256(b"dummy data"); + let serialized = label.serialize(); + let de = Label::deserialize(&serialized).unwrap(); + let seri2 = de.serialize(); + assert_eq!(serialized, seri2); + } + + #[test] + fn roundtrip_checksum_kind() { + for kind in [LabelChecksumKind::Sha256, LabelChecksumKind::Blake2] { + assert_eq!(LabelChecksumKind::from(kind.serialize()).unwrap(), kind); + } + } +} @@ -1,21 +1,38 @@ +//! Encrypted backups. +//! +//! Obnam is a backup program that encrypts the backups. This crate +//! provides access to all the functionality of Obnam as a library. + +#![deny(missing_docs)] + +pub mod accumulated_time; pub mod backup_progress; pub mod backup_reason; pub mod backup_run; -pub mod benchmark; -pub mod checksummer; pub mod chunk; pub mod chunker; pub mod chunkid; pub mod chunkmeta; +pub mod chunkstore; +pub mod cipher; pub mod client; pub mod cmd; +pub mod config; +pub mod db; +pub mod dbgen; +pub mod engine; pub mod error; pub mod fsentry; pub mod fsiter; pub mod generation; pub mod genlist; +pub mod genmeta; pub mod index; -pub mod indexedstore; +pub mod label; +pub mod passwords; +pub mod performance; pub mod policy; +pub mod schema; pub mod server; pub mod store; +pub mod workqueue; diff --git a/src/passwords.rs b/src/passwords.rs new file mode 100644 index 0000000..efc3f96 --- /dev/null +++ b/src/passwords.rs @@ -0,0 +1,102 @@ +//! Passwords for encryption. + +use pbkdf2::{ + password_hash::{PasswordHasher, SaltString}, + Pbkdf2, +}; +use rand::rngs::OsRng; +use serde::{Deserialize, Serialize}; +use std::io::prelude::Write; +use std::os::unix::fs::PermissionsExt; +use std::path::{Path, PathBuf}; + +const KEY_LEN: usize = 32; // Only size accepted by aead crate? + +/// Encryption password. +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Passwords { + encryption: String, +} + +impl Passwords { + /// Create a new encryption password from a user-supplied passphrase. + pub fn new(passphrase: &str) -> Self { + let mut key = derive_password(passphrase); + let _ = key.split_off(KEY_LEN); + assert_eq!(key.len(), KEY_LEN); + Self { encryption: key } + } + + /// Get encryption key. + pub fn encryption_key(&self) -> &[u8] { + self.encryption.as_bytes() + } + + /// Load passwords from file. + pub fn load(filename: &Path) -> Result<Self, PasswordError> { + let data = std::fs::read(filename) + .map_err(|err| PasswordError::Read(filename.to_path_buf(), err))?; + serde_yaml::from_slice(&data) + .map_err(|err| PasswordError::Parse(filename.to_path_buf(), err)) + } + + /// Save passwords to file. 
+ pub fn save(&self, filename: &Path) -> Result<(), PasswordError> { + let data = serde_yaml::to_string(&self).map_err(PasswordError::Serialize)?; + + let mut file = std::fs::File::create(filename) + .map_err(|err| PasswordError::Write(filename.to_path_buf(), err))?; + let metadata = file + .metadata() + .map_err(|err| PasswordError::Write(filename.to_path_buf(), err))?; + let mut permissions = metadata.permissions(); + + // Make readadable by owner only. We still have the open file + // handle, so we can write the content. + permissions.set_mode(0o400); + std::fs::set_permissions(filename, permissions) + .map_err(|err| PasswordError::Write(filename.to_path_buf(), err))?; + + // Write actual content. + file.write_all(data.as_bytes()) + .map_err(|err| PasswordError::Write(filename.to_path_buf(), err))?; + + Ok(()) + } +} + +/// Return name of password file, relative to configuration file. +pub fn passwords_filename(config_filename: &Path) -> PathBuf { + let mut filename = config_filename.to_path_buf(); + filename.set_file_name("passwords.yaml"); + filename +} + +fn derive_password(passphrase: &str) -> String { + let salt = SaltString::generate(&mut OsRng); + + Pbkdf2 + .hash_password(passphrase.as_bytes(), salt.as_salt()) + .unwrap() + .to_string() +} + +/// Possible errors from passwords. +#[derive(Debug, thiserror::Error)] +pub enum PasswordError { + /// Failed to make YAML when saving passwords. + #[error("failed to serialize passwords for saving: {0}")] + Serialize(serde_yaml::Error), + + /// Failed to save to file. + #[error("failed to save passwords to {0}: {1}")] + Write(PathBuf, std::io::Error), + + /// Failed read passwords file. + #[error("failed to read passwords from {0}: {1}")] + Read(PathBuf, std::io::Error), + + /// Failed to parse passwords file. + #[error("failed to parse saved passwords from {0}: {1}")] + Parse(PathBuf, serde_yaml::Error), +} diff --git a/src/performance.rs b/src/performance.rs new file mode 100644 index 0000000..29c2328 --- /dev/null +++ b/src/performance.rs @@ -0,0 +1,97 @@ +//! Performance measurements from an Obnam run. + +use crate::accumulated_time::AccumulatedTime; +use log::info; + +/// The kinds of clocks we have. +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] +pub enum Clock { + /// The complete runtime of the program. + RunTime, + + /// Time spent downloading previous backup generations. + GenerationDownload, + + /// Time spent uploading backup generations. + GenerationUpload, +} + +/// Collected measurements from this Obnam run. +#[derive(Debug)] +pub struct Performance { + args: Vec<String>, + time: AccumulatedTime<Clock>, + live_files: u64, + files_backed_up: u64, + chunks_uploaded: u64, + chunks_reused: u64, +} + +impl Default for Performance { + fn default() -> Self { + Self { + args: std::env::args().collect(), + time: AccumulatedTime::<Clock>::new(), + live_files: 0, + files_backed_up: 0, + chunks_reused: 0, + chunks_uploaded: 0, + } + } +} + +impl Performance { + /// Log all performance measurements to the log file. 
+    pub fn log(&self) {
+        info!("Performance measurements for this Obnam run");
+        for (i, arg) in self.args.iter().enumerate() {
+            info!("argv[{}]={:?}", i, arg);
+        }
+        info!("Live files found: {}", self.live_files);
+        info!("Files backed up: {}", self.files_backed_up);
+        info!("Chunks uploaded: {}", self.chunks_uploaded);
+        info!("Chunks reused: {}", self.chunks_reused);
+        info!(
+            "Downloading previous generation (seconds): {}",
+            self.time.secs(Clock::GenerationDownload)
+        );
+        info!(
+            "Uploading new generation (seconds): {}",
+            self.time.secs(Clock::GenerationUpload)
+        );
+        info!(
+            "Complete run time (seconds): {}",
+            self.time.secs(Clock::RunTime)
+        );
+    }
+
+    /// Start a specific clock.
+    pub fn start(&mut self, clock: Clock) {
+        self.time.start(clock)
+    }
+
+    /// Stop a specific clock.
+    pub fn stop(&mut self, clock: Clock) {
+        self.time.stop(clock)
+    }
+
+    /// Increment number of live files.
+    pub fn found_live_files(&mut self, n: u64) {
+        self.live_files += n;
+    }
+
+    /// Increment number of files backed up this run.
+    pub fn back_up_file(&mut self) {
+        self.files_backed_up += 1;
+    }
+
+    /// Increment number of reused chunks.
+    pub fn reuse_chunk(&mut self) {
+        self.chunks_reused += 1;
+    }
+
+    /// Increment number of uploaded chunks.
+    pub fn upload_chunk(&mut self) {
+        self.chunks_uploaded += 1;
+    }
+}
diff --git a/src/policy.rs b/src/policy.rs
index 032b851..8cdbd76 100644
--- a/src/policy.rs
+++ b/src/policy.rs
@@ -1,24 +1,40 @@
+//! Policy for what gets backed up.
+
 use crate::backup_reason::Reason;
 use crate::fsentry::FilesystemEntry;
 use crate::generation::LocalGeneration;
-use log::{debug, warn};
+use log::warn;
 
+/// Policy for what gets backed up.
+///
+/// The policy allows two aspects to be controlled:
+///
+/// * should new files (files that didn't exist in the previous
+///   backup) be included in the new backup?
+/// * should files that haven't been changed since the previous backup
+///   be included in the new backup?
+///
+/// If policy doesn't allow a file to be included, it's skipped.
 pub struct BackupPolicy {
     new: bool,
     old_if_changed: bool,
 }
 
-impl BackupPolicy {
-    pub fn new() -> Self {
+impl Default for BackupPolicy {
+    /// Create a default policy.
+    fn default() -> Self {
         Self {
             new: true,
             old_if_changed: true,
         }
    }
+}
 
+impl BackupPolicy {
+    /// Does a given file need to be backed up?
     pub fn needs_backup(&self, old: &LocalGeneration, new_entry: &FilesystemEntry) -> Reason {
         let new_name = new_entry.pathbuf();
-        let reason = match old.get_file(&new_name) {
+        match old.get_file(&new_name) {
             Ok(None) => {
                 if self.new {
                     Reason::IsNew
@@ -42,14 +58,9 @@ impl BackupPolicy {
                     "needs_backup: lookup in old generation returned error, ignored: {:?}: {}",
                     new_name, err
                 );
-                Reason::Error
+                Reason::GenerationLookupError
             }
-        };
-        debug!(
-            "needs_backup: file {:?}: policy decision: {}",
-            new_name, reason
-        );
-        reason
+        }
     }
 }
diff --git a/src/schema.rs b/src/schema.rs
new file mode 100644
index 0000000..ae8c00b
--- /dev/null
+++ b/src/schema.rs
@@ -0,0 +1,173 @@
+//! Database schema versions.
+
+use serde::Serialize;
+
+/// The type of schema version components.
+pub type VersionComponent = u32;
+
+/// Schema version of the database storing the generation.
+///
+/// An Obnam client can restore a generation using schema version
+/// (x,y), if the client supports a schema version (x,z). If z < y,
+/// the client knows it may not be able to restore the generation
+/// faithfully, and should warn the user about this.
+/// If z >= y, the client knows it can restore the generation
+/// faithfully. If the client does not support any schema version x,
+/// it knows it can't restore the backup at all.
+#[derive(Debug, Clone, Copy, Serialize)]
+pub struct SchemaVersion {
+    /// Major version.
+    pub major: VersionComponent,
+    /// Minor version.
+    pub minor: VersionComponent,
+}
+
+impl SchemaVersion {
+    /// Create a new schema version object.
+    pub fn new(major: VersionComponent, minor: VersionComponent) -> Self {
+        Self { major, minor }
+    }
+
+    /// Return the major and minor version number components of a schema version.
+    pub fn version(&self) -> (VersionComponent, VersionComponent) {
+        (self.major, self.minor)
+    }
+
+    /// Is this schema version compatible with another schema version?
+    pub fn is_compatible_with(&self, other: &Self) -> bool {
+        self.major == other.major && self.minor >= other.minor
+    }
+}
+
+impl std::fmt::Display for SchemaVersion {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}.{}", self.major, self.minor)
+    }
+}
+
+impl std::str::FromStr for SchemaVersion {
+    type Err = SchemaVersionError;
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        if let Some(pos) = s.find('.') {
+            let major = parse_int(&s[..pos])?;
+            let minor = parse_int(&s[pos + 1..])?;
+            Ok(SchemaVersion::new(major, minor))
+        } else {
+            Err(SchemaVersionError::Invalid(s.to_string()))
+        }
+    }
+}
+
+fn parse_int(s: &str) -> Result<VersionComponent, SchemaVersionError> {
+    if let Ok(i) = s.parse() {
+        Ok(i)
+    } else {
+        Err(SchemaVersionError::InvalidComponent(s.to_string()))
+    }
+}
+
+/// Possible errors from parsing schema versions.
+#[derive(Debug, thiserror::Error, PartialEq, Eq)]
+pub enum SchemaVersionError {
+    /// Failed to parse a string as a schema version.
+    #[error("Invalid schema version {0:?}")]
+    Invalid(String),
+
+    /// Failed to parse a string as a schema version component.
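+    ///
+    /// A small illustrative doctest (the crate name `obnam` is
+    /// assumed): parsing `"1.x"` fails this way, because the minor
+    /// component is not a number.
+    ///
+    /// ```
+    /// use obnam::schema::{SchemaVersion, SchemaVersionError};
+    /// use std::str::FromStr;
+    ///
+    /// let err = SchemaVersion::from_str("1.x").unwrap_err();
+    /// assert_eq!(err, SchemaVersionError::InvalidComponent("x".to_string()));
+    /// ```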
+ #[error("Invalid schema version component {0:?}")] + InvalidComponent(String), +} + +#[cfg(test)] +mod test { + use super::*; + use std::str::FromStr; + + #[test] + fn from_string() { + let v = SchemaVersion::from_str("1.2").unwrap(); + assert_eq!(v.version(), (1, 2)); + } + + #[test] + fn from_string_fails_if_empty() { + match SchemaVersion::from_str("") { + Err(SchemaVersionError::Invalid(s)) => assert_eq!(s, ""), + _ => unreachable!(), + } + } + + #[test] + fn from_string_fails_if_empty_major() { + match SchemaVersion::from_str(".2") { + Err(SchemaVersionError::InvalidComponent(s)) => assert_eq!(s, ""), + _ => unreachable!(), + } + } + + #[test] + fn from_string_fails_if_empty_minor() { + match SchemaVersion::from_str("1.") { + Err(SchemaVersionError::InvalidComponent(s)) => assert_eq!(s, ""), + _ => unreachable!(), + } + } + + #[test] + fn from_string_fails_if_just_major() { + match SchemaVersion::from_str("1") { + Err(SchemaVersionError::Invalid(s)) => assert_eq!(s, "1"), + _ => unreachable!(), + } + } + + #[test] + fn from_string_fails_if_nonnumeric_major() { + match SchemaVersion::from_str("a.2") { + Err(SchemaVersionError::InvalidComponent(s)) => assert_eq!(s, "a"), + _ => unreachable!(), + } + } + + #[test] + fn from_string_fails_if_nonnumeric_minor() { + match SchemaVersion::from_str("1.a") { + Err(SchemaVersionError::InvalidComponent(s)) => assert_eq!(s, "a"), + _ => unreachable!(), + } + } + + #[test] + fn compatible_with_self() { + let v = SchemaVersion::new(1, 2); + assert!(v.is_compatible_with(&v)); + } + + #[test] + fn compatible_with_older_minor_version() { + let old = SchemaVersion::new(1, 2); + let new = SchemaVersion::new(1, 3); + assert!(new.is_compatible_with(&old)); + } + + #[test] + fn not_compatible_with_newer_minor_version() { + let old = SchemaVersion::new(1, 2); + let new = SchemaVersion::new(1, 3); + assert!(!old.is_compatible_with(&new)); + } + + #[test] + fn not_compatible_with_older_major_version() { + let old = SchemaVersion::new(1, 2); + let new = SchemaVersion::new(2, 0); + assert!(!new.is_compatible_with(&old)); + } + + #[test] + fn not_compatible_with_newer_major_version() { + let old = SchemaVersion::new(1, 2); + let new = SchemaVersion::new(2, 0); + assert!(!old.is_compatible_with(&new)); + } +} diff --git a/src/server.rs b/src/server.rs index 4d5880e..ffd4009 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,9 +1,81 @@ +//! Stuff related to the Obnam chunk server. + use crate::chunk::DataChunk; use crate::chunkid::ChunkId; use crate::chunkmeta::ChunkMeta; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::default::Default; +use std::path::{Path, PathBuf}; + +/// Server configuration. +#[derive(Debug, Deserialize, Clone)] +#[serde(deny_unknown_fields)] +pub struct ServerConfig { + /// Path to directory where chunks are stored. + pub chunks: PathBuf, + /// Address where server is to listen. + pub address: String, + /// Path to TLS key. + pub tls_key: PathBuf, + /// Path to TLS certificate. + pub tls_cert: PathBuf, +} + +/// Possible errors wittht server configuration. +#[derive(Debug, thiserror::Error)] +pub enum ServerConfigError { + /// The chunks directory doesn't exist. + #[error("Directory for chunks {0} does not exist")] + ChunksDirNotFound(PathBuf), + + /// The TLS certificate doesn't exist. + #[error("TLS certificate {0} does not exist")] + TlsCertNotFound(PathBuf), + + /// The TLS key doesn't exist. + #[error("TLS key {0} does not exist")] + TlsKeyNotFound(PathBuf), + + /// Server address is wrong. 
+ #[error("server address can't be resolved")] + BadServerAddress, + + /// Failed to read configuration file. + #[error("failed to read configuration file {0}: {1}")] + Read(PathBuf, std::io::Error), + + /// Failed to parse configuration file as YAML. + #[error("failed to parse configuration file as YAML: {0}")] + YamlParse(serde_yaml::Error), +} + +impl ServerConfig { + /// Read, parse, and check the server configuration file. + pub fn read_config(filename: &Path) -> Result<Self, ServerConfigError> { + let config = match std::fs::read_to_string(filename) { + Ok(config) => config, + Err(err) => return Err(ServerConfigError::Read(filename.to_path_buf(), err)), + }; + let config: Self = serde_yaml::from_str(&config).map_err(ServerConfigError::YamlParse)?; + config.check()?; + Ok(config) + } + + /// Check the configuration. + pub fn check(&self) -> Result<(), ServerConfigError> { + if !self.chunks.exists() { + return Err(ServerConfigError::ChunksDirNotFound(self.chunks.clone())); + } + if !self.tls_cert.exists() { + return Err(ServerConfigError::TlsCertNotFound(self.tls_cert.clone())); + } + if !self.tls_key.exists() { + return Err(ServerConfigError::TlsKeyNotFound(self.tls_key.clone())); + } + Ok(()) + } +} /// Result of creating a chunk. #[derive(Debug, Serialize)] @@ -12,17 +84,18 @@ pub struct Created { } impl Created { + /// Create a new created chunk id. pub fn new(id: ChunkId) -> Self { Created { id } } + /// Convert to JSON. pub fn to_json(&self) -> String { serde_json::to_string(&self).unwrap() } } /// Result of retrieving a chunk. - #[derive(Debug, Serialize)] pub struct Fetched { id: ChunkId, @@ -30,31 +103,36 @@ pub struct Fetched { } impl Fetched { + /// Create a new id for a fetched chunk. pub fn new(id: ChunkId, chunk: DataChunk) -> Self { Fetched { id, chunk } } + /// Convert to JSON. pub fn to_json(&self) -> String { serde_json::to_string(&self).unwrap() } } /// Result of a search. -#[derive(Debug, Default, PartialEq, Deserialize, Serialize)] +#[derive(Debug, Default, Eq, PartialEq, Deserialize, Serialize)] pub struct SearchHits { map: HashMap<String, ChunkMeta>, } impl SearchHits { + /// Insert a new chunk id to search results. pub fn insert(&mut self, id: ChunkId, meta: ChunkMeta) { self.map.insert(id.to_string(), meta); } - pub fn from_json(s: &str) -> anyhow::Result<Self> { + /// Convert from JSON. + pub fn from_json(s: &str) -> Result<Self, serde_json::Error> { let map = serde_json::from_str(s)?; Ok(SearchHits { map }) } + /// Convert to JSON. pub fn to_json(&self) -> String { serde_json::to_string(&self.map).unwrap() } @@ -63,6 +141,7 @@ impl SearchHits { #[cfg(test)] mod test_search_hits { use super::{ChunkMeta, SearchHits}; + use crate::label::Label; #[test] fn no_search_hits() { @@ -73,7 +152,8 @@ mod test_search_hits { #[test] fn one_search_hit() { let id = "abc".parse().unwrap(); - let meta = ChunkMeta::new("123"); + let sum = Label::sha256(b"123"); + let meta = ChunkMeta::new(&sum); let mut hits = SearchHits::default(); hits.insert(id, meta); eprintln!("hits: {:?}", hits); diff --git a/src/store.rs b/src/store.rs index e6cc71f..185370e 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,7 +1,7 @@ +//! Store chunks on-disk on server. + use crate::chunk::DataChunk; use crate::chunkid::ChunkId; -use crate::chunkmeta::ChunkMeta; -use anyhow::Context; use std::path::{Path, PathBuf}; /// Store chunks, with metadata, persistently. @@ -13,6 +13,9 @@ pub struct Store { dir: PathBuf, } +/// An error from a `Store` operation. 
+pub type StoreError = std::io::Error;
+
 impl Store {
     /// Create a new Store to represent on-disk storage of chunks.
     pub fn new(dir: &Path) -> Self {
@@ -38,34 +41,34 @@ impl Store {
     }
 
     /// Save a chunk into a store.
-    pub fn save(&self, id: &ChunkId, meta: &ChunkMeta, chunk: &DataChunk) -> anyhow::Result<()> {
+    pub fn save(&self, id: &ChunkId, chunk: &DataChunk) -> Result<(), StoreError> {
         let (dir, metaname, dataname) = &self.filenames(id);
 
         if !dir.exists() {
-            let res = std::fs::create_dir_all(dir).into();
-            if let Err(_) = res {
-                return res.with_context(|| format!("creating directory {}", dir.display()));
-            }
+            std::fs::create_dir_all(dir)?;
         }
 
-        std::fs::write(&metaname, meta.to_json())?;
-        std::fs::write(&dataname, chunk.data())?;
+        std::fs::write(metaname, chunk.meta().to_json())?;
+        std::fs::write(dataname, chunk.data())?;
         Ok(())
     }
 
     /// Load a chunk from a store.
-    pub fn load(&self, id: &ChunkId) -> anyhow::Result<DataChunk> {
-        let (_, _, dataname) = &self.filenames(id);
-        let data = std::fs::read(&dataname)?;
-        let data = DataChunk::new(data);
+    pub fn load(&self, id: &ChunkId) -> Result<DataChunk, StoreError> {
+        let (_, metaname, dataname) = &self.filenames(id);
+        let meta = std::fs::read(metaname)?;
+        let meta = serde_json::from_slice(&meta)?;
+
+        let data = std::fs::read(dataname)?;
+        let data = DataChunk::new(data, meta);
         Ok(data)
     }
 
     /// Delete a chunk from a store.
-    pub fn delete(&self, id: &ChunkId) -> anyhow::Result<()> {
+    pub fn delete(&self, id: &ChunkId) -> Result<(), StoreError> {
         let (_, metaname, dataname) = &self.filenames(id);
-        std::fs::remove_file(&metaname)?;
-        std::fs::remove_file(&dataname)?;
+        std::fs::remove_file(metaname)?;
+        std::fs::remove_file(dataname)?;
         Ok(())
     }
 }
diff --git a/src/workqueue.rs b/src/workqueue.rs
new file mode 100644
index 0000000..6b3ce80
--- /dev/null
+++ b/src/workqueue.rs
@@ -0,0 +1,62 @@
+//! A queue of work for [`crate::engine::Engine`].
+
+use tokio::sync::mpsc;
+
+/// A queue of work items.
+///
+/// An abstraction for producing items of work. For example, chunks of
+/// data in a file. The work items are put into an ordered queue to be
+/// worked on by another task. The queue is limited in size so that it
+/// doesn't grow impossibly large. This acts as a load-limiting
+/// synchronizing mechanism.
+///
+/// One async task produces work items and puts them into the queue,
+/// another consumes them from the queue. If the producer is too fast,
+/// the queue fills up, and the producer blocks when putting an item
+/// into the queue. If the queue is empty, the consumer blocks until
+/// there is something added to the queue.
+///
+/// The work items need to be abstracted as a type, and that type is
+/// given as a type parameter.
+pub struct WorkQueue<T> {
+    rx: mpsc::Receiver<T>,
+    tx: Option<mpsc::Sender<T>>,
+    size: usize,
+}
+
+impl<T> WorkQueue<T> {
+    /// Create a new work queue of a given maximum size.
+    pub fn new(queue_size: usize) -> Self {
+        let (tx, rx) = mpsc::channel(queue_size);
+        Self {
+            rx,
+            tx: Some(tx),
+            size: queue_size,
+        }
+    }
+
+    /// Get maximum size of queue.
+    pub fn size(&self) -> usize {
+        self.size
+    }
+
+    /// Get a sender that can be used to add work items to the queue.
+    pub fn push(&self) -> mpsc::Sender<T> {
+        self.tx.as_ref().unwrap().clone()
+    }
+
+    /// Signal that no more work items will be added to the queue.
+    ///
+    /// You **must** call this, as otherwise the `next` function will
+    /// wait indefinitely.
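+    ///
+    /// A minimal sketch of the produce/consume pattern, assuming the
+    /// crate is named `obnam` and tokio's `rt` and `macros` features
+    /// are enabled:
+    ///
+    /// ```no_run
+    /// use obnam::workqueue::WorkQueue;
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let mut queue: WorkQueue<String> = WorkQueue::new(4);
+    ///     let tx = queue.push();
+    ///     tx.send("work item".to_string()).await.unwrap();
+    ///     drop(tx); // drop our clone of the sender
+    ///     queue.close(); // drop the queue's own sender
+    ///     // Consume until all senders are gone and the queue is drained.
+    ///     while let Some(item) = queue.next().await {
+    ///         println!("got: {}", item);
+    ///     }
+    /// }
+    /// ```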
+    pub fn close(&mut self) {
+        self.tx = None;
+    }
+
+    /// Get the oldest work item from the queue, if any.
+    pub async fn next(&mut self) -> Option<T> {
+        self.rx.recv().await
+    }
+}