path: root/src
Diffstat (limited to 'src')
-rw-r--r--  src/accumulated_time.rs | 79
-rw-r--r--  src/backup_progress.rs | 105
-rw-r--r--  src/backup_reason.rs | 36
-rw-r--r--  src/backup_run.rs | 467
-rw-r--r--  src/benchmark.rs | 32
-rw-r--r--  src/bin/benchmark-index.rs | 35
-rw-r--r--  src/bin/benchmark-indexedstore.rs | 28
-rw-r--r--  src/bin/benchmark-null.rs | 29
-rw-r--r--  src/bin/benchmark-store.rs | 28
-rw-r--r--  src/bin/obnam-server.rs | 148
-rw-r--r--  src/bin/obnam.rs | 170
-rw-r--r--  src/checksummer.rs | 8
-rw-r--r--  src/chunk.rs | 165
-rw-r--r--  src/chunker.rs | 58
-rw-r--r--  src/chunkid.rs | 24
-rw-r--r--  src/chunkmeta.rs | 114
-rw-r--r--  src/chunkstore.rs | 307
-rw-r--r--  src/cipher.rs | 249
-rw-r--r--  src/client.rs | 375
-rw-r--r--  src/cmd/backup.rs | 178
-rw-r--r--  src/cmd/chunk.rs | 70
-rw-r--r--  src/cmd/chunkify.rs | 110
-rw-r--r--  src/cmd/gen_info.rs | 47
-rw-r--r--  src/cmd/get_chunk.rs | 37
-rw-r--r--  src/cmd/init.rs | 33
-rw-r--r--  src/cmd/inspect.rs | 46
-rw-r--r--  src/cmd/list.rs | 38
-rw-r--r--  src/cmd/list_backup_versions.rs | 31
-rw-r--r--  src/cmd/list_files.rs | 61
-rw-r--r--  src/cmd/mod.rs | 27
-rw-r--r--  src/cmd/resolve.rs | 44
-rw-r--r--  src/cmd/restore.rs | 275
-rw-r--r--  src/cmd/show_config.rs | 17
-rw-r--r--  src/cmd/show_gen.rs | 123
-rw-r--r--  src/config.rs | 142
-rw-r--r--  src/db.rs | 640
-rw-r--r--  src/dbgen.rs | 768
-rw-r--r--  src/engine.rs | 123
-rw-r--r--  src/error.rs | 106
-rw-r--r--  src/fsentry.rs | 234
-rw-r--r--  src/fsiter.rs | 158
-rw-r--r--  src/generation.rs | 476
-rw-r--r--  src/genlist.rs | 36
-rw-r--r--  src/genmeta.rs | 62
-rw-r--r--  src/index.rs | 188
-rw-r--r--  src/indexedstore.rs | 57
-rw-r--r--  src/label.rs | 138
-rw-r--r--  src/lib.rs | 23
-rw-r--r--  src/passwords.rs | 102
-rw-r--r--  src/performance.rs | 97
-rw-r--r--  src/policy.rs | 33
-rw-r--r--  src/schema.rs | 173
-rw-r--r--  src/server.rs | 88
-rw-r--r--  src/store.rs | 35
-rw-r--r--  src/workqueue.rs | 62
55 files changed, 5942 insertions, 1393 deletions
diff --git a/src/accumulated_time.rs b/src/accumulated_time.rs
new file mode 100644
index 0000000..cdf34b2
--- /dev/null
+++ b/src/accumulated_time.rs
@@ -0,0 +1,79 @@
+//! Measure accumulated time for various operations.
+
+use std::collections::HashMap;
+use std::hash::Hash;
+use std::sync::Mutex;
+use std::time::Instant;
+
+/// Accumulated times for different clocks.
+///
+/// The caller defines a clock type, usually an enum.
+/// `AccumulatedTime` accumulates time for each possible clock.
+/// Conceptually, every type of clock exists. If a clock of some type
+/// is never started, its accumulated time is zero.
+#[derive(Debug)]
+pub struct AccumulatedTime<T> {
+ accumulated: Mutex<HashMap<T, ClockTime>>,
+}
+
+#[derive(Debug, Default)]
+struct ClockTime {
+ nanos: u128,
+ started: Option<Instant>,
+}
+
+impl<T: Eq + PartialEq + Hash + Copy> AccumulatedTime<T> {
+ /// Create a new accumulated time collector.
+ #[allow(clippy::new_without_default)]
+ pub fn new() -> Self {
+ Self {
+ accumulated: Mutex::new(HashMap::new()),
+ }
+ }
+
+ /// Start a new clock of a given type to measure a span of time.
+ ///
+ /// The clock's measured time is added to the accumulator when the
+ /// clock is stopped.
+ pub fn start(&mut self, clock: T) {
+ let mut map = self.accumulated.lock().unwrap();
+ let ct = map.entry(clock).or_default();
+ assert!(ct.started.is_none());
+ ct.started = Some(Instant::now());
+ }
+
+ /// Stop a running clock.
+ ///
+ /// Its run time is added to the accumulated time for that kind of clock.
+ pub fn stop(&mut self, clock: T) {
+ let mut map = self.accumulated.lock().unwrap();
+ if let Some(ct) = map.get_mut(&clock) {
+ assert!(ct.started.is_some());
+ if let Some(started) = ct.started.take() {
+ ct.nanos += started.elapsed().as_nanos();
+ ct.started = None;
+ }
+ }
+ }
+
+ /// Return the accumulated time for a type of clock, as whole seconds.
+ pub fn secs(&self, clock: T) -> u128 {
+ self.nanos(clock) / 1_000_000_000u128
+ }
+
+ /// Return the accumulated time for a type of clock, as nanoseconds.
+ ///
+ /// This includes the time spent in a currently running clock.
+ pub fn nanos(&self, clock: T) -> u128 {
+ let map = self.accumulated.lock().unwrap();
+ if let Some(ct) = map.get(&clock) {
+ if let Some(started) = ct.started {
+ ct.nanos + started.elapsed().as_nanos()
+ } else {
+ ct.nanos
+ }
+ } else {
+ 0
+ }
+ }
+}
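A minimal sketch of how a caller might use the new `AccumulatedTime` type. The `Phase` enum is invented for illustration, and the sketch assumes `lib.rs` exports the module as `obnam::accumulated_time`; only methods shown in the diff above are used.

~~~rust
use obnam::accumulated_time::AccumulatedTime;

// Caller-defined clock type, as the doc comment suggests: usually an enum.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
enum Phase {
    Download,
    Upload,
}

fn main() {
    let mut time = AccumulatedTime::<Phase>::new();

    time.start(Phase::Download);
    // ... perform the download ...
    time.stop(Phase::Download);

    // A clock that was never started reports zero accumulated time.
    assert_eq!(time.secs(Phase::Upload), 0);
    println!("download: {} ns", time.nanos(Phase::Download));
}
~~~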
diff --git a/src/backup_progress.rs b/src/backup_progress.rs
index 6c1d3e6..e3995f0 100644
--- a/src/backup_progress.rs
+++ b/src/backup_progress.rs
@@ -1,44 +1,133 @@
+//! Progress bars for Obnam.
+
+use crate::generation::GenId;
use indicatif::{ProgressBar, ProgressStyle};
-use std::path::Path;
+use std::{path::Path, time::Duration};
+
+const SHOW_PROGRESS: bool = true;
+/// A progress bar abstraction specific to backups.
+///
+/// The progress bar is different for initial and incremental backups,
+/// and for different phases of making a backup.
pub struct BackupProgress {
progress: ProgressBar,
}
impl BackupProgress {
- pub fn new() -> Self {
- let progress = if true {
+ /// Create a progress bar for an initial backup.
+ pub fn initial() -> Self {
+ let progress = if SHOW_PROGRESS {
ProgressBar::new(0)
} else {
ProgressBar::hidden()
};
- let parts = vec![
+ let parts = [
+ "initial backup",
+ "elapsed: {elapsed}",
+ "files: {pos}",
+ "current: {wide_msg}",
+ "{spinner}",
+ ];
+ progress.set_style(
+ ProgressStyle::default_bar()
+ .template(&parts.join("\n"))
+ .expect("create indicatif ProgressStyle value"),
+ );
+ progress.enable_steady_tick(Duration::from_millis(100));
+
+ Self { progress }
+ }
+
+ /// Create a progress bar for an incremental backup.
+ pub fn incremental() -> Self {
+ let progress = if SHOW_PROGRESS {
+ ProgressBar::new(0)
+ } else {
+ ProgressBar::hidden()
+ };
+ let parts = [
+ "incremental backup",
"{wide_bar}",
"elapsed: {elapsed}",
"files: {pos}/{len}",
"current: {wide_msg}",
"{spinner}",
];
- progress.set_style(ProgressStyle::default_bar().template(&parts.join("\n")));
- progress.enable_steady_tick(100);
+ progress.set_style(
+ ProgressStyle::default_bar()
+ .template(&parts.join("\n"))
+ .expect("create indicatif ProgressStyle value"),
+ );
+ progress.enable_steady_tick(Duration::from_millis(100));
+
+ Self { progress }
+ }
+
+ /// Create a progress bar for uploading a new generation's metadata.
+ pub fn upload_generation() -> Self {
+ let progress = ProgressBar::new(0);
+ let parts = [
+ "uploading new generation metadata",
+ "elapsed: {elapsed}",
+ "{spinner}",
+ ];
+ progress.set_style(
+ ProgressStyle::default_bar()
+ .template(&parts.join("\n"))
+ .expect("create indicatif ProgressStyle value"),
+ );
+ progress.enable_steady_tick(Duration::from_millis(100));
+
+ Self { progress }
+ }
+
+ /// Create a progress bar for downloading an existing generation's
+ /// metadata.
+ pub fn download_generation(gen_id: &GenId) -> Self {
+ let progress = ProgressBar::new(0);
+ let parts = ["{msg}", "elapsed: {elapsed}", "{spinner}"];
+ progress.set_style(
+ ProgressStyle::default_bar()
+ .template(&parts.join("\n"))
+ .expect("create indicatif ProgressStyle value"),
+ );
+ progress.enable_steady_tick(Duration::from_millis(100));
+ progress.set_message(format!(
+ "downloading previous generation metadata: {}",
+ gen_id
+ ));
Self { progress }
}
+ /// Set the number of files that were in the previous generation.
+ ///
+ /// The new generation usually has about the same number of files,
+ /// so the progress bar can show progress for incremental backups
+ /// without having to count all the files that actually exist first.
pub fn files_in_previous_generation(&self, count: u64) {
self.progress.set_length(count);
}
+ /// Update progress bar about number of problems found during a backup.
pub fn found_problem(&self) {
self.progress.inc(1);
}
+ /// Update progress bar about number of actual files found.
pub fn found_live_file(&self, filename: &Path) {
self.progress.inc(1);
- self.progress
- .set_message(&format!("{}", filename.display()));
+ if self.progress.length() < Some(self.progress.position()) {
+ self.progress.set_length(self.progress.position());
+ }
+ self.progress.set_message(format!("{}", filename.display()));
}
+ /// Tell progress bar it's finished.
+ ///
+ /// This will remove all traces of the progress bar from the
+ /// screen.
pub fn finish(&self) {
self.progress.set_length(self.progress.position());
self.progress.finish_and_clear();
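A short sketch of the intended lifecycle of the new progress bar constructors, using only methods visible in this diff; the file names are made up and the example assumes the module is exported as `obnam::backup_progress`.

~~~rust
use obnam::backup_progress::BackupProgress;
use std::path::Path;

fn main() {
    // Incremental backups size the bar from the previous generation.
    let progress = BackupProgress::incremental();
    progress.files_in_previous_generation(2);

    for name in ["/etc/hostname", "/etc/hosts"] {
        progress.found_live_file(Path::new(name));
    }

    // Removes all traces of the bar from the screen.
    progress.finish();
}
~~~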
diff --git a/src/backup_reason.rs b/src/backup_reason.rs
index 218857c..9a17d80 100644
--- a/src/backup_reason.rs
+++ b/src/backup_reason.rs
@@ -1,29 +1,54 @@
+//! Why was a file backed up?
+
use rusqlite::types::ToSqlOutput;
use rusqlite::ToSql;
use std::fmt;
+/// Represent the reason a file is in a backup.
#[derive(Debug, Copy, Clone)]
pub enum Reason {
+ /// File was skipped due to policy, but carried over without
+ /// changes.
Skipped,
+ /// File is new, compared to previous backup.
IsNew,
+ /// File has been changed, compared to previous backup.
Changed,
+ /// File has not been changed, compared to previous backup.
Unchanged,
- Error,
+ /// There was an error looking up the file in the previous backup.
+ ///
+ /// File has been carried over without changes.
+ GenerationLookupError,
+ /// There was an error backing up the file.
+ ///
+ /// File has been carried over without changes.
+ FileError,
+ /// Reason is unknown.
+ ///
+ /// The previous backup had a reason that the current version of
+ /// Obnam doesn't recognize. The file has been carried over
+ /// without changes.
+ Unknown,
}
impl Reason {
- pub fn from_str(text: &str) -> Reason {
+ /// Create a Reason from a string representation.
+ pub fn from(text: &str) -> Reason {
match text {
"skipped" => Reason::Skipped,
"new" => Reason::IsNew,
"changed" => Reason::Changed,
"unchanged" => Reason::Unchanged,
- _ => Reason::Error,
+ "genlookuperror" => Reason::GenerationLookupError,
+ "fileerror" => Reason::FileError,
+ _ => Reason::Unknown,
}
}
}
impl ToSql for Reason {
+ /// Represent Reason as text for SQL.
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput> {
Ok(ToSqlOutput::Owned(rusqlite::types::Value::Text(format!(
"{}",
@@ -33,13 +58,16 @@ impl ToSql for Reason {
}
impl fmt::Display for Reason {
+ /// Represent Reason for display.
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let reason = match self {
Reason::Skipped => "skipped",
Reason::IsNew => "new",
Reason::Changed => "changed",
Reason::Unchanged => "unchanged",
- Reason::Error => "error",
+ Reason::GenerationLookupError => "genlookuperror",
+ Reason::FileError => "fileerror",
+ Reason::Unknown => "unknown",
};
write!(f, "{}", reason)
}
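A small sketch of the round trip between `Reason` and its textual form, matching the mappings introduced above (assuming the module is exported as `obnam::backup_reason`).

~~~rust
use obnam::backup_reason::Reason;

fn main() {
    let reason = Reason::from("genlookuperror");
    assert!(matches!(reason, Reason::GenerationLookupError));
    assert_eq!(reason.to_string(), "genlookuperror");

    // Names written by an unknown (older or newer) client map to Unknown.
    assert_eq!(Reason::from("no-such-reason").to_string(), "unknown");
}
~~~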
diff --git a/src/backup_run.rs b/src/backup_run.rs
index e3bfc5a..372ef65 100644
--- a/src/backup_run.rs
+++ b/src/backup_run.rs
@@ -1,92 +1,437 @@
+//! Run one backup.
+
use crate::backup_progress::BackupProgress;
use crate::backup_reason::Reason;
+use crate::chunk::{GenerationChunk, GenerationChunkError};
+use crate::chunker::{ChunkerError, FileChunks};
use crate::chunkid::ChunkId;
-use crate::client::{BackupClient, ClientConfig};
-use crate::fsentry::FilesystemEntry;
-use crate::generation::LocalGeneration;
+use crate::client::{BackupClient, ClientError};
+use crate::config::ClientConfig;
+use crate::db::DatabaseError;
+use crate::dbgen::{schema_version, FileId, DEFAULT_SCHEMA_MAJOR};
+use crate::error::ObnamError;
+use crate::fsentry::{FilesystemEntry, FilesystemKind};
+use crate::fsiter::{AnnotatedFsEntry, FsIterError, FsIterator};
+use crate::generation::{
+ GenId, LocalGeneration, LocalGenerationError, NascentError, NascentGeneration,
+};
+use crate::label::LabelChecksumKind;
+use crate::performance::{Clock, Performance};
use crate::policy::BackupPolicy;
-use log::{info, warn};
+use crate::schema::SchemaVersion;
+
+use bytesize::MIB;
+use chrono::{DateTime, Local};
+use log::{debug, error, info, warn};
+use std::path::{Path, PathBuf};
+
+const DEFAULT_CHECKSUM_KIND: LabelChecksumKind = LabelChecksumKind::Sha256;
+const SQLITE_CHUNK_SIZE: usize = MIB as usize;
-pub struct BackupRun {
- client: BackupClient,
+/// A running backup.
+pub struct BackupRun<'a> {
+ checksum_kind: Option<LabelChecksumKind>,
+ client: &'a mut BackupClient,
policy: BackupPolicy,
buffer_size: usize,
- progress: BackupProgress,
+ progress: Option<BackupProgress>,
+}
+
+/// Possible errors that can occur during a backup.
+#[derive(Debug, thiserror::Error)]
+pub enum BackupError {
+ /// An error from communicating with the server.
+ #[error(transparent)]
+ ClientError(#[from] ClientError),
+
+ /// An error iterating over a directory tree.
+ #[error(transparent)]
+ FsIterError(#[from] FsIterError),
+
+ /// An error from creating a new backup's metadata.
+ #[error(transparent)]
+ NascentError(#[from] NascentError),
+
+ /// An error using an existing backup's metadata.
+ #[error(transparent)]
+ LocalGenerationError(#[from] LocalGenerationError),
+
+ /// An error using a Database.
+ #[error(transparent)]
+ Database(#[from] DatabaseError),
+
+ /// An error splitting data into chunks.
+ #[error(transparent)]
+ ChunkerError(#[from] ChunkerError),
+
+ /// An error splitting backup metadata into chunks.
+ #[error(transparent)]
+ GenerationChunkError(#[from] GenerationChunkError),
+}
+
+/// The outcome of backing up a file system entry.
+#[derive(Debug)]
+pub struct FsEntryBackupOutcome {
+ /// The file system entry.
+ pub entry: FilesystemEntry,
+ /// The chunk identifiers for the file's content.
+ pub ids: Vec<ChunkId>,
+ /// Why this entry is added to the new backup.
+ pub reason: Reason,
+ /// Does this entry represent a cache directory?
+ pub is_cachedir_tag: bool,
+}
+
+/// The outcome of backing up a backup root.
+#[derive(Debug)]
+struct OneRootBackupOutcome {
+ /// Any warnings (non-fatal errors) from backing up the backup root.
+ pub warnings: Vec<BackupError>,
+ /// New cache directories in this root.
+ pub new_cachedir_tags: Vec<PathBuf>,
}
-impl BackupRun {
- pub fn new(config: &ClientConfig, buffer_size: usize) -> anyhow::Result<Self> {
- let client = BackupClient::new(&config.server_url)?;
- let policy = BackupPolicy::new();
- let progress = BackupProgress::new();
+/// The outcome of a backup run.
+#[derive(Debug)]
+pub struct RootsBackupOutcome {
+ /// The number of backed up files.
+ pub files_count: FileId,
+ /// The errors encountered while backing up files.
+ pub warnings: Vec<BackupError>,
+ /// CACHEDIR.TAG files that aren't present in a previous generation.
+ pub new_cachedir_tags: Vec<PathBuf>,
+ /// Id of new generation.
+ pub gen_id: GenId,
+}
+
+impl<'a> BackupRun<'a> {
+ /// Create a new run for an initial backup.
+ pub fn initial(
+ config: &ClientConfig,
+ client: &'a mut BackupClient,
+ ) -> Result<Self, BackupError> {
Ok(Self {
+ checksum_kind: Some(DEFAULT_CHECKSUM_KIND),
client,
- policy,
- buffer_size,
- progress,
+ policy: BackupPolicy::default(),
+ buffer_size: config.chunk_size,
+ progress: Some(BackupProgress::initial()),
})
}
- pub fn client(&self) -> &BackupClient {
- &self.client
+ /// Create a new run for an incremental backup.
+ pub fn incremental(
+ config: &ClientConfig,
+ client: &'a mut BackupClient,
+ ) -> Result<Self, BackupError> {
+ Ok(Self {
+ checksum_kind: None,
+ client,
+ policy: BackupPolicy::default(),
+ buffer_size: config.chunk_size,
+ progress: None,
+ })
}
- pub fn progress(&self) -> &BackupProgress {
- &self.progress
- }
+ /// Start the backup run.
+ pub async fn start(
+ &mut self,
+ genid: Option<&GenId>,
+ oldname: &Path,
+ perf: &mut Performance,
+ ) -> Result<LocalGeneration, ObnamError> {
+ match genid {
+ None => {
+ // Create a new, empty generation.
+ let schema = schema_version(DEFAULT_SCHEMA_MAJOR).unwrap();
+ NascentGeneration::create(oldname, schema, self.checksum_kind.unwrap())?.close()?;
- pub fn backup_file_initially(
- &self,
- entry: anyhow::Result<FilesystemEntry>,
- ) -> anyhow::Result<(FilesystemEntry, Vec<ChunkId>, Reason)> {
- match entry {
- Err(err) => Err(err.into()),
- Ok(entry) => {
- let path = &entry.pathbuf();
- info!("backup: {}", path.display());
- self.progress.found_live_file(path);
- let ids = self
- .client
- .upload_filesystem_entry(&entry, self.buffer_size)?;
- Ok((entry.clone(), ids, Reason::IsNew))
+ // Open the newly created empty generation.
+ Ok(LocalGeneration::open(oldname)?)
+ }
+ Some(genid) => {
+ perf.start(Clock::GenerationDownload);
+ let old = self.fetch_previous_generation(genid, oldname).await?;
+ perf.stop(Clock::GenerationDownload);
+
+ let meta = old.meta()?;
+ if let Some(v) = meta.get("checksum_kind") {
+ self.checksum_kind = Some(LabelChecksumKind::from(v)?);
+ }
+
+ let progress = BackupProgress::incremental();
+ progress.files_in_previous_generation(old.file_count()? as u64);
+ self.progress = Some(progress);
+
+ Ok(old)
}
}
}
- pub fn backup_file_incrementally(
+ fn checksum_kind(&self) -> LabelChecksumKind {
+ self.checksum_kind.unwrap_or(LabelChecksumKind::Sha256)
+ }
+
+ async fn fetch_previous_generation(
&self,
- entry: anyhow::Result<FilesystemEntry>,
+ genid: &GenId,
+ oldname: &Path,
+ ) -> Result<LocalGeneration, ObnamError> {
+ let progress = BackupProgress::download_generation(genid);
+ let old = self.client.fetch_generation(genid, oldname).await?;
+ progress.finish();
+ Ok(old)
+ }
+
+ /// Finish this backup run.
+ pub fn finish(&self) {
+ if let Some(progress) = &self.progress {
+ progress.finish();
+ }
+ }
+
+ /// Back up all the roots for this run.
+ pub async fn backup_roots(
+ &mut self,
+ config: &ClientConfig,
old: &LocalGeneration,
- ) -> anyhow::Result<(FilesystemEntry, Vec<ChunkId>, Reason)> {
- match entry {
- Err(err) => {
- warn!("backup: {}", err);
- self.progress.found_problem();
- Err(err)
+ newpath: &Path,
+ schema: SchemaVersion,
+ perf: &mut Performance,
+ ) -> Result<RootsBackupOutcome, ObnamError> {
+ let mut warnings: Vec<BackupError> = vec![];
+ let mut new_cachedir_tags = vec![];
+ let files_count = {
+ let mut new = NascentGeneration::create(newpath, schema, self.checksum_kind.unwrap())?;
+ for root in &config.roots {
+ match self.backup_one_root(config, old, &mut new, root).await {
+ Ok(mut o) => {
+ new_cachedir_tags.append(&mut o.new_cachedir_tags);
+ if !o.warnings.is_empty() {
+ for err in o.warnings.iter() {
+ debug!("ignoring backup error {}", err);
+ self.found_problem();
+ }
+ warnings.append(&mut o.warnings);
+ }
+ }
+ Err(err) => {
+ self.found_problem();
+ return Err(err.into());
+ }
+ }
}
- Ok(entry) => {
- let path = &entry.pathbuf();
- info!("backup: {}", path.display());
- self.progress.found_live_file(path);
- let reason = self.policy.needs_backup(&old, &entry);
- match reason {
- Reason::IsNew | Reason::Changed | Reason::Error => {
- let ids = self
- .client
- .upload_filesystem_entry(&entry, self.buffer_size)?;
- Ok((entry.clone(), ids, reason))
+ let count = new.file_count();
+ new.close()?;
+ count
+ };
+ self.finish();
+ perf.start(Clock::GenerationUpload);
+ let gen_id = self.upload_nascent_generation(newpath).await?;
+ perf.stop(Clock::GenerationUpload);
+ let gen_id = GenId::from_chunk_id(gen_id);
+ Ok(RootsBackupOutcome {
+ files_count,
+ warnings,
+ new_cachedir_tags,
+ gen_id,
+ })
+ }
+
+ async fn backup_one_root(
+ &mut self,
+ config: &ClientConfig,
+ old: &LocalGeneration,
+ new: &mut NascentGeneration,
+ root: &Path,
+ ) -> Result<OneRootBackupOutcome, NascentError> {
+ let mut warnings: Vec<BackupError> = vec![];
+ let mut new_cachedir_tags = vec![];
+ let iter = FsIterator::new(root, config.exclude_cache_tag_directories);
+ let mut first_entry = true;
+ for entry in iter {
+ match entry {
+ Err(err) => {
+ if first_entry {
+ // Only the first entry (the backup root)
+ // failing is an error. Everything else is a
+ // warning.
+ return Err(NascentError::BackupRootFailed(root.to_path_buf(), err));
+ }
+ warnings.push(err.into());
+ }
+ Ok(entry) => {
+ let path = entry.inner.pathbuf();
+ if entry.is_cachedir_tag && !old.is_cachedir_tag(&path)? {
+ new_cachedir_tags.push(path);
}
- Reason::Unchanged | Reason::Skipped => {
- let fileno = old.get_fileno(&entry.pathbuf())?;
- let ids = if let Some(fileno) = fileno {
- old.chunkids(fileno)?
- } else {
- vec![]
- };
- Ok((entry.clone(), ids, reason))
+ match self.backup_if_needed(entry, old).await {
+ Err(err) => {
+ warnings.push(err);
+ }
+ Ok(None) => (),
+ Ok(Some(o)) => {
+ if let Err(err) =
+ new.insert(o.entry, &o.ids, o.reason, o.is_cachedir_tag)
+ {
+ warnings.push(err.into());
+ }
+ }
}
}
}
+ first_entry = false;
+ }
+
+ Ok(OneRootBackupOutcome {
+ warnings,
+ new_cachedir_tags,
+ })
+ }
+
+ async fn backup_if_needed(
+ &mut self,
+ entry: AnnotatedFsEntry,
+ old: &LocalGeneration,
+ ) -> Result<Option<FsEntryBackupOutcome>, BackupError> {
+ let path = &entry.inner.pathbuf();
+ info!("backup: {}", path.display());
+ self.found_live_file(path);
+ let reason = self.policy.needs_backup(old, &entry.inner);
+ match reason {
+ Reason::IsNew | Reason::Changed | Reason::GenerationLookupError | Reason::Unknown => {
+ Ok(Some(self.backup_one_entry(&entry, path, reason).await))
+ }
+ Reason::Skipped => Ok(None),
+ Reason::Unchanged | Reason::FileError => {
+ let fileno = old.get_fileno(&entry.inner.pathbuf())?;
+ let ids = if let Some(fileno) = fileno {
+ let mut ids = vec![];
+ for id in old.chunkids(fileno)?.iter()? {
+ ids.push(id?);
+ }
+ ids
+ } else {
+ vec![]
+ };
+ Ok(Some(FsEntryBackupOutcome {
+ entry: entry.inner,
+ ids,
+ reason,
+ is_cachedir_tag: entry.is_cachedir_tag,
+ }))
+ }
+ }
+ }
+
+ async fn backup_one_entry(
+ &mut self,
+ entry: &AnnotatedFsEntry,
+ path: &Path,
+ reason: Reason,
+ ) -> FsEntryBackupOutcome {
+ let ids = self
+ .upload_filesystem_entry(&entry.inner, self.buffer_size)
+ .await;
+ match ids {
+ Err(err) => {
+ warn!("error backing up {}, skipping it: {}", path.display(), err);
+ FsEntryBackupOutcome {
+ entry: entry.inner.clone(),
+ ids: vec![],
+ reason: Reason::FileError,
+ is_cachedir_tag: entry.is_cachedir_tag,
+ }
+ }
+ Ok(ids) => FsEntryBackupOutcome {
+ entry: entry.inner.clone(),
+ ids,
+ reason,
+ is_cachedir_tag: entry.is_cachedir_tag,
+ },
+ }
+ }
+
+ /// Upload any file content for a file system entry.
+ pub async fn upload_filesystem_entry(
+ &mut self,
+ e: &FilesystemEntry,
+ size: usize,
+ ) -> Result<Vec<ChunkId>, BackupError> {
+ let path = e.pathbuf();
+ info!("uploading {:?}", path);
+ let ids = match e.kind() {
+ FilesystemKind::Regular => self.upload_regular_file(&path, size).await?,
+ FilesystemKind::Directory => vec![],
+ FilesystemKind::Symlink => vec![],
+ FilesystemKind::Socket => vec![],
+ FilesystemKind::Fifo => vec![],
+ };
+ info!("upload OK for {:?}", path);
+ Ok(ids)
+ }
+
+ /// Upload the metadata for the backup of this run.
+ pub async fn upload_generation(
+ &mut self,
+ filename: &Path,
+ size: usize,
+ ) -> Result<ChunkId, BackupError> {
+ info!("upload SQLite {}", filename.display());
+ let ids = self.upload_regular_file(filename, size).await?;
+ let gen = GenerationChunk::new(ids);
+ let data = gen.to_data_chunk()?;
+ let gen_id = self.client.upload_chunk(data).await?;
+ info!("uploaded generation {}", gen_id);
+ Ok(gen_id)
+ }
+
+ async fn upload_regular_file(
+ &mut self,
+ filename: &Path,
+ size: usize,
+ ) -> Result<Vec<ChunkId>, BackupError> {
+ info!("upload file {}", filename.display());
+ let mut chunk_ids = vec![];
+ let file = std::fs::File::open(filename)
+ .map_err(|err| ClientError::FileOpen(filename.to_path_buf(), err))?;
+ let chunker = FileChunks::new(size, file, filename, self.checksum_kind());
+ for item in chunker {
+ let chunk = item?;
+ if let Some(chunk_id) = self.client.has_chunk(chunk.meta()).await? {
+ chunk_ids.push(chunk_id.clone());
+ info!("reusing existing chunk {}", chunk_id);
+ } else {
+ let chunk_id = self.client.upload_chunk(chunk).await?;
+ chunk_ids.push(chunk_id.clone());
+ info!("created new chunk {}", chunk_id);
+ }
}
+ Ok(chunk_ids)
}
+
+ async fn upload_nascent_generation(&mut self, filename: &Path) -> Result<ChunkId, ObnamError> {
+ let progress = BackupProgress::upload_generation();
+ let gen_id = self.upload_generation(filename, SQLITE_CHUNK_SIZE).await?;
+ progress.finish();
+ Ok(gen_id)
+ }
+
+ fn found_live_file(&self, path: &Path) {
+ if let Some(progress) = &self.progress {
+ progress.found_live_file(path);
+ }
+ }
+
+ fn found_problem(&self) {
+ if let Some(progress) = &self.progress {
+ progress.found_problem();
+ }
+ }
+}
+
+/// Current timestamp as an ISO 8601 string.
+pub fn current_timestamp() -> String {
+ let now: DateTime<Local> = Local::now();
+ format!("{}", now.format("%Y-%m-%d %H:%M:%S.%f %z"))
}
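A condensed sketch of how the new `BackupRun` API fits together for one run. The temporary database paths and the surrounding setup are assumptions for illustration, error handling is minimal, and the sketch assumes `ObnamError` can wrap `BackupError` (defined in `src/error.rs`, not shown in this diff).

~~~rust
use obnam::backup_run::{BackupRun, RootsBackupOutcome};
use obnam::client::BackupClient;
use obnam::config::ClientConfig;
use obnam::dbgen::{schema_version, DEFAULT_SCHEMA_MAJOR};
use obnam::error::ObnamError;
use obnam::generation::GenId;
use obnam::performance::Performance;
use std::path::Path;

async fn one_backup(
    config: &ClientConfig,
    client: &mut BackupClient,
    previous: Option<&GenId>,
    perf: &mut Performance,
) -> Result<RootsBackupOutcome, ObnamError> {
    // Hypothetical temporary files for the old and new generation databases.
    let oldname = Path::new("old.db");
    let newname = Path::new("new.db");

    let mut run = match previous {
        None => BackupRun::initial(config, client)?,
        Some(_) => BackupRun::incremental(config, client)?,
    };

    // Create an empty generation, or download the previous one.
    let old = run.start(previous, oldname, perf).await?;

    // Walk the backup roots, then upload the new generation's metadata.
    let schema = schema_version(DEFAULT_SCHEMA_MAJOR).unwrap();
    let outcome = run.backup_roots(config, &old, newname, schema, perf).await?;
    run.finish();
    Ok(outcome)
}
~~~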
diff --git a/src/benchmark.rs b/src/benchmark.rs
deleted file mode 100644
index b313868..0000000
--- a/src/benchmark.rs
+++ /dev/null
@@ -1,32 +0,0 @@
-use crate::chunk::DataChunk;
-use crate::chunkid::ChunkId;
-use crate::chunkmeta::ChunkMeta;
-
-// Generate a desired number of empty data chunks with id and metadata.
-pub struct ChunkGenerator {
- goal: u32,
- next: u32,
-}
-
-impl ChunkGenerator {
- pub fn new(goal: u32) -> Self {
- Self { goal, next: 0 }
- }
-}
-
-impl Iterator for ChunkGenerator {
- type Item = (ChunkId, String, ChunkMeta, DataChunk);
-
- fn next(&mut self) -> Option<Self::Item> {
- if self.next >= self.goal {
- None
- } else {
- let id = ChunkId::new();
- let checksum = id.sha256();
- let meta = ChunkMeta::new(&checksum);
- let chunk = DataChunk::new(vec![]);
- self.next += 1;
- Some((id, checksum, meta, chunk))
- }
- }
-}
diff --git a/src/bin/benchmark-index.rs b/src/bin/benchmark-index.rs
deleted file mode 100644
index d49a6c3..0000000
--- a/src/bin/benchmark-index.rs
+++ /dev/null
@@ -1,35 +0,0 @@
-use obnam::benchmark::ChunkGenerator;
-use obnam::chunkmeta::ChunkMeta;
-use obnam::index::Index;
-use std::path::PathBuf;
-use structopt::StructOpt;
-
-#[derive(Debug, StructOpt)]
-#[structopt(
- name = "benchmark-index",
- about = "Benhcmark the store index in memory"
-)]
-struct Opt {
- // We don't use this, but we accept it for command line
- // compatibility with other benchmark programs.
- #[structopt(parse(from_os_str))]
- chunks: PathBuf,
-
- #[structopt()]
- num: u32,
-}
-
-fn main() -> anyhow::Result<()> {
- pretty_env_logger::init();
-
- let opt = Opt::from_args();
- let gen = ChunkGenerator::new(opt.num);
-
- let mut index = Index::new(".")?;
- for (id, checksum, _, _) in gen {
- let meta = ChunkMeta::new(&checksum);
- index.insert_meta(id, meta)?;
- }
-
- Ok(())
-}
diff --git a/src/bin/benchmark-indexedstore.rs b/src/bin/benchmark-indexedstore.rs
deleted file mode 100644
index 3ee4c38..0000000
--- a/src/bin/benchmark-indexedstore.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-use obnam::benchmark::ChunkGenerator;
-use obnam::indexedstore::IndexedStore;
-use std::path::PathBuf;
-use structopt::StructOpt;
-
-#[derive(Debug, StructOpt)]
-#[structopt(name = "benchmark-store", about = "Benhcmark the store without HTTP")]
-struct Opt {
- #[structopt(parse(from_os_str))]
- chunks: PathBuf,
-
- #[structopt()]
- num: u32,
-}
-
-fn main() -> anyhow::Result<()> {
- pretty_env_logger::init();
-
- let opt = Opt::from_args();
- let gen = ChunkGenerator::new(opt.num);
-
- let mut store = IndexedStore::new(&opt.chunks)?;
- for (_, _, meta, chunk) in gen {
- store.save(&meta, &chunk)?;
- }
-
- Ok(())
-}
diff --git a/src/bin/benchmark-null.rs b/src/bin/benchmark-null.rs
deleted file mode 100644
index 6df8ca1..0000000
--- a/src/bin/benchmark-null.rs
+++ /dev/null
@@ -1,29 +0,0 @@
-use obnam::benchmark::ChunkGenerator;
-use std::path::PathBuf;
-use structopt::StructOpt;
-
-#[derive(Debug, StructOpt)]
-#[structopt(
- name = "benchmark-index",
- about = "Benhcmark the store index in memory"
-)]
-struct Opt {
- // We don't use this, but we accept it for command line
- // compatibility with other benchmark programs.
- #[structopt(parse(from_os_str))]
- chunks: PathBuf,
-
- #[structopt()]
- num: u32,
-}
-
-fn main() -> anyhow::Result<()> {
- pretty_env_logger::init();
-
- let opt = Opt::from_args();
- let gen = ChunkGenerator::new(opt.num);
-
- for (_, _, _, _) in gen {}
-
- Ok(())
-}
diff --git a/src/bin/benchmark-store.rs b/src/bin/benchmark-store.rs
deleted file mode 100644
index f7c82b1..0000000
--- a/src/bin/benchmark-store.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-use obnam::benchmark::ChunkGenerator;
-use obnam::store::Store;
-use std::path::PathBuf;
-use structopt::StructOpt;
-
-#[derive(Debug, StructOpt)]
-#[structopt(name = "benchmark-store", about = "Benhcmark the store without HTTP")]
-struct Opt {
- #[structopt(parse(from_os_str))]
- chunks: PathBuf,
-
- #[structopt()]
- num: u32,
-}
-
-fn main() -> anyhow::Result<()> {
- pretty_env_logger::init();
-
- let opt = Opt::from_args();
- let gen = ChunkGenerator::new(opt.num);
-
- let store = Store::new(&opt.chunks);
- for (id, _, meta, chunk) in gen {
- store.save(&id, &meta, &chunk)?;
- }
-
- Ok(())
-}
diff --git a/src/bin/obnam-server.rs b/src/bin/obnam-server.rs
index 19f2e99..9b5a557 100644
--- a/src/bin/obnam-server.rs
+++ b/src/bin/obnam-server.rs
@@ -1,42 +1,43 @@
-use bytes::Bytes;
+use anyhow::Context;
+use clap::Parser;
use log::{debug, error, info};
-use obnam::chunk::DataChunk;
use obnam::chunkid::ChunkId;
use obnam::chunkmeta::ChunkMeta;
-use obnam::indexedstore::IndexedStore;
-use serde::{Deserialize, Serialize};
+use obnam::chunkstore::ChunkStore;
+use obnam::label::Label;
+use obnam::server::{ServerConfig, ServerConfigError};
+use serde::Serialize;
use std::collections::HashMap;
use std::default::Default;
use std::net::{SocketAddr, ToSocketAddrs};
use std::path::{Path, PathBuf};
use std::sync::Arc;
-use structopt::StructOpt;
use tokio::sync::Mutex;
use warp::http::StatusCode;
+use warp::hyper::body::Bytes;
use warp::Filter;
-#[derive(Debug, StructOpt)]
-#[structopt(name = "obnam2-server", about = "Backup server")]
+#[derive(Debug, Parser)]
+#[clap(name = "obnam2-server", about = "Backup server")]
struct Opt {
- #[structopt(parse(from_os_str))]
config: PathBuf,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
- pretty_env_logger::init();
+ pretty_env_logger::init_custom_env("OBNAM_SERVER_LOG");
- let opt = Opt::from_args();
- let config = Config::read_config(&opt.config).unwrap();
+ let opt = Opt::parse();
+ let config = load_config(&opt.config)?;
let addresses: Vec<SocketAddr> = config.address.to_socket_addrs()?.collect();
if addresses.is_empty() {
error!("specified address is empty set: {:?}", addresses);
eprintln!("ERROR: server address is empty: {:?}", addresses);
- return Err(ConfigError::BadServerAddress.into());
+ return Err(ServerConfigError::BadServerAddress.into());
}
- let store = IndexedStore::new(&config.chunks)?;
+ let store = ChunkStore::local(&config.chunks)?;
let store = Arc::new(Mutex::new(store));
let store = warp::any().map(move || Arc::clone(&store));
@@ -45,32 +46,32 @@ async fn main() -> anyhow::Result<()> {
debug!("Configuration: {:#?}", config);
let create = warp::post()
+ .and(warp::path("v1"))
.and(warp::path("chunks"))
+ .and(warp::path::end())
.and(store.clone())
.and(warp::header("chunk-meta"))
.and(warp::filters::body::bytes())
.and_then(create_chunk);
let fetch = warp::get()
+ .and(warp::path("v1"))
.and(warp::path("chunks"))
.and(warp::path::param())
+ .and(warp::path::end())
.and(store.clone())
.and_then(fetch_chunk);
let search = warp::get()
+ .and(warp::path("v1"))
.and(warp::path("chunks"))
+ .and(warp::path::end())
.and(warp::query::<HashMap<String, String>>())
.and(store.clone())
.and_then(search_chunks);
- let delete = warp::delete()
- .and(warp::path("chunks"))
- .and(warp::path::param())
- .and(store.clone())
- .and_then(delete_chunk);
-
let log = warp::log("obnam");
- let webroot = create.or(fetch).or(search).or(delete).with(log);
+ let webroot = create.or(fetch).or(search).with(log);
debug!("starting warp");
warp::serve(webroot)
@@ -82,57 +83,22 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}
-#[derive(Debug, Deserialize, Clone)]
-pub struct Config {
- pub chunks: PathBuf,
- pub address: String,
- pub tls_key: PathBuf,
- pub tls_cert: PathBuf,
-}
-
-#[derive(Debug, thiserror::Error)]
-enum ConfigError {
- #[error("Directory for chunks {0} does not exist")]
- ChunksDirNotFound(PathBuf),
-
- #[error("TLS certificate {0} does not exist")]
- TlsCertNotFound(PathBuf),
-
- #[error("TLS key {0} does not exist")]
- TlsKeyNotFound(PathBuf),
-
- #[error("server address can't be resolved")]
- BadServerAddress,
-}
-
-impl Config {
- pub fn read_config(filename: &Path) -> anyhow::Result<Config> {
- let config = std::fs::read_to_string(filename)?;
- let config: Config = serde_yaml::from_str(&config)?;
- config.check()?;
- Ok(config)
- }
-
- pub fn check(&self) -> anyhow::Result<()> {
- if !self.chunks.exists() {
- return Err(ConfigError::ChunksDirNotFound(self.chunks.clone()).into());
- }
- if !self.tls_cert.exists() {
- return Err(ConfigError::TlsCertNotFound(self.tls_cert.clone()).into());
- }
- if !self.tls_key.exists() {
- return Err(ConfigError::TlsKeyNotFound(self.tls_key.clone()).into());
- }
- Ok(())
- }
+fn load_config(filename: &Path) -> Result<ServerConfig, anyhow::Error> {
+ let config = ServerConfig::read_config(filename).with_context(|| {
+ format!(
+ "Couldn't read default configuration file {}",
+ filename.display()
+ )
+ })?;
+ Ok(config)
}
pub async fn create_chunk(
- store: Arc<Mutex<IndexedStore>>,
+ store: Arc<Mutex<ChunkStore>>,
meta: String,
data: Bytes,
) -> Result<impl warp::Reply, warp::Rejection> {
- let mut store = store.lock().await;
+ let store = store.lock().await;
let meta: ChunkMeta = match meta.parse() {
Ok(s) => s,
@@ -142,9 +108,7 @@ pub async fn create_chunk(
}
};
- let chunk = DataChunk::new(data.to_vec());
-
- let id = match store.save(&meta, &chunk) {
+ let id = match store.put(data.to_vec(), &meta).await {
Ok(id) => id,
Err(e) => {
error!("couldn't save: {}", e);
@@ -152,17 +116,17 @@ pub async fn create_chunk(
}
};
- info!("created chunk {}: {:?}", id, meta);
+ info!("created chunk {}", id);
Ok(ChunkResult::Created(id))
}
pub async fn fetch_chunk(
id: String,
- store: Arc<Mutex<IndexedStore>>,
+ store: Arc<Mutex<ChunkStore>>,
) -> Result<impl warp::Reply, warp::Rejection> {
let store = store.lock().await;
let id: ChunkId = id.parse().unwrap();
- match store.load(&id) {
+ match store.get(&id).await {
Ok((data, meta)) => {
info!("found chunk {}: {:?}", id, meta);
Ok(ChunkResult::Fetched(meta, data))
@@ -176,20 +140,23 @@ pub async fn fetch_chunk(
pub async fn search_chunks(
query: HashMap<String, String>,
- store: Arc<Mutex<IndexedStore>>,
+ store: Arc<Mutex<ChunkStore>>,
) -> Result<impl warp::Reply, warp::Rejection> {
let store = store.lock().await;
let mut query = query.iter();
let found = if let Some((key, value)) = query.next() {
- if query.next() != None {
+ if query.next().is_some() {
error!("search has more than one key to search for");
return Ok(ChunkResult::BadRequest);
}
- if key == "generation" && value == "true" {
- store.find_generations().expect("SQL lookup failed")
- } else if key == "sha256" {
- store.find_by_sha256(value).expect("SQL lookup failed")
+ if key == "label" {
+ let label = Label::deserialize(value).unwrap();
+ let label = ChunkMeta::new(&label);
+ store
+ .find_by_label(&label)
+ .await
+ .expect("SQL lookup failed")
} else {
error!("unknown search key {:?}", key);
return Ok(ChunkResult::BadRequest);
@@ -201,7 +168,7 @@ pub async fn search_chunks(
let mut hits = SearchHits::default();
for chunk_id in found {
- let meta = match store.load_meta(&chunk_id) {
+ let (_, meta) = match store.get(&chunk_id).await {
Ok(meta) => {
info!("search found chunk {}", chunk_id);
meta
@@ -240,30 +207,10 @@ impl SearchHits {
}
}
-pub async fn delete_chunk(
- id: String,
- store: Arc<Mutex<IndexedStore>>,
-) -> Result<impl warp::Reply, warp::Rejection> {
- let mut store = store.lock().await;
- let id: ChunkId = id.parse().unwrap();
-
- match store.remove(&id) {
- Ok(_) => {
- info!("chunk deleted: {}", id);
- Ok(ChunkResult::Deleted)
- }
- Err(e) => {
- error!("could not delete chunk {}: {:?}", id, e);
- Ok(ChunkResult::NotFound)
- }
- }
-}
-
enum ChunkResult {
Created(ChunkId),
- Fetched(ChunkMeta, DataChunk),
+ Fetched(ChunkMeta, Vec<u8>),
Found(SearchHits),
- Deleted,
NotFound,
BadRequest,
InternalServerError,
@@ -292,13 +239,12 @@ impl warp::Reply for ChunkResult {
);
into_response(
StatusCode::OK,
- chunk.data(),
+ &chunk,
"application/octet-stream",
Some(headers),
)
}
ChunkResult::Found(hits) => json_response(StatusCode::OK, hits.to_json(), None),
- ChunkResult::Deleted => status_response(StatusCode::OK),
ChunkResult::BadRequest => status_response(StatusCode::BAD_REQUEST),
ChunkResult::NotFound => status_response(StatusCode::NOT_FOUND),
ChunkResult::InternalServerError => status_response(StatusCode::INTERNAL_SERVER_ERROR),
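For reference, a hedged sketch of what uploading one chunk to the new `/v1/chunks` route could look like from the client side with `reqwest`. The URL, port, and the self-signed-certificate workaround are illustrative assumptions; the real client lives in `src/client.rs`, which is not shown here.

~~~rust
use obnam::chunkmeta::ChunkMeta;
use obnam::label::Label;

async fn upload_chunk(data: Vec<u8>) -> Result<String, reqwest::Error> {
    // The client labels the chunk; the server assigns the identifier.
    let meta = ChunkMeta::new(&Label::sha256(&data));

    let client = reqwest::Client::builder()
        .danger_accept_invalid_certs(true) // self-signed test certificate only
        .build()?;

    let response = client
        .post("https://localhost:8888/v1/chunks")
        .header("chunk-meta", meta.to_json())
        .body(data)
        .send()
        .await?;

    // Pass the server's reply body through as-is.
    response.text().await
}
~~~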
diff --git a/src/bin/obnam.rs b/src/bin/obnam.rs
index e9f30ca..240960b 100644
--- a/src/bin/obnam.rs
+++ b/src/bin/obnam.rs
@@ -1,100 +1,126 @@
+use clap::Parser;
+use directories_next::ProjectDirs;
use log::{debug, error, info, LevelFilter};
use log4rs::append::file::FileAppender;
-use log4rs::config::{Appender, Config, Logger, Root};
-use obnam::client::ClientConfig;
-use obnam::cmd::{backup, get_chunk, list, list_files, restore, show_generation};
+use log4rs::config::{Appender, Logger, Root};
+use obnam::cmd::backup::Backup;
+use obnam::cmd::chunk::{DecryptChunk, EncryptChunk};
+use obnam::cmd::chunkify::Chunkify;
+use obnam::cmd::gen_info::GenInfo;
+use obnam::cmd::get_chunk::GetChunk;
+use obnam::cmd::init::Init;
+use obnam::cmd::inspect::Inspect;
+use obnam::cmd::list::List;
+use obnam::cmd::list_backup_versions::ListSchemaVersions;
+use obnam::cmd::list_files::ListFiles;
+use obnam::cmd::resolve::Resolve;
+use obnam::cmd::restore::Restore;
+use obnam::cmd::show_config::ShowConfig;
+use obnam::cmd::show_gen::ShowGeneration;
+use obnam::config::ClientConfig;
+use obnam::performance::{Clock, Performance};
use std::path::{Path, PathBuf};
-use structopt::StructOpt;
-const BUFFER_SIZE: usize = 1024 * 1024;
+const QUALIFIER: &str = "";
+const ORG: &str = "";
+const APPLICATION: &str = "obnam";
-fn main() -> anyhow::Result<()> {
- let opt = Opt::from_args();
- let config_file = match opt.config {
- None => default_config(),
- Some(ref path) => path.to_path_buf(),
- };
- let config = ClientConfig::read_config(&config_file)?;
- if let Some(ref log) = config.log {
- setup_logging(&log)?;
+fn main() {
+ let mut perf = Performance::default();
+ perf.start(Clock::RunTime);
+ if let Err(err) = main_program(&mut perf) {
+ error!("{}", err);
+ eprintln!("ERROR: {}", err);
+ std::process::exit(1);
}
+ perf.stop(Clock::RunTime);
+ perf.log();
+}
+
+fn main_program(perf: &mut Performance) -> anyhow::Result<()> {
+ let opt = Opt::parse();
+ let config = ClientConfig::read(&config_filename(&opt))?;
+ setup_logging(&config.log)?;
info!("client starts");
debug!("{:?}", opt);
+ debug!("configuration: {:#?}", config);
- let result = match opt.cmd {
- Command::Backup => backup(&config, BUFFER_SIZE),
- Command::List => list(&config),
- Command::ShowGeneration { gen_id } => show_generation(&config, &gen_id),
- Command::ListFiles { gen_id } => list_files(&config, &gen_id),
- Command::Restore { gen_id, to } => restore(&config, &gen_id, &to),
- Command::GetChunk { chunk_id } => get_chunk(&config, &chunk_id),
- };
-
- if let Err(ref e) = result {
- error!("{}", e);
- eprintln!("ERROR: {}", e);
- return result;
- }
+ match opt.cmd {
+ Command::Init(x) => x.run(&config),
+ Command::ListBackupVersions(x) => x.run(&config),
+ Command::Backup(x) => x.run(&config, perf),
+ Command::Inspect(x) => x.run(&config),
+ Command::Chunkify(x) => x.run(&config),
+ Command::List(x) => x.run(&config),
+ Command::ShowGeneration(x) => x.run(&config),
+ Command::ListFiles(x) => x.run(&config),
+ Command::Resolve(x) => x.run(&config),
+ Command::Restore(x) => x.run(&config),
+ Command::GenInfo(x) => x.run(&config),
+ Command::GetChunk(x) => x.run(&config),
+ Command::Config(x) => x.run(&config),
+ Command::EncryptChunk(x) => x.run(&config),
+ Command::DecryptChunk(x) => x.run(&config),
+ }?;
info!("client ends successfully");
Ok(())
}
+fn setup_logging(filename: &Path) -> anyhow::Result<()> {
+ let logfile = FileAppender::builder().build(filename)?;
+
+ let config = log4rs::Config::builder()
+ .appender(Appender::builder().build("obnam", Box::new(logfile)))
+ .logger(Logger::builder().build("obnam", LevelFilter::Debug))
+ .build(Root::builder().appender("obnam").build(LevelFilter::Debug))?;
+
+ log4rs::init_config(config)?;
+
+ Ok(())
+}
+
+fn config_filename(opt: &Opt) -> PathBuf {
+ match opt.config {
+ None => default_config(),
+ Some(ref filename) => filename.to_path_buf(),
+ }
+}
+
fn default_config() -> PathBuf {
- if let Some(path) = dirs::config_dir() {
- path.join("obnam").join("obnam.yaml")
- } else if let Some(path) = dirs::home_dir() {
- path.join(".config").join("obnam").join("obnam.yaml")
+ if let Some(dirs) = ProjectDirs::from(QUALIFIER, ORG, APPLICATION) {
+ dirs.config_dir().join("obnam.yaml")
} else {
- panic!("can't find config dir or home dir");
+ panic!("can't figure out the configuration directory");
}
}
-#[derive(Debug, StructOpt)]
-#[structopt(name = "obnam-backup", about = "Simplistic backup client")]
+#[derive(Debug, Parser)]
+#[clap(name = "obnam-backup", version, about = "Simplistic backup client")]
struct Opt {
- #[structopt(long, short, parse(from_os_str))]
+ #[clap(long, short)]
config: Option<PathBuf>,
- #[structopt(subcommand)]
+ #[clap(subcommand)]
cmd: Command,
}
-#[derive(Debug, StructOpt)]
+#[derive(Debug, Parser)]
enum Command {
- Backup,
- List,
- ListFiles {
- #[structopt(default_value = "latest")]
- gen_id: String,
- },
- Restore {
- #[structopt()]
- gen_id: String,
-
- #[structopt(parse(from_os_str))]
- to: PathBuf,
- },
- ShowGeneration {
- #[structopt(default_value = "latest")]
- gen_id: String,
- },
- GetChunk {
- #[structopt()]
- chunk_id: String,
- },
-}
-
-fn setup_logging(filename: &Path) -> anyhow::Result<()> {
- let logfile = FileAppender::builder().build(filename)?;
-
- let config = Config::builder()
- .appender(Appender::builder().build("obnam", Box::new(logfile)))
- .logger(Logger::builder().build("obnam", LevelFilter::Debug))
- .build(Root::builder().appender("obnam").build(LevelFilter::Debug))?;
-
- log4rs::init_config(config)?;
-
- Ok(())
+ Init(Init),
+ Backup(Backup),
+ Inspect(Inspect),
+ Chunkify(Chunkify),
+ List(List),
+ ListBackupVersions(ListSchemaVersions),
+ ListFiles(ListFiles),
+ Restore(Restore),
+ GenInfo(GenInfo),
+ ShowGeneration(ShowGeneration),
+ Resolve(Resolve),
+ GetChunk(GetChunk),
+ Config(ShowConfig),
+ EncryptChunk(EncryptChunk),
+ DecryptChunk(DecryptChunk),
}
diff --git a/src/checksummer.rs b/src/checksummer.rs
deleted file mode 100644
index 162c26b..0000000
--- a/src/checksummer.rs
+++ /dev/null
@@ -1,8 +0,0 @@
-use sha2::{Digest, Sha256};
-
-pub fn sha256(data: &[u8]) -> String {
- let mut hasher = Sha256::new();
- hasher.update(data);
- let hash = hasher.finalize();
- format!("{:x}", hash)
-}
diff --git a/src/chunk.rs b/src/chunk.rs
index 4917b60..a6abad3 100644
--- a/src/chunk.rs
+++ b/src/chunk.rs
@@ -1,60 +1,199 @@
+//! Chunks of data.
+
use crate::chunkid::ChunkId;
+use crate::chunkmeta::ChunkMeta;
+use crate::label::Label;
use serde::{Deserialize, Serialize};
use std::default::Default;
-/// Store an arbitrary chunk of data.
-///
-/// The data is just arbitrary binary data.
+/// An arbitrary chunk of arbitrary binary data.
///
/// A chunk also contains its associated metadata, except its
-/// identifier.
-#[derive(Debug, Clone, Serialize, Deserialize)]
+/// identifier, so that it's easy to keep the data and metadata
+/// together. The identifier is used to find the chunk, and it's
+/// assigned by the server when the chunk is uploaded, so it's not
+/// stored in the chunk itself.
+#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct DataChunk {
data: Vec<u8>,
+ meta: ChunkMeta,
}
impl DataChunk {
- /// Construct a new chunk.
- pub fn new(data: Vec<u8>) -> Self {
- Self { data }
+ /// Create a new chunk.
+ pub fn new(data: Vec<u8>, meta: ChunkMeta) -> Self {
+ Self { data, meta }
}
/// Return a chunk's data.
pub fn data(&self) -> &[u8] {
&self.data
}
+
+ /// Return a chunk's metadata.
+ pub fn meta(&self) -> &ChunkMeta {
+ &self.meta
+ }
}
+/// A chunk representing a backup generation.
+///
+/// A generation chunk lists all the data chunks for the SQLite file
+/// with the backup's metadata. It's different from a normal data
+/// chunk so that we can do things that make no sense to a data chunk.
+/// Generation chunks can be converted into or created from data
+/// chunks, for uploading to or downloading from the server.
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct GenerationChunk {
chunk_ids: Vec<ChunkId>,
}
+/// All the errors that may be returned for `GenerationChunk` operations.
+#[derive(Debug, thiserror::Error)]
+pub enum GenerationChunkError {
+ /// Error converting text from UTF8.
+ #[error(transparent)]
+ Utf8Error(#[from] std::str::Utf8Error),
+
+ /// Error parsing JSON as chunk metadata.
+ #[error("failed to parse JSON: {0}")]
+ JsonParse(serde_json::Error),
+
+ /// Error generating JSON from chunk metadata.
+ #[error("failed to serialize to JSON: {0}")]
+ JsonGenerate(serde_json::Error),
+}
+
impl GenerationChunk {
+ /// Create a new backup generation chunk from metadata chunk ids.
pub fn new(chunk_ids: Vec<ChunkId>) -> Self {
Self { chunk_ids }
}
- pub fn from_data_chunk(chunk: &DataChunk) -> anyhow::Result<Self> {
+ /// Create a new backup generation chunk from a data chunk.
+ pub fn from_data_chunk(chunk: &DataChunk) -> Result<Self, GenerationChunkError> {
let data = chunk.data();
let data = std::str::from_utf8(data)?;
- Ok(serde_json::from_str(data)?)
+ serde_json::from_str(data).map_err(GenerationChunkError::JsonParse)
}
+ /// Does the generation chunk contain any metadata chunks?
pub fn is_empty(&self) -> bool {
self.chunk_ids.is_empty()
}
+ /// How many metadata chunks does the generation chunk contain?
pub fn len(&self) -> usize {
self.chunk_ids.len()
}
+ /// Return iterator over the metadata chunk identifiers.
pub fn chunk_ids(&self) -> impl Iterator<Item = &ChunkId> {
self.chunk_ids.iter()
}
- pub fn to_data_chunk(&self) -> anyhow::Result<DataChunk> {
- let json = serde_json::to_string(self)?;
- Ok(DataChunk::new(json.as_bytes().to_vec()))
+ /// Convert generation chunk to a data chunk.
+ pub fn to_data_chunk(&self) -> Result<DataChunk, GenerationChunkError> {
+ let json: String =
+ serde_json::to_string(self).map_err(GenerationChunkError::JsonGenerate)?;
+ let bytes = json.as_bytes().to_vec();
+ let checksum = Label::sha256(&bytes);
+ let meta = ChunkMeta::new(&checksum);
+ Ok(DataChunk::new(bytes, meta))
+ }
+}
+
+/// A client trust root chunk.
+///
+/// This chunk contains all per-client backup information. As long as
+/// this chunk can be trusted, everything it links to can also be
+/// trusted, thanks to cryptographic signatures.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ClientTrust {
+ client_name: String,
+ previous_version: Option<ChunkId>,
+ timestamp: String,
+ backups: Vec<ChunkId>,
+}
+
+/// All the errors that may be returned for `ClientTrust` operations.
+#[derive(Debug, thiserror::Error)]
+pub enum ClientTrustError {
+ /// Error converting text from UTF8.
+ #[error(transparent)]
+ Utf8Error(#[from] std::str::Utf8Error),
+
+ /// Error parsing JSON as chunk metadata.
+ #[error("failed to parse JSON: {0}")]
+ JsonParse(serde_json::Error),
+
+ /// Error generating JSON from chunk metadata.
+ #[error("failed to serialize to JSON: {0}")]
+ JsonGenerate(serde_json::Error),
+}
+
+impl ClientTrust {
+ /// Create a new ClientTrust object.
+ pub fn new(
+ name: &str,
+ previous_version: Option<ChunkId>,
+ timestamp: String,
+ backups: Vec<ChunkId>,
+ ) -> Self {
+ Self {
+ client_name: name.to_string(),
+ previous_version,
+ timestamp,
+ backups,
+ }
+ }
+
+ /// Return client name.
+ pub fn client_name(&self) -> &str {
+ &self.client_name
+ }
+
+ /// Return id of previous version, if any.
+ pub fn previous_version(&self) -> Option<ChunkId> {
+ self.previous_version.clone()
+ }
+
+ /// Return timestamp.
+ pub fn timestamp(&self) -> &str {
+ &self.timestamp
+ }
+
+ /// Return list of all backup generations known.
+ pub fn backups(&self) -> &[ChunkId] {
+ &self.backups
+ }
+
+ /// Append a backup generation to the list.
+ pub fn append_backup(&mut self, id: &ChunkId) {
+ self.backups.push(id.clone());
+ }
+
+ /// Update for new upload.
+ ///
+ /// This needs to happen every time the chunk is updated so that
+ /// the timestamp gets updated.
+ pub fn finalize(&mut self, timestamp: String) {
+ self.timestamp = timestamp;
+ }
+
+ /// Convert generation chunk to a data chunk.
+ pub fn to_data_chunk(&self) -> Result<DataChunk, ClientTrustError> {
+ let json: String = serde_json::to_string(self).map_err(ClientTrustError::JsonGenerate)?;
+ let bytes = json.as_bytes().to_vec();
+ let checksum = Label::literal("client-trust");
+ let meta = ChunkMeta::new(&checksum);
+ Ok(DataChunk::new(bytes, meta))
+ }
+
+ /// Create a new ClientTrust from a data chunk.
+ pub fn from_data_chunk(chunk: &DataChunk) -> Result<Self, ClientTrustError> {
+ let data = chunk.data();
+ let data = std::str::from_utf8(data)?;
+ serde_json::from_str(data).map_err(ClientTrustError::JsonParse)
}
}
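A minimal sketch of the data-chunk round trip described in the new `GenerationChunk` doc comment; the chunk ids are generated on the spot purely for illustration.

~~~rust
use obnam::chunk::GenerationChunk;
use obnam::chunkid::ChunkId;

fn main() -> anyhow::Result<()> {
    let gen = GenerationChunk::new(vec![ChunkId::new(), ChunkId::new()]);

    // Serialize for upload, then parse it back as if it had been downloaded.
    let data = gen.to_data_chunk()?;
    let back = GenerationChunk::from_data_chunk(&data)?;

    assert_eq!(back.len(), 2);
    Ok(())
}
~~~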
diff --git a/src/chunker.rs b/src/chunker.rs
index 145b1db..9883f89 100644
--- a/src/chunker.rs
+++ b/src/chunker.rs
@@ -1,30 +1,54 @@
-use crate::checksummer::sha256;
+//! Split file data into chunks.
+
use crate::chunk::DataChunk;
use crate::chunkmeta::ChunkMeta;
+use crate::label::{Label, LabelChecksumKind};
use std::io::prelude::*;
+use std::path::{Path, PathBuf};
-pub struct Chunker {
+/// Iterator over chunks in a file.
+pub struct FileChunks {
chunk_size: usize,
+ kind: LabelChecksumKind,
buf: Vec<u8>,
+ filename: PathBuf,
handle: std::fs::File,
}
-impl Chunker {
- pub fn new(chunk_size: usize, handle: std::fs::File) -> Self {
- let mut buf = vec![];
- buf.resize(chunk_size, 0);
+/// Possible errors from data chunking.
+#[derive(Debug, thiserror::Error)]
+pub enum ChunkerError {
+ /// Error reading from a file.
+ #[error("failed to read file {0}: {1}")]
+ FileRead(PathBuf, std::io::Error),
+}
+
+impl FileChunks {
+ /// Create new iterator.
+ pub fn new(
+ chunk_size: usize,
+ handle: std::fs::File,
+ filename: &Path,
+ kind: LabelChecksumKind,
+ ) -> Self {
+ let buf = vec![0; chunk_size];
Self {
chunk_size,
+ kind,
buf,
handle,
+ filename: filename.to_path_buf(),
}
}
- pub fn read_chunk(&mut self) -> anyhow::Result<Option<(ChunkMeta, DataChunk)>> {
+ fn read_chunk(&mut self) -> Result<Option<DataChunk>, ChunkerError> {
let mut used = 0;
loop {
- let n = self.handle.read(&mut self.buf.as_mut_slice()[used..])?;
+ let n = self
+ .handle
+ .read(&mut self.buf.as_mut_slice()[used..])
+ .map_err(|err| ChunkerError::FileRead(self.filename.to_path_buf(), err))?;
used += n;
if n == 0 || used == self.chunk_size {
break;
@@ -36,20 +60,24 @@ impl Chunker {
}
let buffer = &self.buf.as_slice()[..used];
- let hash = sha256(buffer);
+ let hash = match self.kind {
+ LabelChecksumKind::Blake2 => Label::blake2(buffer),
+ LabelChecksumKind::Sha256 => Label::sha256(buffer),
+ };
let meta = ChunkMeta::new(&hash);
- let chunk = DataChunk::new(buffer.to_vec());
- Ok(Some((meta, chunk)))
+ let chunk = DataChunk::new(buffer.to_vec(), meta);
+ Ok(Some(chunk))
}
}
-impl Iterator for Chunker {
- type Item = anyhow::Result<(ChunkMeta, DataChunk)>;
+impl Iterator for FileChunks {
+ type Item = Result<DataChunk, ChunkerError>;
- fn next(&mut self) -> Option<anyhow::Result<(ChunkMeta, DataChunk)>> {
+ /// Return the next chunk, if any, or an error.
+ fn next(&mut self) -> Option<Result<DataChunk, ChunkerError>> {
match self.read_chunk() {
Ok(None) => None,
- Ok(Some((meta, chunk))) => Some(Ok((meta, chunk))),
+ Ok(Some(chunk)) => Some(Ok(chunk)),
Err(e) => Some(Err(e)),
}
}
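A sketch of splitting one file into chunks with the renamed `FileChunks` iterator; the path and the one-mebibyte chunk size are illustrative choices, not values taken from this diff.

~~~rust
use obnam::chunker::FileChunks;
use obnam::label::LabelChecksumKind;
use std::path::Path;

fn main() -> anyhow::Result<()> {
    let path = Path::new("/etc/hosts");
    let file = std::fs::File::open(path)?;

    let chunks = FileChunks::new(1024 * 1024, file, path, LabelChecksumKind::Sha256);
    for chunk in chunks {
        let chunk = chunk?;
        println!("{} bytes, label {}", chunk.data().len(), chunk.meta().label());
    }
    Ok(())
}
~~~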
diff --git a/src/chunkid.rs b/src/chunkid.rs
index 3933d4b..50fc3d3 100644
--- a/src/chunkid.rs
+++ b/src/chunkid.rs
@@ -1,4 +1,9 @@
-use crate::checksummer::sha256;
+//! The identifier for a chunk.
+//!
+//! Chunk identifiers are chosen by the server. Each chunk has a
+//! unique identifier, which isn't based on the contents of the chunk.
+
+use crate::label::Label;
use rusqlite::types::ToSqlOutput;
use rusqlite::ToSql;
use serde::{Deserialize, Serialize};
@@ -37,20 +42,24 @@ impl ChunkId {
}
}
- pub fn from_str(s: &str) -> Self {
+ /// Re-construct an identifier from a previous value.
+ pub fn recreate(s: &str) -> Self {
ChunkId { id: s.to_string() }
}
+ /// Return the identifier as a slice of bytes.
pub fn as_bytes(&self) -> &[u8] {
self.id.as_bytes()
}
- pub fn sha256(&self) -> String {
- sha256(self.id.as_bytes())
+ /// Return the SHA256 checksum of the identifier.
+ pub fn sha256(&self) -> Label {
+ Label::sha256(self.id.as_bytes())
}
}
impl ToSql for ChunkId {
+ /// Format identifier for SQL.
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput> {
Ok(ToSqlOutput::Owned(rusqlite::types::Value::Text(
self.id.clone(),
@@ -68,12 +77,14 @@ impl fmt::Display for ChunkId {
}
impl From<&String> for ChunkId {
+ /// Create a chunk identifier from a string.
fn from(s: &String) -> Self {
ChunkId { id: s.to_string() }
}
}
impl From<&OsStr> for ChunkId {
+ /// Create a chunk identifier from an operating system string.
fn from(s: &OsStr) -> Self {
ChunkId {
id: s.to_string_lossy().to_string(),
@@ -84,8 +95,9 @@ impl From<&OsStr> for ChunkId {
impl FromStr for ChunkId {
type Err = ();
+ /// Create a chunk from a string.
fn from_str(s: &str) -> Result<Self, Self::Err> {
- Ok(ChunkId::from_str(s))
+ Ok(ChunkId::recreate(s))
}
}
@@ -117,6 +129,6 @@ mod test {
fn survives_round_trip() {
let id = ChunkId::new();
let id_str = id.to_string();
- assert_eq!(id, ChunkId::from_str(&id_str))
+ assert_eq!(id, ChunkId::recreate(&id_str))
}
}
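A tiny sketch of re-creating a `ChunkId` from its textual form, for example when it comes back from the server; the identifier string below is made up.

~~~rust
use obnam::chunkid::ChunkId;

fn main() {
    let text = "0f61b251-ab3b-47e5-9c4a-d9c3bd6855fa";
    let id: ChunkId = text.parse().unwrap();
    assert_eq!(id, ChunkId::recreate(text));
    println!("{}", id);
}
~~~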
diff --git a/src/chunkmeta.rs b/src/chunkmeta.rs
index 37e2ed5..e2fa9b3 100644
--- a/src/chunkmeta.rs
+++ b/src/chunkmeta.rs
@@ -1,28 +1,21 @@
+//! Metadata about a chunk.
+
+use crate::label::Label;
use serde::{Deserialize, Serialize};
use std::default::Default;
use std::str::FromStr;
/// Metadata about chunks.
///
-/// We manage three bits of metadata about chunks, in addition to its
-/// identifier:
-///
-/// * for all chunks, a [SHA256][] checksum of the chunk content
-///
-/// * for generation chunks, an indication that it is a generation
-/// chunk, and a timestamp for when making the generation snapshot
-/// ended
-///
-/// There is no syntax or semantics imposed on the timestamp, but a
-/// client should probably use [ISO 8601][] representation.
+/// We keep a single piece of metadata about each chunk, in addition
+/// to its identifier: a label assigned by the client. Currently, this is a
+/// [SHA256][] checksum of the chunk content.
///
/// For HTTP, the metadata will be serialised as a JSON object, like this:
///
/// ~~~json
/// {
-/// "sha256": "09ca7e4eaa6e8ae9c7d261167129184883644d07dfba7cbfbc4c8a2e08360d5b",
-/// "generation": true,
-/// "ended": "2020-09-17T08:17:13+03:00"
+/// "label": "09ca7e4eaa6e8ae9c7d261167129184883644d07dfba7cbfbc4c8a2e08360d5b",
/// }
/// ~~~
///
@@ -35,55 +28,44 @@ use std::str::FromStr;
///
/// [ISO 8601]: https://en.wikipedia.org/wiki/ISO_8601
/// [SHA256]: https://en.wikipedia.org/wiki/SHA-2
-#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct ChunkMeta {
- sha256: String,
- // The remaining fields are Options so that JSON parsing doesn't
- // insist on them being there in the textual representation.
- generation: Option<bool>,
- ended: Option<String>,
+ label: String,
}
impl ChunkMeta {
/// Create a new data chunk.
///
/// Data chunks are not for generations.
- pub fn new(sha256: &str) -> Self {
+ pub fn new(label: &Label) -> Self {
ChunkMeta {
- sha256: sha256.to_string(),
- generation: None,
- ended: None,
+ label: label.serialize(),
}
}
- /// Create a new generation chunk.
- pub fn new_generation(sha256: &str, ended: &str) -> Self {
- ChunkMeta {
- sha256: sha256.to_string(),
- generation: Some(true),
- ended: Some(ended.to_string()),
- }
- }
-
- /// Is this a generation chunk?
- pub fn is_generation(&self) -> bool {
- matches!(self.generation, Some(true))
- }
-
- /// When did this generation end?
- pub fn ended(&self) -> Option<&str> {
- self.ended.as_deref()
+ /// The label of the content of the chunk.
+ ///
+ /// The caller should not interpret the label in any way. It
+ /// happens to be a SHA256 of the cleartext contents of the
+ /// chunk for now, but that _will_ change in the future.
+ pub fn label(&self) -> &str {
+ &self.label
}
- /// SHA256 checksum of the content of the chunk.
- pub fn sha256(&self) -> &str {
- &self.sha256
+ /// Serialize from a textual JSON representation.
+ pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
+ serde_json::from_str(json)
}
/// Serialize as JSON.
pub fn to_json(&self) -> String {
serde_json::to_string(self).unwrap()
}
+
+ /// Serialize as JSON, as a byte vector.
+ pub fn to_json_vec(&self) -> Vec<u8> {
+ self.to_json().as_bytes().to_vec()
+ }
}
impl FromStr for ChunkMeta {
@@ -97,48 +79,54 @@ impl FromStr for ChunkMeta {
#[cfg(test)]
mod test {
- use super::ChunkMeta;
+ use super::{ChunkMeta, Label};
#[test]
fn new_creates_data_chunk() {
- let meta = ChunkMeta::new("abcdef");
- assert!(!meta.is_generation());
- assert_eq!(meta.ended(), None);
- assert_eq!(meta.sha256(), "abcdef");
+ let sum = Label::sha256(b"abcdef");
+ let meta = ChunkMeta::new(&sum);
+ assert_eq!(meta.label(), sum.serialize());
}
#[test]
fn new_generation_creates_generation_chunk() {
- let meta = ChunkMeta::new_generation("abcdef", "2020-09-17T08:17:13+03:00");
- assert!(meta.is_generation());
- assert_eq!(meta.ended(), Some("2020-09-17T08:17:13+03:00"));
- assert_eq!(meta.sha256(), "abcdef");
+ let sum = Label::sha256(b"abcdef");
+ let meta = ChunkMeta::new(&sum);
+ assert_eq!(meta.label(), sum.serialize());
}
#[test]
fn data_chunk_from_json() {
- let meta: ChunkMeta = r#"{"sha256": "abcdef"}"#.parse().unwrap();
- assert!(!meta.is_generation());
- assert_eq!(meta.ended(), None);
- assert_eq!(meta.sha256(), "abcdef");
+ let meta: ChunkMeta = r#"{"label": "abcdef"}"#.parse().unwrap();
+ assert_eq!(meta.label(), "abcdef");
}
#[test]
fn generation_chunk_from_json() {
let meta: ChunkMeta =
- r#"{"sha256": "abcdef", "generation": true, "ended": "2020-09-17T08:17:13+03:00"}"#
+ r#"{"label": "abcdef", "generation": true, "ended": "2020-09-17T08:17:13+03:00"}"#
.parse()
.unwrap();
- assert!(meta.is_generation());
- assert_eq!(meta.ended(), Some("2020-09-17T08:17:13+03:00"));
- assert_eq!(meta.sha256(), "abcdef");
+
+ assert_eq!(meta.label(), "abcdef");
}
#[test]
- fn json_roundtrip() {
- let meta = ChunkMeta::new_generation("abcdef", "2020-09-17T08:17:13+03:00");
+ fn generation_json_roundtrip() {
+ let sum = Label::sha256(b"abcdef");
+ let meta = ChunkMeta::new(&sum);
let json = serde_json::to_string(&meta).unwrap();
let meta2 = serde_json::from_str(&json).unwrap();
assert_eq!(meta, meta2);
}
+
+ #[test]
+ fn data_json_roundtrip() {
+ let sum = Label::sha256(b"abcdef");
+ let meta = ChunkMeta::new(&sum);
+ let json = meta.to_json_vec();
+ let meta2 = serde_json::from_slice(&json).unwrap();
+ assert_eq!(meta, meta2);
+ assert_eq!(meta.to_json_vec(), meta2.to_json_vec());
+ }
}
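As an aside, a minimal sketch (not part of this patch) of how the reworked `ChunkMeta` and the new `Label` type fit together, assuming the crate is consumed as the `obnam` library; the `main` wrapper and the example bytes are purely illustrative.

~~~rust
use obnam::chunkmeta::ChunkMeta;
use obnam::label::Label;
use std::str::FromStr;

fn main() {
    // Label the cleartext chunk contents; currently this is a SHA256 checksum.
    let label = Label::sha256(b"hello, world");
    let meta = ChunkMeta::new(&label);

    // Serialize to JSON, e.g. for the `chunk-meta` HTTP header...
    let json = meta.to_json();

    // ...and parse it back; `from_json` and the `FromStr` impl are equivalent here.
    let parsed = ChunkMeta::from_json(&json).unwrap();
    assert_eq!(parsed, ChunkMeta::from_str(&json).unwrap());
    assert_eq!(parsed.label(), label.serialize());
}
~~~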
diff --git a/src/chunkstore.rs b/src/chunkstore.rs
new file mode 100644
index 0000000..4c8125c
--- /dev/null
+++ b/src/chunkstore.rs
@@ -0,0 +1,307 @@
+//! Access local and remote chunk stores.
+//!
+//! A chunk store may be local and accessed via the file system, or
+//! remote and accessed over HTTP. This module implements both. This
+//! module only handles encrypted chunks.
+
+use crate::chunkid::ChunkId;
+use crate::chunkmeta::ChunkMeta;
+use crate::config::{ClientConfig, ClientConfigError};
+use crate::index::{Index, IndexError};
+
+use log::{debug, error, info};
+use reqwest::header::HeaderMap;
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+use tokio::sync::Mutex;
+
+/// A chunk store.
+///
+/// The store may be local or remote.
+pub enum ChunkStore {
+ /// A local chunk store.
+ Local(LocalStore),
+
+ /// A remote chunk store.
+ Remote(RemoteStore),
+}
+
+impl ChunkStore {
+ /// Open a local chunk store.
+ pub fn local<P: AsRef<Path>>(path: P) -> Result<Self, StoreError> {
+ let store = LocalStore::new(path.as_ref())?;
+ Ok(Self::Local(store))
+ }
+
+ /// Open a remote chunk store.
+ pub fn remote(config: &ClientConfig) -> Result<Self, StoreError> {
+ let store = RemoteStore::new(config)?;
+ Ok(Self::Remote(store))
+ }
+
+ /// Find all chunks in the store that have the label in the given metadata.
+ pub async fn find_by_label(&self, meta: &ChunkMeta) -> Result<Vec<ChunkId>, StoreError> {
+ match self {
+ Self::Local(store) => store.find_by_label(meta).await,
+ Self::Remote(store) => store.find_by_label(meta).await,
+ }
+ }
+
+ /// Store a chunk in the store.
+ ///
+ /// The store chooses an id for the chunk.
+ pub async fn put(&self, chunk: Vec<u8>, meta: &ChunkMeta) -> Result<ChunkId, StoreError> {
+ match self {
+ Self::Local(store) => store.put(chunk, meta).await,
+ Self::Remote(store) => store.put(chunk, meta).await,
+ }
+ }
+
+ /// Get a chunk given its id.
+ pub async fn get(&self, id: &ChunkId) -> Result<(Vec<u8>, ChunkMeta), StoreError> {
+ match self {
+ Self::Local(store) => store.get(id).await,
+ Self::Remote(store) => store.get(id).await,
+ }
+ }
+}
+
+/// A local chunk store.
+pub struct LocalStore {
+ path: PathBuf,
+ index: Mutex<Index>,
+}
+
+impl LocalStore {
+ fn new(path: &Path) -> Result<Self, StoreError> {
+ Ok(Self {
+ path: path.to_path_buf(),
+ index: Mutex::new(Index::new(path)?),
+ })
+ }
+
+ async fn find_by_label(&self, meta: &ChunkMeta) -> Result<Vec<ChunkId>, StoreError> {
+ self.index
+ .lock()
+ .await
+ .find_by_label(meta.label())
+ .map_err(StoreError::Index)
+ }
+
+ async fn put(&self, chunk: Vec<u8>, meta: &ChunkMeta) -> Result<ChunkId, StoreError> {
+ let id = ChunkId::new();
+ let (dir, filename) = self.filename(&id);
+
+ if !dir.exists() {
+ std::fs::create_dir_all(&dir).map_err(|err| StoreError::ChunkMkdir(dir, err))?;
+ }
+
+ std::fs::write(&filename, &chunk)
+ .map_err(|err| StoreError::WriteChunk(filename.clone(), err))?;
+ self.index
+ .lock()
+ .await
+ .insert_meta(id.clone(), meta.clone())
+ .map_err(StoreError::Index)?;
+ Ok(id)
+ }
+
+ async fn get(&self, id: &ChunkId) -> Result<(Vec<u8>, ChunkMeta), StoreError> {
+ let meta = self.index.lock().await.get_meta(id)?;
+
+ let (_, filename) = &self.filename(id);
+
+ let raw =
+ std::fs::read(filename).map_err(|err| StoreError::ReadChunk(filename.clone(), err))?;
+
+ Ok((raw, meta))
+ }
+
+ fn filename(&self, id: &ChunkId) -> (PathBuf, PathBuf) {
+ let bytes = id.as_bytes();
+ assert!(bytes.len() > 3);
+ let a = bytes[0];
+ let b = bytes[1];
+ let c = bytes[2];
+ let dir = self.path.join(format!("{}/{}/{}", a, b, c));
+ let filename = dir.join(format!("{}.data", id));
+ (dir, filename)
+ }
+}
+
+/// A remote chunk store.
+pub struct RemoteStore {
+ client: reqwest::Client,
+ base_url: String,
+}
+
+impl RemoteStore {
+ fn new(config: &ClientConfig) -> Result<Self, StoreError> {
+ info!("creating remote store with config: {:#?}", config);
+
+ let client = reqwest::Client::builder()
+ .danger_accept_invalid_certs(!config.verify_tls_cert)
+ .build()
+ .map_err(StoreError::ReqwestError)?;
+ Ok(Self {
+ client,
+ base_url: config.server_url.to_string(),
+ })
+ }
+
+ async fn find_by_label(&self, meta: &ChunkMeta) -> Result<Vec<ChunkId>, StoreError> {
+ let body = match self.get_helper("", &[("label", meta.label())]).await {
+ Ok((_, body)) => body,
+ Err(err) => return Err(err),
+ };
+
+ let hits: HashMap<String, ChunkMeta> =
+ serde_json::from_slice(&body).map_err(StoreError::JsonParse)?;
+ let ids = hits.keys().map(|id| ChunkId::recreate(id)).collect();
+ Ok(ids)
+ }
+
+ async fn put(&self, chunk: Vec<u8>, meta: &ChunkMeta) -> Result<ChunkId, StoreError> {
+ let res = self
+ .client
+ .post(&self.chunks_url())
+ .header("chunk-meta", meta.to_json())
+ .body(chunk)
+ .send()
+ .await
+ .map_err(StoreError::ReqwestError)?;
+ let res: HashMap<String, String> = res.json().await.map_err(StoreError::ReqwestError)?;
+ debug!("upload_chunk: res={:?}", res);
+ let chunk_id = if let Some(chunk_id) = res.get("chunk_id") {
+ debug!("upload_chunk: id={}", chunk_id);
+ chunk_id.parse().unwrap()
+ } else {
+ return Err(StoreError::NoCreatedChunkId);
+ };
+ info!("uploaded_chunk {}", chunk_id);
+ Ok(chunk_id)
+ }
+
+ async fn get(&self, id: &ChunkId) -> Result<(Vec<u8>, ChunkMeta), StoreError> {
+ let (headers, body) = self.get_helper(&format!("/{}", id), &[]).await?;
+ let meta = self.get_chunk_meta_header(id, &headers)?;
+ Ok((body, meta))
+ }
+
+ fn base_url(&self) -> &str {
+ &self.base_url
+ }
+
+ fn chunks_url(&self) -> String {
+ format!("{}/v1/chunks", self.base_url())
+ }
+
+ async fn get_helper(
+ &self,
+ path: &str,
+ query: &[(&str, &str)],
+ ) -> Result<(HeaderMap, Vec<u8>), StoreError> {
+ let url = format!("{}{}", &self.chunks_url(), path);
+ info!("GET {}", url);
+
+ // Build HTTP request structure.
+ let req = self
+ .client
+ .get(&url)
+ .query(query)
+ .build()
+ .map_err(StoreError::ReqwestError)?;
+
+ // Make HTTP request.
+ let res = self
+ .client
+ .execute(req)
+ .await
+ .map_err(StoreError::ReqwestError)?;
+
+ // Did it work?
+ if res.status() != 200 {
+ return Err(StoreError::NotFound(path.to_string()));
+ }
+
+ // Return headers and body.
+ let headers = res.headers().clone();
+ let body = res.bytes().await.map_err(StoreError::ReqwestError)?;
+ let body = body.to_vec();
+ Ok((headers, body))
+ }
+
+ fn get_chunk_meta_header(
+ &self,
+ chunk_id: &ChunkId,
+ headers: &HeaderMap,
+ ) -> Result<ChunkMeta, StoreError> {
+ let meta = headers.get("chunk-meta");
+
+ if meta.is_none() {
+ let err = StoreError::NoChunkMeta(chunk_id.clone());
+ error!("fetching chunk {} failed: {}", chunk_id, err);
+ return Err(err);
+ }
+
+ let meta = meta
+ .unwrap()
+ .to_str()
+ .map_err(StoreError::MetaHeaderToString)?;
+ let meta: ChunkMeta = serde_json::from_str(meta).map_err(StoreError::JsonParse)?;
+
+ Ok(meta)
+ }
+}
+
+/// Possible errors from using a ChunkStore.
+#[derive(Debug, thiserror::Error)]
+pub enum StoreError {
+ /// FIXME
+ #[error("FIXME")]
+ FIXME,
+
+ /// Error from a chunk index.
+ #[error(transparent)]
+ Index(#[from] IndexError),
+
+ /// An error from the HTTP library.
+ #[error("error from reqwest library: {0}")]
+ ReqwestError(reqwest::Error),
+
+ /// Client configuration is wrong.
+ #[error(transparent)]
+ ClientConfigError(#[from] ClientConfigError),
+
+ /// Server claims to not have an entity.
+ #[error("Server does not have {0}")]
+ NotFound(String),
+
+ /// Server didn't give us a chunk's metadata.
+ #[error("Server response did not have a 'chunk-meta' header for chunk {0}")]
+ NoChunkMeta(ChunkId),
+
+ /// An error with the `chunk-meta` header.
+ #[error("couldn't convert response chunk-meta header to string: {0}")]
+ MetaHeaderToString(reqwest::header::ToStrError),
+
+ /// Error parsing JSON.
+ #[error("failed to parse JSON: {0}")]
+ JsonParse(serde_json::Error),
+
+ /// An error creating chunk directory.
+ #[error("Failed to create chunk directory {0}")]
+ ChunkMkdir(PathBuf, #[source] std::io::Error),
+
+ /// An error writing a chunk file.
+ #[error("Failed to write chunk {0}")]
+ WriteChunk(PathBuf, #[source] std::io::Error),
+
+ /// An error reading a chunk file.
+ #[error("Failed to read chunk {0}")]
+ ReadChunk(PathBuf, #[source] std::io::Error),
+
+ /// No chunk id for uploaded chunk.
+ #[error("Server response claimed it had created a chunk, but lacked chunk id")]
+ NoCreatedChunkId,
+}
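A minimal sketch, not part of this patch, of exercising the new `ChunkStore` API against a local store. It assumes the crate is consumed as the `obnam` library, a tokio runtime with the `macros` feature, and that the chunk handed to `put` was encrypted elsewhere; the `./chunks` directory is only an example.

~~~rust
use obnam::chunkmeta::ChunkMeta;
use obnam::chunkstore::ChunkStore;
use obnam::label::Label;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The local store keeps its index and chunk files under this directory.
    std::fs::create_dir_all("./chunks")?;
    let store = ChunkStore::local("./chunks")?;

    // Store one (already encrypted) chunk together with its metadata.
    let data = b"ciphertext goes here".to_vec();
    let meta = ChunkMeta::new(&Label::sha256(b"cleartext"));
    let id = store.put(data.clone(), &meta).await?;

    // The chunk can be found again by its label, or fetched by its id.
    let hits = store.find_by_label(&meta).await?;
    println!("{} chunk(s) share this label", hits.len());

    let (body, meta2) = store.get(&id).await?;
    assert_eq!(body, data);
    assert_eq!(meta2, meta);
    Ok(())
}
~~~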
diff --git a/src/cipher.rs b/src/cipher.rs
new file mode 100644
index 0000000..21785b9
--- /dev/null
+++ b/src/cipher.rs
@@ -0,0 +1,249 @@
+//! Encryption cipher algorithms.
+
+use crate::chunk::DataChunk;
+use crate::chunkmeta::ChunkMeta;
+use crate::passwords::Passwords;
+
+use aes_gcm::aead::{generic_array::GenericArray, Aead, KeyInit, Payload};
+use aes_gcm::Aes256Gcm; // Or `Aes128Gcm`
+use rand::Rng;
+
+use std::str::FromStr;
+
+const CHUNK_V1: &[u8] = b"0001";
+
+/// An encrypted chunk.
+///
+/// This consists of the ciphertext, and un-encrypted (or cleartext)
+/// additional associated data, which could be the metadata of the
+/// chunk, and can be used to, for example, find chunks.
+///
+/// Encrypted chunks are the only chunks that can be uploaded to the
+/// server.
+pub struct EncryptedChunk {
+ ciphertext: Vec<u8>,
+ aad: Vec<u8>,
+}
+
+impl EncryptedChunk {
+ /// Create an encrypted chunk.
+ fn new(ciphertext: Vec<u8>, aad: Vec<u8>) -> Self {
+ Self { ciphertext, aad }
+ }
+
+ /// Return the encrypted data.
+ pub fn ciphertext(&self) -> &[u8] {
+ &self.ciphertext
+ }
+
+ /// Return the cleartext associated additional data.
+ pub fn aad(&self) -> &[u8] {
+ &self.aad
+ }
+}
+
+/// An engine for encrypting and decrypting chunks.
+pub struct CipherEngine {
+ cipher: Aes256Gcm,
+}
+
+impl CipherEngine {
+ /// Create a new cipher engine using cleartext passwords.
+ pub fn new(pass: &Passwords) -> Self {
+ let key = GenericArray::from_slice(pass.encryption_key());
+ Self {
+ cipher: Aes256Gcm::new(key),
+ }
+ }
+
+ /// Encrypt a chunk.
+ pub fn encrypt_chunk(&self, chunk: &DataChunk) -> Result<EncryptedChunk, CipherError> {
+ // Payload with metadata as associated data, to be encrypted.
+ //
+ // The metadata will be stored in cleartext after encryption.
+ let aad = chunk.meta().to_json_vec();
+ let payload = Payload {
+ msg: chunk.data(),
+ aad: &aad,
+ };
+
+ // Unique random nonce for each encryption.
+ let nonce = Nonce::new();
+ let nonce_arr = GenericArray::from_slice(nonce.as_bytes());
+
+ // Encrypt the sensitive part.
+ let ciphertext = self
+ .cipher
+ .encrypt(nonce_arr, payload)
+ .map_err(CipherError::EncryptError)?;
+
+ // Construct the blob to be stored on the server.
+ let mut vec: Vec<u8> = vec![];
+ push_bytes(&mut vec, CHUNK_V1);
+ push_bytes(&mut vec, nonce.as_bytes());
+ push_bytes(&mut vec, &ciphertext);
+
+ Ok(EncryptedChunk::new(vec, aad))
+ }
+
+ /// Decrypt a chunk.
+ pub fn decrypt_chunk(&self, bytes: &[u8], meta: &[u8]) -> Result<DataChunk, CipherError> {
+ // Does encrypted chunk start with the right version?
+ if !bytes.starts_with(CHUNK_V1) {
+ return Err(CipherError::UnknownChunkVersion);
+ }
+ let version_len = CHUNK_V1.len();
+ let bytes = &bytes[version_len..];
+
+ let (nonce, ciphertext) = match bytes.get(..NONCE_SIZE) {
+ Some(nonce) => (GenericArray::from_slice(nonce), &bytes[NONCE_SIZE..]),
+ None => return Err(CipherError::NoNonce),
+ };
+
+ let payload = Payload {
+ msg: ciphertext,
+ aad: meta,
+ };
+
+ let payload = self
+ .cipher
+ .decrypt(nonce, payload)
+ .map_err(CipherError::DecryptError)?;
+ let payload = Payload::from(payload.as_slice());
+
+ let meta = std::str::from_utf8(meta)?;
+ let meta = ChunkMeta::from_str(meta)?;
+
+ let chunk = DataChunk::new(payload.msg.to_vec(), meta);
+
+ Ok(chunk)
+ }
+}
+
+fn push_bytes(vec: &mut Vec<u8>, bytes: &[u8]) {
+ for byte in bytes.iter() {
+ vec.push(*byte);
+ }
+}
+
+/// Possible errors when encrypting or decrypting chunks.
+#[derive(Debug, thiserror::Error)]
+pub enum CipherError {
+ /// Encryption failed.
+ #[error("failed to encrypt with AES-GEM: {0}")]
+ EncryptError(aes_gcm::Error),
+
+ /// The encrypted chunk has an unsupported version or is
+ /// corrupted.
+ #[error("encrypted chunk does not start with correct version")]
+ UnknownChunkVersion,
+
+ /// The encrypted chunk lacks a complete nonce value, and is
+ /// probably corrupted.
+ #[error("encrypted chunk does not have a complete nonce")]
+ NoNonce,
+
+ /// Decryption failed.
+ #[error("failed to decrypt with AES-GEM: {0}")]
+ DecryptError(aes_gcm::Error),
+
+ /// The decryption succeeded, but the data isn't valid YAML.
+ #[error("failed to parse decrypted data as a DataChunk: {0}")]
+ Parse(serde_yaml::Error),
+
+ /// Error parsing UTF8 data.
+ #[error(transparent)]
+ Utf8Error(#[from] std::str::Utf8Error),
+
+ /// Error parsing JSON data.
+ #[error("failed to parse JSON: {0}")]
+ JsonParse(#[from] serde_json::Error),
+}
+
+const NONCE_SIZE: usize = 12;
+
+#[derive(Debug)]
+struct Nonce {
+ nonce: Vec<u8>,
+}
+
+impl Nonce {
+ fn from_bytes(bytes: &[u8]) -> Self {
+ assert_eq!(bytes.len(), NONCE_SIZE);
+ Self {
+ nonce: bytes.to_vec(),
+ }
+ }
+
+ fn new() -> Self {
+ let mut bytes: Vec<u8> = vec![0; NONCE_SIZE];
+ let mut rng = rand::thread_rng();
+ for x in bytes.iter_mut() {
+ *x = rng.gen();
+ }
+ Self::from_bytes(&bytes)
+ }
+
+ fn as_bytes(&self) -> &[u8] {
+ &self.nonce
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use crate::chunk::DataChunk;
+ use crate::chunkmeta::ChunkMeta;
+ use crate::cipher::{CipherEngine, CipherError, CHUNK_V1, NONCE_SIZE};
+ use crate::label::Label;
+ use crate::passwords::Passwords;
+
+ #[test]
+ fn metadata_as_aad() {
+ let sum = Label::sha256(b"dummy data");
+ let meta = ChunkMeta::new(&sum);
+ let meta_as_aad = meta.to_json_vec();
+ let chunk = DataChunk::new("hello".as_bytes().to_vec(), meta);
+ let pass = Passwords::new("secret");
+ let cipher = CipherEngine::new(&pass);
+ let enc = cipher.encrypt_chunk(&chunk).unwrap();
+
+ assert_eq!(meta_as_aad, enc.aad());
+ }
+
+ #[test]
+ fn round_trip() {
+ let sum = Label::sha256(b"dummy data");
+ let meta = ChunkMeta::new(&sum);
+ let chunk = DataChunk::new("hello".as_bytes().to_vec(), meta);
+ let pass = Passwords::new("secret");
+
+ let cipher = CipherEngine::new(&pass);
+ let enc = cipher.encrypt_chunk(&chunk).unwrap();
+
+ let bytes: Vec<u8> = enc.ciphertext().to_vec();
+ let dec = cipher.decrypt_chunk(&bytes, enc.aad()).unwrap();
+ assert_eq!(chunk, dec);
+ }
+
+ #[test]
+ fn decrypt_errors_if_nonce_is_too_short() {
+ let pass = Passwords::new("our little test secret");
+ let e = CipherEngine::new(&pass);
+
+ // *Almost* a valid chunk header, except it's one byte too short
+ let bytes = {
+ let mut result = [0; CHUNK_V1.len() + NONCE_SIZE - 1];
+ for (i, x) in CHUNK_V1.iter().enumerate() {
+ result[i] = *x;
+ }
+ result
+ };
+
+ let meta = [0; 0];
+
+ assert!(matches!(
+ e.decrypt_chunk(&bytes, &meta),
+ Err(CipherError::NoNonce)
+ ));
+ }
+}
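For reference, the blob produced by `encrypt_chunk` and consumed by `decrypt_chunk` is laid out as a fixed version tag, a random nonce, and the AES-256-GCM ciphertext, in that order. The sketch below is not part of this patch; it only splits such a blob without decrypting it, and its constants mirror `CHUNK_V1` and `NONCE_SIZE` above.

~~~rust
const VERSION_LEN: usize = 4; // length of the b"0001" version tag (CHUNK_V1)
const NONCE_SIZE: usize = 12; // AES-256-GCM nonce length used above

/// Split an encrypted chunk blob into (version, nonce, ciphertext),
/// or return None if it is too short to hold a complete header.
fn split_blob(blob: &[u8]) -> Option<(&[u8], &[u8], &[u8])> {
    if blob.len() < VERSION_LEN + NONCE_SIZE {
        return None;
    }
    let (version, rest) = blob.split_at(VERSION_LEN);
    let (nonce, ciphertext) = rest.split_at(NONCE_SIZE);
    Some((version, nonce, ciphertext))
}
~~~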
diff --git a/src/client.rs b/src/client.rs
index 515b8c9..a924052 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,271 +1,207 @@
-use crate::checksummer::sha256;
-use crate::chunk::DataChunk;
-use crate::chunk::GenerationChunk;
-use crate::chunker::Chunker;
+//! Client to the Obnam server HTTP API.
+
+use crate::chunk::{
+ ClientTrust, ClientTrustError, DataChunk, GenerationChunk, GenerationChunkError,
+};
use crate::chunkid::ChunkId;
use crate::chunkmeta::ChunkMeta;
-use crate::error::ObnamError;
-use crate::fsentry::{FilesystemEntry, FilesystemKind};
-use crate::generation::{FinishedGeneration, LocalGeneration};
+use crate::chunkstore::{ChunkStore, StoreError};
+use crate::cipher::{CipherEngine, CipherError};
+use crate::config::{ClientConfig, ClientConfigError};
+use crate::generation::{FinishedGeneration, GenId, LocalGeneration, LocalGenerationError};
use crate::genlist::GenerationList;
+use crate::label::Label;
-use anyhow::Context;
-use chrono::{DateTime, Local};
-use log::{debug, error, info, trace};
-use reqwest::blocking::Client;
-use serde::Deserialize;
-use std::collections::HashMap;
+use log::{error, info};
use std::fs::File;
use std::io::prelude::*;
use std::path::{Path, PathBuf};
-#[derive(Debug, Deserialize, Clone)]
-pub struct ClientConfig {
- pub server_url: String,
- pub root: PathBuf,
- pub log: Option<PathBuf>,
-}
-
-impl ClientConfig {
- pub fn read_config(filename: &Path) -> anyhow::Result<Self> {
- trace!("read_config: filename={:?}", filename);
- let config = std::fs::read_to_string(filename)
- .with_context(|| format!("reading configuration file {}", filename.display()))?;
- let config = serde_yaml::from_str(&config)?;
- Ok(config)
- }
-}
-
+/// Possible errors when using the server API.
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
- #[error("Server successful response to creating chunk lacked chunk id")]
+ /// No chunk id for uploaded chunk.
+ #[error("Server response claimed it had created a chunk, but lacked chunk id")]
NoCreatedChunkId,
+ /// Server claims to not have an entity.
+ #[error("Server does not have {0}")]
+ NotFound(String),
+
+ /// Server does not have a chunk.
#[error("Server does not have chunk {0}")]
- ChunkNotFound(String),
+ ChunkNotFound(ChunkId),
+ /// Server does not have generation.
#[error("Server does not have generation {0}")]
- GenerationNotFound(String),
-}
+ GenerationNotFound(ChunkId),
-pub struct BackupClient {
- client: Client,
- base_url: String,
-}
+ /// Server didn't give us a chunk's metadata.
+ #[error("Server response did not have a 'chunk-meta' header for chunk {0}")]
+ NoChunkMeta(ChunkId),
-impl BackupClient {
- pub fn new(base_url: &str) -> anyhow::Result<Self> {
- let client = Client::builder()
- .danger_accept_invalid_certs(true)
- .build()?;
- Ok(Self {
- client,
- base_url: base_url.to_string(),
- })
- }
+ /// Chunk has wrong checksum and may be corrupted.
+ #[error("Wrong checksum for chunk {0}, got {1}, expected {2}")]
+ WrongChecksum(ChunkId, String, String),
- pub fn upload_filesystem_entry(
- &self,
- e: &FilesystemEntry,
- size: usize,
- ) -> anyhow::Result<Vec<ChunkId>> {
- info!("upload entry: {:?}", e);
- let ids = match e.kind() {
- FilesystemKind::Regular => self.read_file(e.pathbuf(), size)?,
- FilesystemKind::Directory => vec![],
- FilesystemKind::Symlink => vec![],
- };
- Ok(ids)
- }
+ /// Client configuration is wrong.
+ #[error(transparent)]
+ ClientConfigError(#[from] ClientConfigError),
- pub fn upload_generation(&self, filename: &Path, size: usize) -> anyhow::Result<ChunkId> {
- info!("upload SQLite {}", filename.display());
- let ids = self.read_file(filename.to_path_buf(), size)?;
- let gen = GenerationChunk::new(ids);
- let data = gen.to_data_chunk()?;
- let meta = ChunkMeta::new_generation(&sha256(data.data()), &current_timestamp());
- let gen_id = self.upload_gen_chunk(meta.clone(), gen)?;
- info!("uploaded generation {}, meta {:?}", gen_id, meta);
- Ok(gen_id)
- }
+ /// An error encrypting or decrypting chunks.
+ #[error(transparent)]
+ CipherError(#[from] CipherError),
- fn read_file(&self, filename: PathBuf, size: usize) -> anyhow::Result<Vec<ChunkId>> {
- info!("upload file {}", filename.display());
- let file = std::fs::File::open(filename)?;
- let chunker = Chunker::new(size, file);
- let chunk_ids = self.upload_new_file_chunks(chunker)?;
- Ok(chunk_ids)
- }
+ /// An error regarding generation chunks.
+ #[error(transparent)]
+ GenerationChunkError(#[from] GenerationChunkError),
- fn base_url(&self) -> &str {
- &self.base_url
- }
+ /// An error regarding client trust.
+ #[error(transparent)]
+ ClientTrust(#[from] ClientTrustError),
- fn chunks_url(&self) -> String {
- format!("{}/chunks", self.base_url())
- }
+ /// An error using a backup's local metadata.
+ #[error(transparent)]
+ LocalGenerationError(#[from] LocalGenerationError),
- pub fn has_chunk(&self, meta: &ChunkMeta) -> anyhow::Result<Option<ChunkId>> {
- trace!("has_chunk: url={:?}", self.base_url());
- let req = self
- .client
- .get(&self.chunks_url())
- .query(&[("sha256", meta.sha256())])
- .build()?;
-
- let res = self.client.execute(req)?;
- debug!("has_chunk: status={}", res.status());
- let has = if res.status() != 200 {
- debug!("has_chunk: error from server");
- None
- } else {
- let text = res.text()?;
- debug!("has_chunk: text={:?}", text);
- let hits: HashMap<String, ChunkMeta> = serde_json::from_str(&text)?;
- debug!("has_chunk: hits={:?}", hits);
- let mut iter = hits.iter();
- if let Some((chunk_id, _)) = iter.next() {
- debug!("has_chunk: chunk_id={:?}", chunk_id);
- Some(chunk_id.into())
- } else {
- None
- }
- };
+ /// An error with the `chunk-meta` header.
+ #[error("couldn't convert response chunk-meta header to string: {0}")]
+ MetaHeaderToString(reqwest::header::ToStrError),
+
+ /// An error from the HTTP library.
+ #[error("error from reqwest library: {0}")]
+ ReqwestError(reqwest::Error),
+
+ /// Couldn't look up a chunk via checksum.
+ #[error("lookup by chunk checksum failed: {0}")]
+ ChunkExists(reqwest::Error),
+
+ /// Error parsing JSON.
+ #[error("failed to parse JSON: {0}")]
+ JsonParse(serde_json::Error),
- info!("has_chunk result: {:?}", has);
- Ok(has)
+ /// Error generating JSON.
+ #[error("failed to generate JSON: {0}")]
+ JsonGenerate(serde_json::Error),
+
+ /// Error parsing YAML.
+ #[error("failed to parse YAML: {0}")]
+ YamlParse(serde_yaml::Error),
+
+ /// Failed to open a file.
+ #[error("failed to open file {0}: {1}")]
+ FileOpen(PathBuf, std::io::Error),
+
+ /// Failed to create a file.
+ #[error("failed to create file {0}: {1}")]
+ FileCreate(PathBuf, std::io::Error),
+
+ /// Failed to write a file.
+ #[error("failed to write to file {0}: {1}")]
+ FileWrite(PathBuf, std::io::Error),
+
+ /// Error from a chunk store.
+ #[error(transparent)]
+ ChunkStore(#[from] StoreError),
+}
+
+/// Client for the Obnam server HTTP API.
+pub struct BackupClient {
+ store: ChunkStore,
+ cipher: CipherEngine,
+}
+
+impl BackupClient {
+ /// Create a new backup client.
+ pub fn new(config: &ClientConfig) -> Result<Self, ClientError> {
+ info!("creating backup client with config: {:#?}", config);
+ let pass = config.passwords()?;
+ Ok(Self {
+ store: ChunkStore::remote(config)?,
+ cipher: CipherEngine::new(&pass),
+ })
}
- pub fn upload_chunk(&self, meta: ChunkMeta, chunk: DataChunk) -> anyhow::Result<ChunkId> {
- let res = self
- .client
- .post(&self.chunks_url())
- .header("chunk-meta", meta.to_json())
- .body(chunk.data().to_vec())
- .send()?;
- debug!("upload_chunk: res={:?}", res);
- let res: HashMap<String, String> = res.json()?;
- let chunk_id = if let Some(chunk_id) = res.get("chunk_id") {
- debug!("upload_chunk: id={}", chunk_id);
- chunk_id.parse().unwrap()
- } else {
- return Err(ClientError::NoCreatedChunkId.into());
- };
- info!("uploaded_chunk {} meta {:?}", chunk_id, meta);
- Ok(chunk_id)
+ /// Does the server have a chunk with this label? Return its id, if so.
+ pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> {
+ let mut ids = self.store.find_by_label(meta).await?;
+ Ok(ids.pop())
}
- pub fn upload_gen_chunk(
- &self,
- meta: ChunkMeta,
- gen: GenerationChunk,
- ) -> anyhow::Result<ChunkId> {
- let res = self
- .client
- .post(&self.chunks_url())
- .header("chunk-meta", meta.to_json())
- .body(serde_json::to_string(&gen)?)
- .send()?;
- debug!("upload_chunk: res={:?}", res);
- let res: HashMap<String, String> = res.json()?;
- let chunk_id = if let Some(chunk_id) = res.get("chunk_id") {
- debug!("upload_chunk: id={}", chunk_id);
- chunk_id.parse().unwrap()
- } else {
- return Err(ClientError::NoCreatedChunkId.into());
- };
- info!("uploaded_generation chunk {}", chunk_id);
- Ok(chunk_id)
+ /// Upload a data chunk to the server.
+ pub async fn upload_chunk(&mut self, chunk: DataChunk) -> Result<ChunkId, ClientError> {
+ let enc = self.cipher.encrypt_chunk(&chunk)?;
+ let data = enc.ciphertext().to_vec();
+ let id = self.store.put(data, chunk.meta()).await?;
+ Ok(id)
}
- pub fn upload_new_file_chunks(&self, chunker: Chunker) -> anyhow::Result<Vec<ChunkId>> {
- let mut chunk_ids = vec![];
- for item in chunker {
- let (meta, chunk) = item?;
- if let Some(chunk_id) = self.has_chunk(&meta)? {
- chunk_ids.push(chunk_id.clone());
- info!("reusing existing chunk {}", chunk_id);
+ /// Get current client trust chunk from repository, if there is one.
+ pub async fn get_client_trust(&self) -> Result<Option<ClientTrust>, ClientError> {
+ let ids = self.find_client_trusts().await?;
+ let mut latest: Option<ClientTrust> = None;
+ for id in ids {
+ let chunk = self.fetch_chunk(&id).await?;
+ let new = ClientTrust::from_data_chunk(&chunk)?;
+ if let Some(t) = &latest {
+ if new.timestamp() > t.timestamp() {
+ latest = Some(new);
+ }
} else {
- let chunk_id = self.upload_chunk(meta, chunk)?;
- chunk_ids.push(chunk_id.clone());
- info!("created new chunk {}", chunk_id);
+ latest = Some(new);
}
}
+ Ok(latest)
+ }
- Ok(chunk_ids)
+ async fn find_client_trusts(&self) -> Result<Vec<ChunkId>, ClientError> {
+ let label = Label::literal("client-trust");
+ let meta = ChunkMeta::new(&label);
+ let ids = self.store.find_by_label(&meta).await?;
+ Ok(ids)
}
- pub fn list_generations(&self) -> anyhow::Result<GenerationList> {
- let url = format!("{}?generation=true", &self.chunks_url());
- trace!("list_generations: url={:?}", url);
- let req = self.client.get(&url).build()?;
- let res = self.client.execute(req)?;
- debug!("list_generations: status={}", res.status());
- let body = res.bytes()?;
- debug!("list_generations: body={:?}", body);
- let map: HashMap<String, ChunkMeta> = serde_yaml::from_slice(&body)?;
- debug!("list_generations: map={:?}", map);
- let finished = map
+ /// List backup generations known by the server.
+ pub fn list_generations(&self, trust: &ClientTrust) -> GenerationList {
+ let finished = trust
+ .backups()
.iter()
- .map(|(id, meta)| FinishedGeneration::new(id, meta.ended().map_or("", |s| s)))
+ .map(|id| FinishedGeneration::new(&format!("{}", id), ""))
.collect();
- Ok(GenerationList::new(finished))
+ GenerationList::new(finished)
}
- pub fn fetch_chunk(&self, chunk_id: &ChunkId) -> anyhow::Result<DataChunk> {
- info!("fetch chunk {}", chunk_id);
-
- let url = format!("{}/{}", &self.chunks_url(), chunk_id);
- let req = self.client.get(&url).build()?;
- let res = self.client.execute(req)?;
- if res.status() != 200 {
- let err = ClientError::ChunkNotFound(chunk_id.to_string());
- error!("fetching chunk {} failed: {}", chunk_id, err);
- return Err(err.into());
- }
-
- let headers = res.headers();
- let meta = headers.get("chunk-meta");
- if meta.is_none() {
- let err = ObnamError::NoChunkMeta(chunk_id.clone());
- error!("fetching chunk {} failed: {}", chunk_id, err);
- return Err(err.into());
- }
- let meta = meta.unwrap().to_str()?;
- debug!("fetching chunk {}: meta={:?}", chunk_id, meta);
- let meta: ChunkMeta = serde_json::from_str(meta)?;
- debug!("fetching chunk {}: meta={:?}", chunk_id, meta);
-
- let body = res.bytes()?;
- let body = body.to_vec();
- let actual = sha256(&body);
- if actual != meta.sha256() {
- let err =
- ObnamError::WrongChecksum(chunk_id.clone(), actual, meta.sha256().to_string());
- error!("fetching chunk {} failed: {}", chunk_id, err);
- return Err(err.into());
- }
-
- let chunk: DataChunk = DataChunk::new(body);
+ /// Fetch a data chunk from the server, given the chunk identifier.
+ pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
+ let (body, meta) = self.store.get(chunk_id).await?;
+ let meta_bytes = meta.to_json_vec();
+ let chunk = self.cipher.decrypt_chunk(&body, &meta_bytes)?;
Ok(chunk)
}
- fn fetch_generation_chunk(&self, gen_id: &str) -> anyhow::Result<GenerationChunk> {
- let chunk_id = ChunkId::from_str(gen_id);
- let chunk = self.fetch_chunk(&chunk_id)?;
+ async fn fetch_generation_chunk(&self, gen_id: &GenId) -> Result<GenerationChunk, ClientError> {
+ let chunk = self.fetch_chunk(gen_id.as_chunk_id()).await?;
let gen = GenerationChunk::from_data_chunk(&chunk)?;
Ok(gen)
}
- pub fn fetch_generation(&self, gen_id: &str, dbname: &Path) -> anyhow::Result<LocalGeneration> {
- let gen = self.fetch_generation_chunk(gen_id)?;
+ /// Fetch a backup generation's metadata, given its identifier.
+ pub async fn fetch_generation(
+ &self,
+ gen_id: &GenId,
+ dbname: &Path,
+ ) -> Result<LocalGeneration, ClientError> {
+ let gen = self.fetch_generation_chunk(gen_id).await?;
// Fetch the SQLite file, storing it in the named file.
- let mut dbfile = File::create(&dbname)?;
+ let mut dbfile = File::create(dbname)
+ .map_err(|err| ClientError::FileCreate(dbname.to_path_buf(), err))?;
for id in gen.chunk_ids() {
- let chunk = self.fetch_chunk(id)?;
- dbfile.write_all(chunk.data())?;
+ let chunk = self.fetch_chunk(id).await?;
+ dbfile
+ .write_all(chunk.data())
+ .map_err(|err| ClientError::FileWrite(dbname.to_path_buf(), err))?;
}
info!("downloaded generation to {}", dbname.display());
@@ -273,8 +209,3 @@ impl BackupClient {
Ok(gen)
}
}
-
-fn current_timestamp() -> String {
- let now: DateTime<Local> = Local::now();
- format!("{}", now.format("%Y-%m-%d %H:%M:%S.%f %z"))
-}
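A sketch, not part of this patch, of the deduplicating upload flow the new `BackupClient` API makes possible: look the chunk up by its label first, and only upload it when the server does not already have one. The helper name is illustrative only; `upload_chunk` encrypts the chunk internally, as shown above.

~~~rust
use obnam::chunk::DataChunk;
use obnam::chunkid::ChunkId;
use obnam::chunkmeta::ChunkMeta;
use obnam::client::{BackupClient, ClientError};
use obnam::label::Label;

/// Upload `data` unless the server already has a chunk with the same label.
async fn upload_if_new(
    client: &mut BackupClient,
    data: Vec<u8>,
) -> Result<ChunkId, ClientError> {
    let meta = ChunkMeta::new(&Label::sha256(&data));
    if let Some(existing) = client.has_chunk(&meta).await? {
        // A chunk with this label already exists on the server; reuse it.
        return Ok(existing);
    }
    client.upload_chunk(DataChunk::new(data, meta)).await
}
~~~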
diff --git a/src/cmd/backup.rs b/src/cmd/backup.rs
index da7298f..70e9eac 100644
--- a/src/cmd/backup.rs
+++ b/src/cmd/backup.rs
@@ -1,65 +1,137 @@
-use crate::backup_run::BackupRun;
-use crate::client::ClientConfig;
-use crate::fsiter::FsIterator;
-use crate::generation::NascentGeneration;
+//! The `backup` subcommand.
+
+use crate::backup_run::{current_timestamp, BackupRun};
+use crate::chunk::ClientTrust;
+use crate::client::BackupClient;
+use crate::config::ClientConfig;
+use crate::dbgen::{schema_version, FileId, DEFAULT_SCHEMA_MAJOR};
+use crate::error::ObnamError;
+use crate::generation::GenId;
+use crate::performance::{Clock, Performance};
+use crate::schema::VersionComponent;
+
+use clap::Parser;
use log::info;
use std::time::SystemTime;
-use tempfile::NamedTempFile;
-
-pub fn backup(config: &ClientConfig, buffer_size: usize) -> anyhow::Result<()> {
- let runtime = SystemTime::now();
-
- let run = BackupRun::new(config, buffer_size)?;
-
- // Create a named temporary file. We don't meed the open file
- // handle, so we discard that.
- let oldname = {
- let temp = NamedTempFile::new()?;
- let (_, dbname) = temp.keep()?;
- dbname
- };
-
- // Create a named temporary file. We don't meed the open file
- // handle, so we discard that.
- let newname = {
- let temp = NamedTempFile::new()?;
- let (_, dbname) = temp.keep()?;
- dbname
- };
-
- let genlist = run.client().list_generations()?;
- let file_count = {
- let iter = FsIterator::new(&config.root);
- let mut new = NascentGeneration::create(&newname)?;
-
- match genlist.resolve("latest") {
- None => {
- info!("fresh backup without a previous generation");
- new.insert_iter(iter.map(|entry| run.backup_file_initially(entry)))?;
+use tempfile::tempdir;
+use tokio::runtime::Runtime;
+
+/// Make a backup.
+#[derive(Debug, Parser)]
+pub struct Backup {
+ /// Force a full backup, instead of an incremental one.
+ #[clap(long)]
+ full: bool,
+
+ /// Backup schema major version to use.
+ #[clap(long)]
+ backup_version: Option<VersionComponent>,
+}
+
+impl Backup {
+ /// Run the command.
+ pub fn run(&self, config: &ClientConfig, perf: &mut Performance) -> Result<(), ObnamError> {
+ let rt = Runtime::new()?;
+ rt.block_on(self.run_async(config, perf))
+ }
+
+ async fn run_async(
+ &self,
+ config: &ClientConfig,
+ perf: &mut Performance,
+ ) -> Result<(), ObnamError> {
+ let runtime = SystemTime::now();
+
+ let major = self.backup_version.unwrap_or(DEFAULT_SCHEMA_MAJOR);
+ let schema = schema_version(major)?;
+
+ let mut client = BackupClient::new(config)?;
+ let trust = client
+ .get_client_trust()
+ .await?
+ .or_else(|| Some(ClientTrust::new("FIXME", None, current_timestamp(), vec![])))
+ .unwrap();
+ let genlist = client.list_generations(&trust);
+
+ let temp = tempdir()?;
+ let oldtemp = temp.path().join("old.db");
+ let newtemp = temp.path().join("new.db");
+
+ let old_id = if self.full {
+ None
+ } else {
+ match genlist.resolve("latest") {
+ Err(_) => None,
+ Ok(old_id) => Some(old_id),
}
- Some(old) => {
- info!("incremental backup based on {}", old);
- let old = run.client().fetch_generation(&old, &oldname)?;
- run.progress()
- .files_in_previous_generation(old.file_count()? as u64);
- new.insert_iter(iter.map(|entry| run.backup_file_incrementally(entry, &old)))?;
+ };
+
+ let (is_incremental, outcome) = if let Some(old_id) = old_id {
+ info!("incremental backup based on {}", old_id);
+ let mut run = BackupRun::incremental(config, &mut client)?;
+ let old = run.start(Some(&old_id), &oldtemp, perf).await?;
+ (
+ true,
+ run.backup_roots(config, &old, &newtemp, schema, perf)
+ .await?,
+ )
+ } else {
+ info!("fresh backup without a previous generation");
+ let mut run = BackupRun::initial(config, &mut client)?;
+ let old = run.start(None, &oldtemp, perf).await?;
+ (
+ false,
+ run.backup_roots(config, &old, &newtemp, schema, perf)
+ .await?,
+ )
+ };
+
+ perf.start(Clock::GenerationUpload);
+ let mut trust = trust;
+ trust.append_backup(outcome.gen_id.as_chunk_id());
+ trust.finalize(current_timestamp());
+ let trust = trust.to_data_chunk()?;
+ let trust_id = client.upload_chunk(trust).await?;
+ perf.stop(Clock::GenerationUpload);
+ info!("uploaded new client-trust {}", trust_id);
+
+ for w in outcome.warnings.iter() {
+ println!("warning: {}", w);
+ }
+
+ if is_incremental && !outcome.new_cachedir_tags.is_empty() {
+ println!("New CACHEDIR.TAG files since the last backup:");
+ for t in &outcome.new_cachedir_tags {
+ println!("- {:?}", t);
}
+ println!("You can configure Obnam to ignore all such files by setting `exclude_cache_tag_directories` to `false`.");
}
- run.progress().finish();
- new.file_count()
- };
- // Upload the SQLite file, i.e., the named temporary file, which
- // still exists, since we persisted it above.
- let gen_id = run.client().upload_generation(&newname, buffer_size)?;
+ report_stats(
+ &runtime,
+ outcome.files_count,
+ &outcome.gen_id,
+ outcome.warnings.len(),
+ )?;
+
+ if is_incremental && !outcome.new_cachedir_tags.is_empty() {
+ Err(ObnamError::NewCachedirTagsFound)
+ } else {
+ Ok(())
+ }
+ }
+}
+
+fn report_stats(
+ runtime: &SystemTime,
+ file_count: FileId,
+ gen_id: &GenId,
+ num_warnings: usize,
+) -> Result<(), ObnamError> {
println!("status: OK");
+ println!("warnings: {}", num_warnings);
println!("duration: {}", runtime.elapsed()?.as_secs());
println!("file-count: {}", file_count);
println!("generation-id: {}", gen_id);
-
- // Delete the temporary file.q
- std::fs::remove_file(&newname)?;
- std::fs::remove_file(&oldname)?;
-
Ok(())
}
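In isolation, the full-versus-incremental decision made in `run_async` above amounts to the following sketch, which is not part of the patch (`GenerationList` and `GenId` are the types this change already uses):

~~~rust
use obnam::generation::GenId;
use obnam::genlist::GenerationList;

/// Decide which old generation, if any, the new backup should be based on.
fn base_generation(full: bool, genlist: &GenerationList) -> Option<GenId> {
    if full {
        // A forced full backup ignores any previous generation.
        None
    } else {
        // A fresh repository has no "latest" generation yet.
        genlist.resolve("latest").ok()
    }
}
~~~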
diff --git a/src/cmd/chunk.rs b/src/cmd/chunk.rs
new file mode 100644
index 0000000..293de20
--- /dev/null
+++ b/src/cmd/chunk.rs
@@ -0,0 +1,70 @@
+//! The `encrypt-chunk` and `decrypt-chunk` subcommands.
+
+use crate::chunk::DataChunk;
+use crate::chunkmeta::ChunkMeta;
+use crate::cipher::CipherEngine;
+use crate::config::ClientConfig;
+use crate::error::ObnamError;
+use clap::Parser;
+use std::path::PathBuf;
+
+/// Encrypt a chunk.
+#[derive(Debug, Parser)]
+pub struct EncryptChunk {
+ /// The name of the file containing the cleartext chunk.
+ filename: PathBuf,
+
+ /// Name of file where to write the encrypted chunk.
+ output: PathBuf,
+
+ /// Chunk metadata as JSON.
+ json: String,
+}
+
+impl EncryptChunk {
+ /// Run the command.
+ pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let pass = config.passwords()?;
+ let cipher = CipherEngine::new(&pass);
+
+ let meta = ChunkMeta::from_json(&self.json)?;
+
+ let cleartext = std::fs::read(&self.filename)?;
+ let chunk = DataChunk::new(cleartext, meta);
+ let encrypted = cipher.encrypt_chunk(&chunk)?;
+
+ std::fs::write(&self.output, encrypted.ciphertext())?;
+
+ Ok(())
+ }
+}
+
+/// Decrypt a chunk.
+#[derive(Debug, Parser)]
+pub struct DecryptChunk {
+ /// Name of file containing encrypted chunk.
+ filename: PathBuf,
+
+ /// Name of file where to write the cleartext chunk.
+ output: PathBuf,
+
+ /// Chunk metadata as JSON.
+ json: String,
+}
+
+impl DecryptChunk {
+ /// Run the command.
+ pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let pass = config.passwords()?;
+ let cipher = CipherEngine::new(&pass);
+
+ let meta = ChunkMeta::from_json(&self.json)?;
+
+ let encrypted = std::fs::read(&self.filename)?;
+ let chunk = cipher.decrypt_chunk(&encrypted, &meta.to_json_vec())?;
+
+ std::fs::write(&self.output, chunk.data())?;
+
+ Ok(())
+ }
+}
diff --git a/src/cmd/chunkify.rs b/src/cmd/chunkify.rs
new file mode 100644
index 0000000..91cb0be
--- /dev/null
+++ b/src/cmd/chunkify.rs
@@ -0,0 +1,110 @@
+//! The `chunkify` subcommand.
+
+use crate::config::ClientConfig;
+use crate::engine::Engine;
+use crate::error::ObnamError;
+use crate::workqueue::WorkQueue;
+use clap::Parser;
+use serde::Serialize;
+use sha2::{Digest, Sha256};
+use std::path::PathBuf;
+use tokio::fs::File;
+use tokio::io::{AsyncReadExt, BufReader};
+use tokio::runtime::Runtime;
+use tokio::sync::mpsc;
+
+// Size of queue with unprocessed chunks, and also queue of computed
+// checksums.
+const Q: usize = 8;
+
+/// Split files into chunks and show their metadata.
+#[derive(Debug, Parser)]
+pub struct Chunkify {
+ /// Names of files to split into chunks.
+ filenames: Vec<PathBuf>,
+}
+
+impl Chunkify {
+ /// Run the command.
+ pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let rt = Runtime::new()?;
+ rt.block_on(self.run_async(config))
+ }
+
+ async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let mut q = WorkQueue::new(Q);
+ for filename in self.filenames.iter() {
+ tokio::spawn(split_file(
+ filename.to_path_buf(),
+ config.chunk_size,
+ q.push(),
+ ));
+ }
+ q.close();
+
+ let mut summer = Engine::new(q, just_hash);
+
+ let mut checksums = vec![];
+ while let Some(sum) = summer.next().await {
+ checksums.push(sum);
+ }
+
+ println!("{}", serde_json::to_string_pretty(&checksums)?);
+
+ Ok(())
+ }
+}
+
+#[derive(Debug, Clone)]
+struct Chunk {
+ filename: PathBuf,
+ offset: u64,
+ data: Vec<u8>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+struct Checksum {
+ filename: PathBuf,
+ offset: u64,
+ pub len: u64,
+ checksum: String,
+}
+
+async fn split_file(filename: PathBuf, chunk_size: usize, tx: mpsc::Sender<Chunk>) {
+ // println!("split_file {}", filename.display());
+ let mut file = BufReader::new(File::open(&*filename).await.unwrap());
+
+ let mut offset = 0;
+ loop {
+ let mut data = vec![0; chunk_size];
+ let n = file.read(&mut data).await.unwrap();
+ if n == 0 {
+ break;
+ }
+ let data: Vec<u8> = data[..n].to_vec();
+
+ let chunk = Chunk {
+ filename: filename.clone(),
+ offset,
+ data,
+ };
+ tx.send(chunk).await.unwrap();
+ // println!("split_file sent chunk at offset {}", offset);
+
+ offset += n as u64;
+ }
+ // println!("split_file EOF at {}", offset);
+}
+
+fn just_hash(chunk: Chunk) -> Checksum {
+ let mut hasher = Sha256::new();
+ hasher.update(&chunk.data);
+ let hash = hasher.finalize();
+ let hash = format!("{:x}", hash);
+ Checksum {
+ filename: chunk.filename,
+ offset: chunk.offset,
+ len: chunk.data.len() as u64,
+ checksum: hash,
+ }
+}
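The async pipeline above splits each file into fixed-size chunks and computes a SHA-256 checksum per chunk. The same computation on an in-memory buffer, as a synchronous sketch that is not part of the patch (it uses the same `sha2` crate and assumes `chunk_size` is non-zero):

~~~rust
use sha2::{Digest, Sha256};

/// Compute (offset, length, hex SHA-256) for each fixed-size chunk of `data`.
fn checksums(data: &[u8], chunk_size: usize) -> Vec<(u64, u64, String)> {
    data.chunks(chunk_size)
        .enumerate()
        .map(|(i, chunk)| {
            let digest = Sha256::digest(chunk);
            (
                (i * chunk_size) as u64,
                chunk.len() as u64,
                format!("{:x}", digest),
            )
        })
        .collect()
}
~~~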
diff --git a/src/cmd/gen_info.rs b/src/cmd/gen_info.rs
new file mode 100644
index 0000000..901a0ae
--- /dev/null
+++ b/src/cmd/gen_info.rs
@@ -0,0 +1,47 @@
+//! The `gen-info` subcommand.
+
+use crate::chunk::ClientTrust;
+use crate::client::BackupClient;
+use crate::config::ClientConfig;
+use crate::error::ObnamError;
+use clap::Parser;
+use log::info;
+use tempfile::NamedTempFile;
+use tokio::runtime::Runtime;
+
+/// Show metadata for a generation.
+#[derive(Debug, Parser)]
+pub struct GenInfo {
+ /// Reference of the generation.
+ gen_ref: String,
+}
+
+impl GenInfo {
+ /// Run the command.
+ pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let rt = Runtime::new()?;
+ rt.block_on(self.run_async(config))
+ }
+
+ async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let temp = NamedTempFile::new()?;
+
+ let client = BackupClient::new(config)?;
+
+ let trust = client
+ .get_client_trust()
+ .await?
+ .or_else(|| Some(ClientTrust::new("FIXME", None, "".to_string(), vec![])))
+ .unwrap();
+
+ let genlist = client.list_generations(&trust);
+ let gen_id = genlist.resolve(&self.gen_ref)?;
+ info!("generation id is {}", gen_id.as_chunk_id());
+
+ let gen = client.fetch_generation(&gen_id, temp.path()).await?;
+ let meta = gen.meta()?;
+ println!("{}", serde_json::to_string_pretty(&meta)?);
+
+ Ok(())
+ }
+}
diff --git a/src/cmd/get_chunk.rs b/src/cmd/get_chunk.rs
index bf653ff..1561492 100644
--- a/src/cmd/get_chunk.rs
+++ b/src/cmd/get_chunk.rs
@@ -1,15 +1,34 @@
+//! The `get-chunk` subcommand.
+
use crate::chunkid::ChunkId;
use crate::client::BackupClient;
-use crate::client::ClientConfig;
+use crate::config::ClientConfig;
+use crate::error::ObnamError;
+use clap::Parser;
use std::io::{stdout, Write};
+use tokio::runtime::Runtime;
+
+/// Fetch a chunk from the server.
+#[derive(Debug, Parser)]
+pub struct GetChunk {
+ /// Identifier of chunk to fetch.
+ chunk_id: String,
+}
-pub fn get_chunk(config: &ClientConfig, chunk_id: &str) -> anyhow::Result<()> {
- let client = BackupClient::new(&config.server_url)?;
- let chunk_id: ChunkId = chunk_id.parse().unwrap();
- let chunk = client.fetch_chunk(&chunk_id)?;
+impl GetChunk {
+ /// Run the command.
+ pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let rt = Runtime::new()?;
+ rt.block_on(self.run_async(config))
+ }
- let stdout = stdout();
- let mut handle = stdout.lock();
- handle.write_all(chunk.data())?;
- Ok(())
+ async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let client = BackupClient::new(config)?;
+ let chunk_id: ChunkId = self.chunk_id.parse().unwrap();
+ let chunk = client.fetch_chunk(&chunk_id).await?;
+ let stdout = stdout();
+ let mut handle = stdout.lock();
+ handle.write_all(chunk.data())?;
+ Ok(())
+ }
}
diff --git a/src/cmd/init.rs b/src/cmd/init.rs
new file mode 100644
index 0000000..5950fbb
--- /dev/null
+++ b/src/cmd/init.rs
@@ -0,0 +1,33 @@
+//! The `init` subcommand.
+
+use crate::config::ClientConfig;
+use crate::error::ObnamError;
+use crate::passwords::{passwords_filename, Passwords};
+use clap::Parser;
+
+const PROMPT: &str = "Obnam passphrase: ";
+
+/// Initialize client by setting passwords.
+#[derive(Debug, Parser)]
+pub struct Init {
+ /// Only for testing.
+ #[clap(long)]
+ insecure_passphrase: Option<String>,
+}
+
+impl Init {
+ /// Run the command.
+ pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let passphrase = match &self.insecure_passphrase {
+ Some(x) => x.to_string(),
+ None => rpassword::prompt_password(PROMPT).unwrap(),
+ };
+
+ let passwords = Passwords::new(&passphrase);
+ let filename = passwords_filename(&config.filename);
+ passwords
+ .save(&filename)
+ .map_err(|err| ObnamError::PasswordSave(filename, err))?;
+ Ok(())
+ }
+}
diff --git a/src/cmd/inspect.rs b/src/cmd/inspect.rs
new file mode 100644
index 0000000..3b41075
--- /dev/null
+++ b/src/cmd/inspect.rs
@@ -0,0 +1,46 @@
+//! The `inspect` subcommand.
+
+use crate::backup_run::current_timestamp;
+use crate::chunk::ClientTrust;
+use crate::client::BackupClient;
+use crate::config::ClientConfig;
+use crate::error::ObnamError;
+
+use clap::Parser;
+use log::info;
+use tempfile::NamedTempFile;
+use tokio::runtime::Runtime;
+
+/// Inspect a backup generation.
+#[derive(Debug, Parser)]
+pub struct Inspect {
+ /// Reference to generation to inspect.
+ gen_id: String,
+}
+
+impl Inspect {
+ /// Run the command.
+ pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let rt = Runtime::new()?;
+ rt.block_on(self.run_async(config))
+ }
+
+ async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let temp = NamedTempFile::new()?;
+ let client = BackupClient::new(config)?;
+ let trust = client
+ .get_client_trust()
+ .await?
+ .or_else(|| Some(ClientTrust::new("FIXME", None, current_timestamp(), vec![])))
+ .unwrap();
+ let genlist = client.list_generations(&trust);
+ let gen_id = genlist.resolve(&self.gen_id)?;
+ info!("generation id is {}", gen_id.as_chunk_id());
+
+ let gen = client.fetch_generation(&gen_id, temp.path()).await?;
+ let meta = gen.meta()?;
+ println!("schema_version: {}", meta.schema_version());
+
+ Ok(())
+ }
+}
diff --git a/src/cmd/list.rs b/src/cmd/list.rs
index 8766e34..8bc6978 100644
--- a/src/cmd/list.rs
+++ b/src/cmd/list.rs
@@ -1,12 +1,36 @@
-use crate::client::{BackupClient, ClientConfig};
+//! The `list` subcommand.
-pub fn list(config: &ClientConfig) -> anyhow::Result<()> {
- let client = BackupClient::new(&config.server_url)?;
+use crate::chunk::ClientTrust;
+use crate::client::BackupClient;
+use crate::config::ClientConfig;
+use crate::error::ObnamError;
+use clap::Parser;
+use tokio::runtime::Runtime;
- let generations = client.list_generations()?;
- for finished in generations.iter() {
- println!("{} {}", finished.id(), finished.ended());
+/// List generations on the server.
+#[derive(Debug, Parser)]
+pub struct List {}
+
+impl List {
+ /// Run the command.
+ pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let rt = Runtime::new()?;
+ rt.block_on(self.run_async(config))
}
- Ok(())
+ async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let client = BackupClient::new(config)?;
+ let trust = client
+ .get_client_trust()
+ .await?
+ .or_else(|| Some(ClientTrust::new("FIXME", None, "".to_string(), vec![])))
+ .unwrap();
+
+ let generations = client.list_generations(&trust);
+ for finished in generations.iter() {
+ println!("{} {}", finished.id(), finished.ended());
+ }
+
+ Ok(())
+ }
}
diff --git a/src/cmd/list_backup_versions.rs b/src/cmd/list_backup_versions.rs
new file mode 100644
index 0000000..c78ccfc
--- /dev/null
+++ b/src/cmd/list_backup_versions.rs
@@ -0,0 +1,31 @@
+//! The `list-backup-versions` subcommand.
+
+use crate::config::ClientConfig;
+use crate::dbgen::{schema_version, DEFAULT_SCHEMA_MAJOR, SCHEMA_MAJORS};
+use crate::error::ObnamError;
+
+use clap::Parser;
+
+/// List supported backup schema versions.
+#[derive(Debug, Parser)]
+pub struct ListSchemaVersions {
+ /// List only the default version.
+ #[clap(long)]
+ default_only: bool,
+}
+
+impl ListSchemaVersions {
+ /// Run the command.
+ pub fn run(&self, _config: &ClientConfig) -> Result<(), ObnamError> {
+ if self.default_only {
+ let schema = schema_version(DEFAULT_SCHEMA_MAJOR)?;
+ println!("{}", schema);
+ } else {
+ for major in SCHEMA_MAJORS {
+ let schema = schema_version(*major)?;
+ println!("{}", schema);
+ }
+ }
+ Ok(())
+ }
+}
diff --git a/src/cmd/list_files.rs b/src/cmd/list_files.rs
index a69c3df..e8276cd 100644
--- a/src/cmd/list_files.rs
+++ b/src/cmd/list_files.rs
@@ -1,36 +1,51 @@
+//! The `list-files` subcommand.
+
use crate::backup_reason::Reason;
+use crate::chunk::ClientTrust;
use crate::client::BackupClient;
-use crate::client::ClientConfig;
+use crate::config::ClientConfig;
use crate::error::ObnamError;
use crate::fsentry::{FilesystemEntry, FilesystemKind};
+use clap::Parser;
use tempfile::NamedTempFile;
+use tokio::runtime::Runtime;
-pub fn list_files(config: &ClientConfig, gen_ref: &str) -> anyhow::Result<()> {
- // Create a named temporary file. We don't meed the open file
- // handle, so we discard that.
- let dbname = {
- let temp = NamedTempFile::new()?;
- let (_, dbname) = temp.keep()?;
- dbname
- };
+/// List files in a backup.
+#[derive(Debug, Parser)]
+pub struct ListFiles {
+ /// Reference to backup to list files in.
+ #[clap(default_value = "latest")]
+ gen_id: String,
+}
+
+impl ListFiles {
+ /// Run the command.
+ pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let rt = Runtime::new()?;
+ rt.block_on(self.run_async(config))
+ }
- let client = BackupClient::new(&config.server_url)?;
+ async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let temp = NamedTempFile::new()?;
- let genlist = client.list_generations()?;
- let gen_id: String = match genlist.resolve(gen_ref) {
- None => return Err(ObnamError::UnknownGeneration(gen_ref.to_string()).into()),
- Some(id) => id,
- };
+ let client = BackupClient::new(config)?;
+ let trust = client
+ .get_client_trust()
+ .await?
+ .or_else(|| Some(ClientTrust::new("FIXME", None, "".to_string(), vec![])))
+ .unwrap();
- let gen = client.fetch_generation(&gen_id, &dbname)?;
- for file in gen.files()? {
- println!("{}", format_entry(&file.entry(), file.reason()));
- }
+ let genlist = client.list_generations(&trust);
+ let gen_id = genlist.resolve(&self.gen_id)?;
- // Delete the temporary file.
- std::fs::remove_file(&dbname)?;
+ let gen = client.fetch_generation(&gen_id, temp.path()).await?;
+ for file in gen.files()?.iter()? {
+ let (_, entry, reason, _) = file?;
+ println!("{}", format_entry(&entry, reason));
+ }
- Ok(())
+ Ok(())
+ }
}
fn format_entry(e: &FilesystemEntry, reason: Reason) -> String {
@@ -38,6 +53,8 @@ fn format_entry(e: &FilesystemEntry, reason: Reason) -> String {
FilesystemKind::Regular => "-",
FilesystemKind::Directory => "d",
FilesystemKind::Symlink => "l",
+ FilesystemKind::Socket => "s",
+ FilesystemKind::Fifo => "p",
};
format!("{} {} ({})", kind, e.pathbuf().display(), reason)
}
diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs
index 8f08668..af7457b 100644
--- a/src/cmd/mod.rs
+++ b/src/cmd/mod.rs
@@ -1,17 +1,16 @@
-mod backup;
-pub use backup::backup;
-
-mod list;
-pub use list::list;
-
-mod list_files;
-pub use list_files::list_files;
-
-pub mod restore;
-pub use restore::restore;
+//! Subcommand implementations.
+pub mod backup;
+pub mod chunk;
+pub mod chunkify;
+pub mod gen_info;
pub mod get_chunk;
-pub use get_chunk::get_chunk;
-
+pub mod init;
+pub mod inspect;
+pub mod list;
+pub mod list_backup_versions;
+pub mod list_files;
+pub mod resolve;
+pub mod restore;
+pub mod show_config;
pub mod show_gen;
-pub use show_gen::show_generation;
diff --git a/src/cmd/resolve.rs b/src/cmd/resolve.rs
new file mode 100644
index 0000000..a7774d7
--- /dev/null
+++ b/src/cmd/resolve.rs
@@ -0,0 +1,44 @@
+//! The `resolve` subcommand.
+
+use crate::chunk::ClientTrust;
+use crate::client::BackupClient;
+use crate::config::ClientConfig;
+use crate::error::ObnamError;
+use clap::Parser;
+use tokio::runtime::Runtime;
+
+/// Resolve a generation reference into a generation id.
+#[derive(Debug, Parser)]
+pub struct Resolve {
+ /// The generation reference.
+ generation: String,
+}
+
+impl Resolve {
+ /// Run the command.
+ pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let rt = Runtime::new()?;
+ rt.block_on(self.run_async(config))
+ }
+
+ async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let client = BackupClient::new(config)?;
+ let trust = client
+ .get_client_trust()
+ .await?
+ .or_else(|| Some(ClientTrust::new("FIXME", None, "".to_string(), vec![])))
+ .unwrap();
+ let generations = client.list_generations(&trust);
+
+ match generations.resolve(&self.generation) {
+ Err(err) => {
+ return Err(err.into());
+ }
+ Ok(gen_id) => {
+ println!("{}", gen_id.as_chunk_id());
+ }
+ };
+
+ Ok(())
+ }
+}
diff --git a/src/cmd/restore.rs b/src/cmd/restore.rs
index d783a70..58caf61 100644
--- a/src/cmd/restore.rs
+++ b/src/cmd/restore.rs
@@ -1,101 +1,165 @@
-use crate::client::BackupClient;
-use crate::client::ClientConfig;
+//! The `restore` subcommand.
+
+use crate::backup_reason::Reason;
+use crate::chunk::ClientTrust;
+use crate::client::{BackupClient, ClientError};
+use crate::config::ClientConfig;
+use crate::db::DatabaseError;
+use crate::dbgen::FileId;
use crate::error::ObnamError;
use crate::fsentry::{FilesystemEntry, FilesystemKind};
-use crate::generation::LocalGeneration;
+use crate::generation::{LocalGeneration, LocalGenerationError};
+use clap::Parser;
use indicatif::{ProgressBar, ProgressStyle};
-use libc::{fchmod, futimens, timespec};
+use libc::{chmod, mkfifo, timespec, utimensat, AT_FDCWD, AT_SYMLINK_NOFOLLOW};
use log::{debug, error, info};
-use std::fs::File;
+use std::ffi::CString;
use std::io::prelude::*;
use std::io::Error;
+use std::os::unix::ffi::OsStrExt;
use std::os::unix::fs::symlink;
-use std::os::unix::io::AsRawFd;
+use std::os::unix::net::UnixListener;
+use std::path::StripPrefixError;
use std::path::{Path, PathBuf};
-use structopt::StructOpt;
use tempfile::NamedTempFile;
+use tokio::runtime::Runtime;
+
+/// Restore a backup.
+#[derive(Debug, Parser)]
+pub struct Restore {
+ /// Reference to generation to restore.
+ gen_id: String,
+
+ /// Path to directory where restored files are written.
+ to: PathBuf,
+}
+
+impl Restore {
+ /// Run the command.
+ pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let rt = Runtime::new()?;
+ rt.block_on(self.run_async(config))
+ }
-pub fn restore(config: &ClientConfig, gen_ref: &str, to: &Path) -> anyhow::Result<()> {
- // Create a named temporary file. We don't meed the open file
- // handle, so we discard that.
- let dbname = {
+ async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
let temp = NamedTempFile::new()?;
- let (_, dbname) = temp.keep()?;
- dbname
- };
- let client = BackupClient::new(&config.server_url)?;
+ let client = BackupClient::new(config)?;
+ let trust = client
+ .get_client_trust()
+ .await?
+ .or_else(|| Some(ClientTrust::new("FIXME", None, "".to_string(), vec![])))
+ .unwrap();
- let genlist = client.list_generations()?;
- let gen_id: String = match genlist.resolve(gen_ref) {
- None => return Err(ObnamError::UnknownGeneration(gen_ref.to_string()).into()),
- Some(id) => id,
- };
- info!("generation id is {}", gen_id);
+ let genlist = client.list_generations(&trust);
+ let gen_id = genlist.resolve(&self.gen_id)?;
+ info!("generation id is {}", gen_id.as_chunk_id());
- let gen = client.fetch_generation(&gen_id, &dbname)?;
- info!("restoring {} files", gen.file_count()?);
- let progress = create_progress_bar(gen.file_count()?, true);
- for file in gen.files()? {
- restore_generation(&client, &gen, file.fileno(), file.entry(), &to, &progress)?;
- }
- for file in gen.files()? {
- if file.entry().is_dir() {
- restore_directory_metadata(file.entry(), &to)?;
+ let gen = client.fetch_generation(&gen_id, temp.path()).await?;
+ info!("restoring {} files", gen.file_count()?);
+ let progress = create_progress_bar(gen.file_count()?, true);
+ for file in gen.files()?.iter()? {
+ let (fileno, entry, reason, _) = file?;
+ match reason {
+ Reason::FileError => (),
+ _ => restore_generation(&client, &gen, fileno, &entry, &self.to, &progress).await?,
+ }
+ }
+ for file in gen.files()?.iter()? {
+ let (_, entry, _, _) = file?;
+ if entry.is_dir() {
+ restore_directory_metadata(&entry, &self.to)?;
+ }
}
+ progress.finish();
+
+ Ok(())
}
- progress.finish();
+}
- // Delete the temporary file.
- std::fs::remove_file(&dbname)?;
+/// Possible errors from restoring.
+#[derive(Debug, thiserror::Error)]
+pub enum RestoreError {
+ /// An error using a Database.
+ #[error(transparent)]
+ Database(#[from] DatabaseError),
- Ok(())
-}
+    /// Failed to create a named pipe.
+ #[error("Could not create named pipe (FIFO) {0}")]
+ NamedPipeCreationError(PathBuf),
-#[derive(Debug, StructOpt)]
-#[structopt(name = "obnam-backup", about = "Simplistic backup client")]
-struct Opt {
- #[structopt(parse(from_os_str))]
- config: PathBuf,
+ /// Error from HTTP client.
+ #[error(transparent)]
+ ClientError(#[from] ClientError),
- #[structopt()]
- gen_id: String,
+ /// Error from local generation.
+ #[error(transparent)]
+ LocalGenerationError(#[from] LocalGenerationError),
- #[structopt(parse(from_os_str))]
- dbname: PathBuf,
+ /// Error removing a prefix.
+ #[error(transparent)]
+ StripPrefixError(#[from] StripPrefixError),
- #[structopt(parse(from_os_str))]
- to: PathBuf,
+ /// Error creating a directory.
+ #[error("failed to create directory {0}: {1}")]
+ CreateDirs(PathBuf, std::io::Error),
+
+ /// Error creating a file.
+ #[error("failed to create file {0}: {1}")]
+ CreateFile(PathBuf, std::io::Error),
+
+ /// Error writing a file.
+ #[error("failed to write file {0}: {1}")]
+ WriteFile(PathBuf, std::io::Error),
+
+ /// Error creating a symbolic link.
+ #[error("failed to create symbolic link {0}: {1}")]
+ Symlink(PathBuf, std::io::Error),
+
+ /// Error creating a UNIX domain socket.
+ #[error("failed to create UNIX domain socket {0}: {1}")]
+ UnixBind(PathBuf, std::io::Error),
+
+ /// Error setting permissions.
+ #[error("failed to set permissions for {0}: {1}")]
+ Chmod(PathBuf, std::io::Error),
+
+    /// Error setting timestamp.
+ #[error("failed to set timestamp for {0}: {1}")]
+ SetTimestamp(PathBuf, std::io::Error),
}
-fn restore_generation(
+async fn restore_generation(
client: &BackupClient,
gen: &LocalGeneration,
- fileid: i64,
+ fileid: FileId,
entry: &FilesystemEntry,
to: &Path,
progress: &ProgressBar,
-) -> anyhow::Result<()> {
+) -> Result<(), RestoreError> {
info!("restoring {:?}", entry);
- progress.set_message(&format!("{}", entry.pathbuf().display()));
+ progress.set_message(format!("{}", entry.pathbuf().display()));
progress.inc(1);
let to = restored_path(entry, to)?;
match entry.kind() {
- FilesystemKind::Regular => restore_regular(client, &gen, &to, fileid, &entry)?,
+ FilesystemKind::Regular => restore_regular(client, gen, &to, fileid, entry).await?,
FilesystemKind::Directory => restore_directory(&to)?,
- FilesystemKind::Symlink => restore_symlink(&to, &entry)?,
+ FilesystemKind::Symlink => restore_symlink(&to, entry)?,
+ FilesystemKind::Socket => restore_socket(&to, entry)?,
+ FilesystemKind::Fifo => restore_fifo(&to, entry)?,
}
Ok(())
}
-fn restore_directory(path: &Path) -> anyhow::Result<()> {
+fn restore_directory(path: &Path) -> Result<(), RestoreError> {
debug!("restoring directory {}", path.display());
- std::fs::create_dir_all(path)?;
+ std::fs::create_dir_all(path)
+ .map_err(|err| RestoreError::CreateDirs(path.to_path_buf(), err))?;
Ok(())
}
-fn restore_directory_metadata(entry: &FilesystemEntry, to: &Path) -> anyhow::Result<()> {
+fn restore_directory_metadata(entry: &FilesystemEntry, to: &Path) -> Result<(), RestoreError> {
let to = restored_path(entry, to)?;
match entry.kind() {
FilesystemKind::Directory => restore_metadata(&to, entry)?,
@@ -107,7 +171,7 @@ fn restore_directory_metadata(entry: &FilesystemEntry, to: &Path) -> anyhow::Res
Ok(())
}
-fn restored_path(entry: &FilesystemEntry, to: &Path) -> anyhow::Result<PathBuf> {
+fn restored_path(entry: &FilesystemEntry, to: &Path) -> Result<PathBuf, RestoreError> {
let path = &entry.pathbuf();
let path = if path.is_absolute() {
path.strip_prefix("/")?
@@ -117,22 +181,26 @@ fn restored_path(entry: &FilesystemEntry, to: &Path) -> anyhow::Result<PathBuf>
Ok(to.join(path))
}
-fn restore_regular(
+async fn restore_regular(
client: &BackupClient,
gen: &LocalGeneration,
path: &Path,
- fileid: i64,
+ fileid: FileId,
entry: &FilesystemEntry,
-) -> anyhow::Result<()> {
+) -> Result<(), RestoreError> {
debug!("restoring regular {}", path.display());
let parent = path.parent().unwrap();
debug!(" mkdir {}", parent.display());
- std::fs::create_dir_all(parent)?;
+ std::fs::create_dir_all(parent)
+ .map_err(|err| RestoreError::CreateDirs(parent.to_path_buf(), err))?;
{
- let mut file = std::fs::File::create(path)?;
- for chunkid in gen.chunkids(fileid)? {
- let chunk = client.fetch_chunk(&chunkid)?;
- file.write_all(chunk.data())?;
+ let mut file = std::fs::File::create(path)
+ .map_err(|err| RestoreError::CreateFile(path.to_path_buf(), err))?;
+ for chunkid in gen.chunkids(fileid)?.iter()? {
+ let chunkid = chunkid?;
+ let chunk = client.fetch_chunk(&chunkid).await?;
+ file.write_all(chunk.data())
+ .map_err(|err| RestoreError::WriteFile(path.to_path_buf(), err))?;
}
restore_metadata(path, entry)?;
}
@@ -140,24 +208,44 @@ fn restore_regular(
Ok(())
}
-fn restore_symlink(path: &Path, entry: &FilesystemEntry) -> anyhow::Result<()> {
+fn restore_symlink(path: &Path, entry: &FilesystemEntry) -> Result<(), RestoreError> {
debug!("restoring symlink {}", path.display());
let parent = path.parent().unwrap();
debug!(" mkdir {}", parent.display());
if !parent.exists() {
- std::fs::create_dir_all(parent)?;
- {
- symlink(path, entry.symlink_target().unwrap())?;
+ std::fs::create_dir_all(parent)
+ .map_err(|err| RestoreError::CreateDirs(parent.to_path_buf(), err))?;
+ }
+ symlink(entry.symlink_target().unwrap(), path)
+ .map_err(|err| RestoreError::Symlink(path.to_path_buf(), err))?;
+ restore_metadata(path, entry)?;
+ debug!("restored symlink {}", path.display());
+ Ok(())
+}
+
+fn restore_socket(path: &Path, entry: &FilesystemEntry) -> Result<(), RestoreError> {
+ debug!("creating Unix domain socket {:?}", path);
+ UnixListener::bind(path).map_err(|err| RestoreError::UnixBind(path.to_path_buf(), err))?;
+ restore_metadata(path, entry)?;
+ Ok(())
+}
+
+fn restore_fifo(path: &Path, entry: &FilesystemEntry) -> Result<(), RestoreError> {
+ debug!("creating fifo {:?}", path);
+ let filename = path_to_cstring(path);
+ match unsafe { mkfifo(filename.as_ptr(), 0) } {
+ -1 => {
+ return Err(RestoreError::NamedPipeCreationError(path.to_path_buf()));
}
+ _ => restore_metadata(path, entry)?,
}
- debug!("restored regular {}", path.display());
Ok(())
}
-fn restore_metadata(path: &Path, entry: &FilesystemEntry) -> anyhow::Result<()> {
+fn restore_metadata(path: &Path, entry: &FilesystemEntry) -> Result<(), RestoreError> {
debug!("restoring metadata for {}", entry.pathbuf().display());
- let handle = File::open(path)?;
+ debug!("restoring metadata for {:?}", path);
let atime = timespec {
tv_sec: entry.atime(),
@@ -170,41 +258,58 @@ fn restore_metadata(path: &Path, entry: &FilesystemEntry) -> anyhow::Result<()>
let times = [atime, mtime];
let times: *const timespec = &times[0];
+ let pathbuf = path.to_path_buf();
+ let path = path_to_cstring(path);
+
// We have to use unsafe here to be able to call the libc functions
// below.
unsafe {
- let fd = handle.as_raw_fd(); // FIXME: needs to NOT follow symlinks
-
- debug!("fchmod");
- if fchmod(fd, entry.mode()) == -1 {
- let error = Error::last_os_error();
- error!("fchmod failed on {}", path.display());
- return Err(error.into());
+ if entry.kind() != FilesystemKind::Symlink {
+ debug!("chmod {:?}", path);
+ if chmod(path.as_ptr(), entry.mode() as libc::mode_t) == -1 {
+ let error = Error::last_os_error();
+ error!("chmod failed on {:?}", path);
+ return Err(RestoreError::Chmod(pathbuf, error));
+ }
+ } else {
+ debug!(
+ "skipping chmod of a symlink because it'll attempt to change the pointed-at file"
+ );
}
- debug!("futimens");
- if futimens(fd, times) == -1 {
+ debug!("utimens {:?}", path);
+ if utimensat(AT_FDCWD, path.as_ptr(), times, AT_SYMLINK_NOFOLLOW) == -1 {
let error = Error::last_os_error();
- error!("futimens failed on {}", path.display());
- return Err(error.into());
+ error!("utimensat failed on {:?}", path);
+ return Err(RestoreError::SetTimestamp(pathbuf, error));
}
}
Ok(())
}
-fn create_progress_bar(file_count: i64, verbose: bool) -> ProgressBar {
+fn path_to_cstring(path: &Path) -> CString {
+ let path = path.as_os_str();
+ let path = path.as_bytes();
+ CString::new(path).unwrap()
+}
+
+fn create_progress_bar(file_count: FileId, verbose: bool) -> ProgressBar {
let progress = if verbose {
ProgressBar::new(file_count as u64)
} else {
ProgressBar::hidden()
};
- let parts = vec![
+ let parts = [
"{wide_bar}",
"elapsed: {elapsed}",
"files: {pos}/{len}",
"current: {wide_msg}",
"{spinner}",
];
- progress.set_style(ProgressStyle::default_bar().template(&parts.join("\n")));
+ progress.set_style(
+ ProgressStyle::default_bar()
+ .template(&parts.join("\n"))
+ .expect("create indicatif ProgressStyle value"),
+ );
progress
}
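
The metadata code above switches from the descriptor-based fchmod/futimens calls
to the path-based chmod/utimensat, so that a symlink's own timestamps can be set
with AT_SYMLINK_NOFOLLOW instead of following the link. A minimal standalone
sketch of the same libc pattern, assuming the libc crate on 64-bit Linux (the
helper name set_times is illustrative and not part of this commit):

use libc::{timespec, utimensat, AT_FDCWD, AT_SYMLINK_NOFOLLOW};
use std::ffi::CString;
use std::os::unix::ffi::OsStrExt;
use std::path::Path;

// Set atime and mtime on `path` without following a symlink.
fn set_times(path: &Path, atime_sec: i64, mtime_sec: i64) -> std::io::Result<()> {
    let c_path = CString::new(path.as_os_str().as_bytes())?;
    let times = [
        timespec { tv_sec: atime_sec, tv_nsec: 0 },
        timespec { tv_sec: mtime_sec, tv_nsec: 0 },
    ];
    let ret = unsafe { utimensat(AT_FDCWD, c_path.as_ptr(), times.as_ptr(), AT_SYMLINK_NOFOLLOW) };
    if ret == -1 {
        return Err(std::io::Error::last_os_error());
    }
    Ok(())
}
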
diff --git a/src/cmd/show_config.rs b/src/cmd/show_config.rs
new file mode 100644
index 0000000..8e0ce30
--- /dev/null
+++ b/src/cmd/show_config.rs
@@ -0,0 +1,17 @@
+//! The `show-config` subcommand.
+
+use crate::config::ClientConfig;
+use crate::error::ObnamError;
+use clap::Parser;
+
+/// Show actual client configuration.
+#[derive(Debug, Parser)]
+pub struct ShowConfig {}
+
+impl ShowConfig {
+ /// Run the command.
+ pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ println!("{}", serde_json::to_string_pretty(config)?);
+ Ok(())
+ }
+}
diff --git a/src/cmd/show_gen.rs b/src/cmd/show_gen.rs
index d355389..95d3fd3 100644
--- a/src/cmd/show_gen.rs
+++ b/src/cmd/show_gen.rs
@@ -1,46 +1,101 @@
+//! The `show-generation` subcommand.
+
+use crate::chunk::ClientTrust;
use crate::client::BackupClient;
-use crate::client::ClientConfig;
+use crate::config::ClientConfig;
+use crate::db::DbInt;
use crate::error::ObnamError;
use crate::fsentry::FilesystemKind;
+use crate::generation::GenId;
+use clap::Parser;
use indicatif::HumanBytes;
+use serde::Serialize;
use tempfile::NamedTempFile;
+use tokio::runtime::Runtime;
+
+/// Show information about a generation.
+#[derive(Debug, Parser)]
+pub struct ShowGeneration {
+ /// Reference to the generation. Defaults to latest.
+ #[clap(default_value = "latest")]
+ gen_id: String,
+}
-pub fn show_generation(config: &ClientConfig, gen_ref: &str) -> anyhow::Result<()> {
- // Create a named temporary file. We don't meed the open file
- // handle, so we discard that.
- let dbname = {
+impl ShowGeneration {
+ /// Run the command.
+ pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
+ let rt = Runtime::new()?;
+ rt.block_on(self.run_async(config))
+ }
+
+ async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
let temp = NamedTempFile::new()?;
- let (_, dbname) = temp.keep()?;
- dbname
- };
-
- let client = BackupClient::new(&config.server_url)?;
-
- let genlist = client.list_generations()?;
- let gen_id: String = match genlist.resolve(gen_ref) {
- None => return Err(ObnamError::UnknownGeneration(gen_ref.to_string()).into()),
- Some(id) => id,
- };
-
- let gen = client.fetch_generation(&gen_id, &dbname)?;
- let files = gen.files()?;
-
- let total_bytes = files.iter().fold(0, |acc, file| {
- let e = file.entry();
- if e.kind() == FilesystemKind::Regular {
- acc + file.entry().len()
- } else {
- acc
+ let client = BackupClient::new(config)?;
+ let trust = client
+ .get_client_trust()
+ .await?
+ .or_else(|| Some(ClientTrust::new("FIXME", None, "".to_string(), vec![])))
+ .unwrap();
+
+ let genlist = client.list_generations(&trust);
+ let gen_id = genlist.resolve(&self.gen_id)?;
+ let gen = client.fetch_generation(&gen_id, temp.path()).await?;
+ let mut files = gen.files()?;
+ let mut files = files.iter()?;
+
+ let total_bytes = files.try_fold(0, |acc, file| {
+ file.map(|(_, e, _, _)| {
+ if e.kind() == FilesystemKind::Regular {
+ acc + e.len()
+ } else {
+ acc
+ }
+ })
+ });
+ let total_bytes = total_bytes?;
+
+ let output = Output::new(gen_id)
+ .db_bytes(temp.path().metadata()?.len())
+ .file_count(gen.file_count()?)
+ .file_bytes(total_bytes);
+ serde_json::to_writer_pretty(std::io::stdout(), &output)?;
+
+ Ok(())
+ }
+}
+
+#[derive(Debug, Default, Serialize)]
+struct Output {
+ generation_id: String,
+ file_count: DbInt,
+ file_bytes: String,
+ file_bytes_raw: u64,
+ db_bytes: String,
+ db_bytes_raw: u64,
+}
+
+impl Output {
+ fn new(gen_id: GenId) -> Self {
+ Self {
+ generation_id: format!("{}", gen_id),
+ ..Self::default()
}
- });
+ }
- println!("generation-id: {}", gen_id);
- println!("file-count: {}", gen.file_count()?);
- println!("file-bytes: {}", HumanBytes(total_bytes));
- println!("file-bytes-raw: {}", total_bytes);
+ fn file_count(mut self, n: DbInt) -> Self {
+ self.file_count = n;
+ self
+ }
- // Delete the temporary file.
- std::fs::remove_file(&dbname)?;
+ fn file_bytes(mut self, n: u64) -> Self {
+ self.file_bytes_raw = n;
+ self.file_bytes = HumanBytes(n).to_string();
+ self
+ }
- Ok(())
+ fn db_bytes(mut self, n: u64) -> Self {
+ self.db_bytes_raw = n;
+ self.db_bytes = HumanBytes(n).to_string();
+ self
+ }
}
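
With the new Output struct, show-generation emits JSON instead of the old
"key: value" lines. The output has roughly this shape (values are illustrative;
file_bytes and db_bytes are human-readable renderings of the *_raw byte counts):

{
  "generation_id": "a1b2c3d4e5f6",
  "file_count": 123,
  "file_bytes": "4.00 MiB",
  "file_bytes_raw": 4194304,
  "db_bytes": "120.00 KiB",
  "db_bytes_raw": 122880
}
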
diff --git a/src/config.rs b/src/config.rs
new file mode 100644
index 0000000..5774aad
--- /dev/null
+++ b/src/config.rs
@@ -0,0 +1,142 @@
+//! Client configuration.
+
+use crate::passwords::{passwords_filename, PasswordError, Passwords};
+
+use bytesize::MIB;
+use log::{error, trace};
+use serde::{Deserialize, Serialize};
+use std::path::{Path, PathBuf};
+
+const DEFAULT_CHUNK_SIZE: usize = MIB as usize;
+const DEVNULL: &str = "/dev/null";
+
+#[derive(Debug, Deserialize, Clone)]
+#[serde(deny_unknown_fields)]
+struct TentativeClientConfig {
+ server_url: String,
+ verify_tls_cert: Option<bool>,
+ chunk_size: Option<usize>,
+ roots: Vec<PathBuf>,
+ log: Option<PathBuf>,
+ exclude_cache_tag_directories: Option<bool>,
+}
+
+/// Configuration for the Obnam client.
+#[derive(Debug, Serialize, Clone)]
+pub struct ClientConfig {
+ /// Name of configuration file.
+ pub filename: PathBuf,
+ /// URL of Obnam server.
+ pub server_url: String,
+ /// Should server's TLS certificate be verified using CA
+    /// signatures? Set to false for self-signed certificates.
+ pub verify_tls_cert: bool,
+ /// Size of chunks when splitting files for backup.
+ pub chunk_size: usize,
+ /// Backup root directories.
+ pub roots: Vec<PathBuf>,
+ /// File where logs should be written.
+ pub log: PathBuf,
+ /// Should cache directories be excluded? Cache directories
+ /// contain a specially formatted CACHEDIR.TAG file.
+ pub exclude_cache_tag_directories: bool,
+}
+
+impl ClientConfig {
+ /// Read a client configuration from a file.
+ pub fn read(filename: &Path) -> Result<Self, ClientConfigError> {
+ trace!("read_config: filename={:?}", filename);
+ let config = std::fs::read_to_string(filename)
+ .map_err(|err| ClientConfigError::Read(filename.to_path_buf(), err))?;
+ let tentative: TentativeClientConfig = serde_yaml::from_str(&config)
+ .map_err(|err| ClientConfigError::YamlParse(filename.to_path_buf(), err))?;
+ let roots = tentative
+ .roots
+ .iter()
+ .map(|path| expand_tilde(path))
+ .collect();
+ let log = tentative
+ .log
+ .map(|path| expand_tilde(&path))
+ .unwrap_or_else(|| PathBuf::from(DEVNULL));
+ let exclude_cache_tag_directories = tentative.exclude_cache_tag_directories.unwrap_or(true);
+
+ let config = Self {
+ chunk_size: tentative.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
+ filename: filename.to_path_buf(),
+ roots,
+ server_url: tentative.server_url,
+ verify_tls_cert: tentative.verify_tls_cert.unwrap_or(false),
+ log,
+ exclude_cache_tag_directories,
+ };
+
+ config.check()?;
+ Ok(config)
+ }
+
+ fn check(&self) -> Result<(), ClientConfigError> {
+ if self.server_url.is_empty() {
+ return Err(ClientConfigError::ServerUrlIsEmpty);
+ }
+ if !self.server_url.starts_with("https://") {
+ return Err(ClientConfigError::NotHttps(self.server_url.to_string()));
+ }
+ if self.roots.is_empty() {
+ return Err(ClientConfigError::NoBackupRoot);
+ }
+ Ok(())
+ }
+
+ /// Read encryption passwords from a file.
+ ///
+ /// The password file is expected to be next to the configuration file.
+ pub fn passwords(&self) -> Result<Passwords, ClientConfigError> {
+ Passwords::load(&passwords_filename(&self.filename))
+ .map_err(ClientConfigError::PasswordsMissing)
+ }
+}
+
+/// Possible errors from configuration files.
+#[derive(Debug, thiserror::Error)]
+pub enum ClientConfigError {
+ /// The configuration specifies the server URL as an empty string.
+ #[error("server_url is empty")]
+ ServerUrlIsEmpty,
+
+ /// The configuration does not specify any backup root directories.
+ #[error("No backup roots in config; at least one is needed")]
+ NoBackupRoot,
+
+ /// The server URL is not an https: one.
+ #[error("server URL doesn't use https: {0}")]
+ NotHttps(String),
+
+ /// There are no passwords stored.
+ #[error("No passwords are set: you may need to run 'obnam init': {0}")]
+ PasswordsMissing(PasswordError),
+
+    /// Error reading a configuration file.
+ #[error("failed to read configuration file {0}: {1}")]
+ Read(PathBuf, std::io::Error),
+
+ /// Error parsing configuration file as YAML.
+ #[error("failed to parse configuration file {0} as YAML: {1}")]
+ YamlParse(PathBuf, serde_yaml::Error),
+}
+
+fn expand_tilde(path: &Path) -> PathBuf {
+ if path.starts_with("~/") {
+ if let Some(home) = std::env::var_os("HOME") {
+ let mut expanded = PathBuf::from(home);
+ for comp in path.components().skip(1) {
+ expanded.push(comp);
+ }
+ expanded
+ } else {
+ path.to_path_buf()
+ }
+ } else {
+ path.to_path_buf()
+ }
+}
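
For reference, a configuration file this code accepts could look roughly like the
following (values are illustrative; only server_url and at least one entry in
roots are required, the other fields fall back to the defaults above: a 1 MiB
chunk_size, verify_tls_cert false, log /dev/null, and
exclude_cache_tag_directories true):

server_url: https://backup.example.com:8888
verify_tls_cert: false
roots:
  - ~/Documents
  - ~/Mail
log: ~/obnam.log
exclude_cache_tag_directories: true

Paths starting with "~/" are expanded against $HOME by expand_tilde.
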
diff --git a/src/db.rs b/src/db.rs
new file mode 100644
index 0000000..392134d
--- /dev/null
+++ b/src/db.rs
@@ -0,0 +1,640 @@
+//! A database abstraction around SQLite for Obnam.
+//!
+//! This abstraction provides the bare minimum that Obnam needs, while
+//! trying to be as performant as possible, especially for inserting
+//! rows. Only data types needed by Obnam are supported.
+//!
+//! Note that this abstraction is entirely synchronous. This is for
+//! simplicity, as SQLite only allows one write at a time.
+
+use crate::fsentry::FilesystemEntry;
+use rusqlite::{params, types::ToSqlOutput, CachedStatement, Connection, OpenFlags, Row, ToSql};
+use std::collections::HashSet;
+use std::convert::TryFrom;
+use std::path::{Path, PathBuf};
+
+/// A database.
+pub struct Database {
+ conn: Connection,
+}
+
+impl Database {
+ /// Create a new database file for an empty database.
+ ///
+ /// The database can be written to.
+ pub fn create<P: AsRef<Path>>(filename: P) -> Result<Self, DatabaseError> {
+ if filename.as_ref().exists() {
+ return Err(DatabaseError::Exists(filename.as_ref().to_path_buf()));
+ }
+ let flags = OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE;
+ let conn = Connection::open_with_flags(filename, flags)?;
+ conn.execute("BEGIN", params![])?;
+ Ok(Self { conn })
+ }
+
+ /// Open an existing database file in read only mode.
+ pub fn open<P: AsRef<Path>>(filename: P) -> Result<Self, DatabaseError> {
+ let flags = OpenFlags::SQLITE_OPEN_READ_ONLY;
+ let conn = Connection::open_with_flags(filename, flags)?;
+ Ok(Self { conn })
+ }
+
+ /// Close an open database, committing any changes to disk.
+ pub fn close(self) -> Result<(), DatabaseError> {
+ self.conn.execute("COMMIT", params![])?;
+ self.conn
+ .close()
+ .map_err(|(_, err)| DatabaseError::Rusqlite(err))?;
+ Ok(())
+ }
+
+ /// Create a table in the database.
+ pub fn create_table(&self, table: &Table) -> Result<(), DatabaseError> {
+ let sql = sql_statement::create_table(table);
+ self.conn.execute(&sql, params![])?;
+ Ok(())
+ }
+
+ /// Create an index in the database.
+ pub fn create_index(
+ &self,
+ name: &str,
+ table: &Table,
+ column: &str,
+ ) -> Result<(), DatabaseError> {
+ let sql = sql_statement::create_index(name, table, column);
+ self.conn.execute(&sql, params![])?;
+ Ok(())
+ }
+
+ /// Insert a row in a table.
+ pub fn insert(&mut self, table: &Table, values: &[Value]) -> Result<(), DatabaseError> {
+ let mut stmt = self.conn.prepare_cached(table.insert())?;
+ assert!(table.has_columns(values));
+ // The ToSql trait implementation for Obnam values can't ever
+ // fail, so we don't handle the error case in the parameter
+ // creation below.
+ stmt.execute(rusqlite::params_from_iter(values.iter().map(|v| {
+ v.to_sql()
+ .expect("conversion of Obnam value to SQLite value failed unexpectedly")
+ })))?;
+ Ok(())
+ }
+
+ /// Return an iterator for all rows in a table.
+ pub fn all_rows<T>(
+ &self,
+ table: &Table,
+ rowfunc: &'static dyn Fn(&Row) -> Result<T, rusqlite::Error>,
+ ) -> Result<SqlResults<T>, DatabaseError> {
+ let sql = sql_statement::select_all_rows(table);
+ SqlResults::new(
+ &self.conn,
+ &sql,
+ None,
+ Box::new(|stmt, _| {
+ let iter = stmt.query_map(params![], |row| rowfunc(row))?;
+ let iter = iter.map(|x| match x {
+ Ok(t) => Ok(t),
+ Err(e) => Err(DatabaseError::Rusqlite(e)),
+ });
+ Ok(Box::new(iter))
+ }),
+ )
+ }
+
+ /// Return rows that have a given value in a given column.
+ ///
+ /// This is simplistic, but for Obnam, it provides all the SQL
+ /// SELECT ... WHERE that's needed, and there's no point in making
+ /// this more generic than is needed.
+ pub fn some_rows<T>(
+ &self,
+ table: &Table,
+ value: &Value,
+ rowfunc: &'static dyn Fn(&Row) -> Result<T, rusqlite::Error>,
+ ) -> Result<SqlResults<T>, DatabaseError> {
+ assert!(table.has_column(value));
+ let sql = sql_statement::select_some_rows(table, value.name());
+ SqlResults::new(
+ &self.conn,
+ &sql,
+ Some(OwnedValue::from(value)),
+ Box::new(|stmt, value| {
+ let iter = stmt.query_map(params![value], |row| rowfunc(row))?;
+ let iter = iter.map(|x| match x {
+ Ok(t) => Ok(t),
+ Err(e) => Err(DatabaseError::Rusqlite(e)),
+ });
+ Ok(Box::new(iter))
+ }),
+ )
+ }
+}
+
+/// Possible errors from a database.
+#[derive(Debug, thiserror::Error)]
+pub enum DatabaseError {
+ /// An error from the rusqlite crate.
+ #[error(transparent)]
+ Rusqlite(#[from] rusqlite::Error),
+
+ /// The database being created already exists.
+ #[error("Database {0} already exists")]
+ Exists(PathBuf),
+}
+
+// A pointer to a "fallible iterator" over values of type `T`, which is to say it's an iterator
+// over values of type `Result<T, DatabaseError>`. The iterator is only valid for the
+// lifetime 'stmt.
+//
+// The fact that it's a pointer (`Box<dyn ...>`) means we don't care what the actual type of
+// the iterator is, and who produces it.
+type SqlResultsIterator<'stmt, T> = Box<dyn Iterator<Item = Result<T, DatabaseError>> + 'stmt>;
+
+// A pointer to a function which, when called on a prepared SQLite statement, would create
+// a "fallible iterator" over values of type `ItemT`. (See above for an explanation of what
+// a "fallible iterator" is.)
+//
+// The iterator is only valid for the lifetime of the associated SQLite statement; we
+// call this lifetime 'stmt, and use it both on the reference and the returned iterator.
+//
+// Now we're in a pickle: all named lifetimes have to be declared _somewhere_, but we can't add
+// 'stmt to the signature of `CreateIterFn` because then we'll have to specify it when we
+// define the function. Obviously, at that point we won't yet have a `Statement`, and thus we
+// would have no idea what its lifetime is going to be. So we can't put the 'stmt lifetime into
+// the signature of `CreateIterFn`.
+//
+// That's what `for<'stmt>` is for. This is a so-called ["higher-rank trait bound"][hrtb], and
+// it enables us to say that a function is valid for *some* lifetime 'stmt that we pass into it
+// at the call site. It lets Rust continue to track lifetimes even though `CreateIterFn`
+// interferes by "hiding" the 'stmt lifetime from its signature.
+//
+// [hrtb]: https://doc.rust-lang.org/nomicon/hrtb.html
+type CreateIterFn<'conn, ItemT> = Box<
+ dyn for<'stmt> Fn(
+ &'stmt mut CachedStatement<'conn>,
+ &Option<OwnedValue>,
+ ) -> Result<SqlResultsIterator<'stmt, ItemT>, DatabaseError>,
+>;
+
+/// An iterator over rows from a query.
+pub struct SqlResults<'conn, ItemT> {
+ stmt: CachedStatement<'conn>,
+ value: Option<OwnedValue>,
+ create_iter: CreateIterFn<'conn, ItemT>,
+}
+
+impl<'conn, ItemT> SqlResults<'conn, ItemT> {
+ fn new(
+ conn: &'conn Connection,
+ statement: &str,
+ value: Option<OwnedValue>,
+ create_iter: CreateIterFn<'conn, ItemT>,
+ ) -> Result<Self, DatabaseError> {
+ let stmt = conn.prepare_cached(statement)?;
+ Ok(Self {
+ stmt,
+ value,
+ create_iter,
+ })
+ }
+
+ /// Create an iterator over results.
+ pub fn iter(&'_ mut self) -> Result<SqlResultsIterator<'_, ItemT>, DatabaseError> {
+ (self.create_iter)(&mut self.stmt, &self.value)
+ }
+}
+
+/// Describe a database table.
+pub struct Table {
+ table: String,
+ columns: Vec<Column>,
+ insert: Option<String>,
+ column_names: HashSet<String>,
+}
+
+impl Table {
+ /// Create a new table description without columns.
+ ///
+ /// The table description is not "built". You must add columns and
+ /// then call the [`build`] method.
+ pub fn new(table: &str) -> Self {
+ Self {
+ table: table.to_string(),
+ columns: vec![],
+ insert: None,
+ column_names: HashSet::new(),
+ }
+ }
+
+ /// Append a column.
+ pub fn column(mut self, column: Column) -> Self {
+ self.column_names.insert(column.name().to_string());
+ self.columns.push(column);
+ self
+ }
+
+ /// Finish building the table description.
+ pub fn build(mut self) -> Self {
+ assert!(self.insert.is_none());
+ self.insert = Some(sql_statement::insert(&self));
+ self
+ }
+
+ fn has_columns(&self, values: &[Value]) -> bool {
+ assert!(self.insert.is_some());
+ for v in values.iter() {
+ if !self.column_names.contains(v.name()) {
+ return false;
+ }
+ }
+ true
+ }
+
+ fn has_column(&self, value: &Value) -> bool {
+ assert!(self.insert.is_some());
+ self.column_names.contains(value.name())
+ }
+
+ fn insert(&self) -> &str {
+ assert!(self.insert.is_some());
+ self.insert.as_ref().unwrap()
+ }
+
+ /// What is the name of the table?
+ pub fn name(&self) -> &str {
+ &self.table
+ }
+
+ /// How many columns does the table have?
+ pub fn num_columns(&self) -> usize {
+ self.columns.len()
+ }
+
+ /// What are the names of the columns in the table?
+ pub fn column_names(&self) -> impl Iterator<Item = &str> {
+ self.columns.iter().map(|c| c.name())
+ }
+
+ /// Return SQL column definitions for the table.
+ pub fn column_definitions(&self) -> String {
+ let mut ret = String::new();
+ for c in self.columns.iter() {
+ if !ret.is_empty() {
+ ret.push(',');
+ }
+ ret.push_str(c.name());
+ ret.push(' ');
+ ret.push_str(c.typename());
+ }
+ ret
+ }
+}
+
+/// A column in a table description.
+pub enum Column {
+ /// An integer primary key.
+ PrimaryKey(String),
+ /// An integer.
+ Int(String),
+ /// A text string.
+ Text(String),
+ /// A binary string.
+ Blob(String),
+ /// A boolean.
+ Bool(String),
+}
+
+impl Column {
+ fn name(&self) -> &str {
+ match self {
+ Self::PrimaryKey(name) => name,
+ Self::Int(name) => name,
+ Self::Text(name) => name,
+ Self::Blob(name) => name,
+ Self::Bool(name) => name,
+ }
+ }
+
+ fn typename(&self) -> &str {
+ match self {
+ Self::PrimaryKey(_) => "INTEGER PRIMARY KEY",
+ Self::Int(_) => "INTEGER",
+ Self::Text(_) => "TEXT",
+ Self::Blob(_) => "BLOB",
+ Self::Bool(_) => "BOOLEAN",
+ }
+ }
+
+ /// Create an integer primary key column.
+ pub fn primary_key(name: &str) -> Self {
+ Self::PrimaryKey(name.to_string())
+ }
+
+ /// Create an integer column.
+ pub fn int(name: &str) -> Self {
+ Self::Int(name.to_string())
+ }
+
+ /// Create a text string column.
+ pub fn text(name: &str) -> Self {
+ Self::Text(name.to_string())
+ }
+
+ /// Create a binary string column.
+ pub fn blob(name: &str) -> Self {
+ Self::Blob(name.to_string())
+ }
+
+ /// Create a boolean column.
+ pub fn bool(name: &str) -> Self {
+ Self::Bool(name.to_string())
+ }
+}
+
+/// Type of plain integers that can be stored.
+pub type DbInt = i64;
+
+/// A value in a named column.
+#[derive(Debug)]
+pub enum Value<'a> {
+ /// An integer primary key.
+ PrimaryKey(&'a str, DbInt),
+ /// An integer.
+ Int(&'a str, DbInt),
+ /// A text string.
+ Text(&'a str, &'a str),
+ /// A binary string.
+ Blob(&'a str, &'a [u8]),
+ /// A boolean.
+ Bool(&'a str, bool),
+}
+
+impl<'a> Value<'a> {
+ /// What column should store this value?
+ pub fn name(&self) -> &str {
+ match self {
+ Self::PrimaryKey(name, _) => name,
+ Self::Int(name, _) => name,
+ Self::Text(name, _) => name,
+ Self::Blob(name, _) => name,
+ Self::Bool(name, _) => name,
+ }
+ }
+
+ /// Create an integer primary key value.
+ pub fn primary_key(name: &'a str, value: DbInt) -> Self {
+ Self::PrimaryKey(name, value)
+ }
+
+ /// Create an integer value.
+ pub fn int(name: &'a str, value: DbInt) -> Self {
+ Self::Int(name, value)
+ }
+
+ /// Create a text string value.
+ pub fn text(name: &'a str, value: &'a str) -> Self {
+ Self::Text(name, value)
+ }
+
+ /// Create a binary string value.
+ pub fn blob(name: &'a str, value: &'a [u8]) -> Self {
+ Self::Blob(name, value)
+ }
+
+ /// Create a boolean value.
+ pub fn bool(name: &'a str, value: bool) -> Self {
+ Self::Bool(name, value)
+ }
+}
+
+#[allow(clippy::useless_conversion)]
+impl<'a> ToSql for Value<'a> {
+ // The trait defines to_sql to return a Result. However, for our
+ // particular case, to_sql can't ever fail. We only store values
+ // in types for which conversion always succeeds: integer,
+ // boolean, text, and blob. _For us_, the caller need never worry
+ // that the conversion fails, but we can't express that in the
+ // type system.
+ fn to_sql(&self) -> Result<rusqlite::types::ToSqlOutput, rusqlite::Error> {
+ use rusqlite::types::ValueRef;
+ let v = match self {
+ Self::PrimaryKey(_, v) => ValueRef::Integer(
+ i64::try_from(*v)
+ .map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?,
+ ),
+ Self::Int(_, v) => ValueRef::Integer(
+ i64::try_from(*v)
+ .map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?,
+ ),
+ Self::Bool(_, v) => ValueRef::Integer(i64::from(*v)),
+ Self::Text(_, v) => ValueRef::Text(v.as_ref()),
+ Self::Blob(_, v) => ValueRef::Blob(v),
+ };
+ Ok(ToSqlOutput::Borrowed(v))
+ }
+}
+
+/// Like a Value, but owns the data.
+pub enum OwnedValue {
+ /// An integer primary key.
+ PrimaryKey(String, DbInt),
+ /// An integer.
+ Int(String, DbInt),
+ /// A text string.
+ Text(String, String),
+ /// A binary string.
+ Blob(String, Vec<u8>),
+ /// A boolean.
+ Bool(String, bool),
+}
+
+impl From<&Value<'_>> for OwnedValue {
+ fn from(v: &Value) -> Self {
+ match *v {
+ Value::PrimaryKey(name, v) => Self::PrimaryKey(name.to_string(), v),
+ Value::Int(name, v) => Self::Int(name.to_string(), v),
+ Value::Text(name, v) => Self::Text(name.to_string(), v.to_string()),
+ Value::Blob(name, v) => Self::Blob(name.to_string(), v.to_vec()),
+ Value::Bool(name, v) => Self::Bool(name.to_string(), v),
+ }
+ }
+}
+
+impl ToSql for OwnedValue {
+ #[allow(clippy::useless_conversion)]
+ fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput> {
+ use rusqlite::types::Value;
+ let v = match self {
+ Self::PrimaryKey(_, v) => Value::Integer(
+ i64::try_from(*v)
+ .map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?,
+ ),
+ Self::Int(_, v) => Value::Integer(
+ i64::try_from(*v)
+ .map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?,
+ ),
+ Self::Bool(_, v) => Value::Integer(i64::from(*v)),
+ Self::Text(_, v) => Value::Text(v.to_string()),
+ Self::Blob(_, v) => Value::Blob(v.to_vec()),
+ };
+ Ok(ToSqlOutput::Owned(v))
+ }
+}
+
+impl rusqlite::types::ToSql for FilesystemEntry {
+ fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
+ let json = serde_json::to_string(self)
+ .map_err(|err| rusqlite::Error::ToSqlConversionFailure(Box::new(err)))?;
+ let json = rusqlite::types::Value::Text(json);
+ Ok(ToSqlOutput::Owned(json))
+ }
+}
+
+mod sql_statement {
+ use super::Table;
+
+ pub fn create_table(table: &Table) -> String {
+ format!(
+ "CREATE TABLE {} ({})",
+ table.name(),
+ table.column_definitions()
+ )
+ }
+
+ pub fn create_index(name: &str, table: &Table, column: &str) -> String {
+ format!("CREATE INDEX {} ON {} ({})", name, table.name(), column,)
+ }
+
+ pub fn insert(table: &Table) -> String {
+ format!(
+ "INSERT INTO {} ({}) VALUES ({})",
+ table.name(),
+ &column_names(table),
+ placeholders(table.column_names().count())
+ )
+ }
+
+ pub fn select_all_rows(table: &Table) -> String {
+ format!("SELECT * FROM {}", table.name())
+ }
+
+ pub fn select_some_rows(table: &Table, column: &str) -> String {
+ format!("SELECT * FROM {} WHERE {} = ?", table.name(), column)
+ }
+
+ fn column_names(table: &Table) -> String {
+ table.column_names().collect::<Vec<&str>>().join(",")
+ }
+
+ fn placeholders(num_columns: usize) -> String {
+ let mut s = String::new();
+ for _ in 0..num_columns {
+ if !s.is_empty() {
+ s.push(',');
+ }
+ s.push('?');
+ }
+ s
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use std::path::Path;
+ use tempfile::tempdir;
+
+ fn get_bar(row: &rusqlite::Row) -> Result<DbInt, rusqlite::Error> {
+ row.get("bar")
+ }
+
+ fn table() -> Table {
+ Table::new("foo").column(Column::int("bar")).build()
+ }
+
+ fn create_db(file: &Path) -> Database {
+ let table = table();
+ let db = Database::create(file).unwrap();
+ db.create_table(&table).unwrap();
+ db
+ }
+
+ fn open_db(file: &Path) -> Database {
+ Database::open(file).unwrap()
+ }
+
+ fn insert(db: &mut Database, value: DbInt) {
+ let table = table();
+ db.insert(&table, &[Value::int("bar", value)]).unwrap();
+ }
+
+ fn values(db: Database) -> Vec<DbInt> {
+ let table = table();
+ let mut rows = db.all_rows(&table, &get_bar).unwrap();
+ let iter = rows.iter().unwrap();
+ let mut values = vec![];
+ for x in iter {
+ values.push(x.unwrap());
+ }
+ values
+ }
+
+ #[test]
+ fn creates_db() {
+ let tmp = tempdir().unwrap();
+ let filename = tmp.path().join("test.db");
+ let db = Database::create(&filename).unwrap();
+ db.close().unwrap();
+ let _ = Database::open(&filename).unwrap();
+ }
+
+ #[test]
+ fn inserts_row() {
+ let tmp = tempdir().unwrap();
+ let filename = tmp.path().join("test.db");
+ let mut db = create_db(&filename);
+ insert(&mut db, 42);
+ db.close().unwrap();
+
+ let db = open_db(&filename);
+ let values = values(db);
+ assert_eq!(values, vec![42]);
+ }
+
+ #[test]
+ fn inserts_many_rows() {
+ const N: DbInt = 1000;
+
+ let tmp = tempdir().unwrap();
+ let filename = tmp.path().join("test.db");
+ let mut db = create_db(&filename);
+ for i in 0..N {
+ insert(&mut db, i);
+ }
+ db.close().unwrap();
+
+ let db = open_db(&filename);
+ let values = values(db);
+ assert_eq!(values.len() as DbInt, N);
+
+ let mut expected = vec![];
+ for i in 0..N {
+ expected.push(i);
+ }
+ assert_eq!(values, expected);
+ }
+
+    #[test]
+ fn round_trips_int_max() {
+ let tmp = tempdir().unwrap();
+ let filename = tmp.path().join("test.db");
+ let mut db = create_db(&filename);
+ insert(&mut db, DbInt::MAX);
+ db.close().unwrap();
+
+ let db = open_db(&filename);
+ let values = values(db);
+ assert_eq!(values, vec![DbInt::MAX]);
+ }
+}
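
The tests above only exercise all_rows. A sketch of the some_rows path, which
filters on a single column, might look like this (assuming the crate is used as
obnam; the function names get_bar and bars_equal_to are illustrative):

use obnam::db::{Column, Database, DatabaseError, DbInt, Table, Value};
use std::path::Path;

fn get_bar(row: &rusqlite::Row) -> Result<DbInt, rusqlite::Error> {
    row.get("bar")
}

// Return every row of foo whose "bar" column equals `wanted`.
fn bars_equal_to(path: &Path, wanted: DbInt) -> Result<Vec<DbInt>, DatabaseError> {
    let table = Table::new("foo").column(Column::int("bar")).build();
    let db = Database::open(path)?;
    let mut rows = db.some_rows(&table, &Value::int("bar", wanted), &get_bar)?;
    let mut found = vec![];
    for value in rows.iter()? {
        found.push(value?);
    }
    Ok(found)
}
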
diff --git a/src/dbgen.rs b/src/dbgen.rs
new file mode 100644
index 0000000..0053d4a
--- /dev/null
+++ b/src/dbgen.rs
@@ -0,0 +1,768 @@
+//! Database abstraction for generations.
+
+use crate::backup_reason::Reason;
+use crate::chunkid::ChunkId;
+use crate::db::{Column, Database, DatabaseError, DbInt, SqlResults, Table, Value};
+use crate::fsentry::FilesystemEntry;
+use crate::genmeta::{GenerationMeta, GenerationMetaError};
+use crate::label::LabelChecksumKind;
+use crate::schema::{SchemaVersion, VersionComponent};
+use log::error;
+use std::collections::HashMap;
+use std::os::unix::ffi::OsStrExt;
+use std::path::{Path, PathBuf};
+
+/// Return latest supported schema version for a supported major
+/// version.
+pub fn schema_version(major: VersionComponent) -> Result<SchemaVersion, GenerationDbError> {
+ match major {
+ 0 => Ok(SchemaVersion::new(0, 0)),
+ 1 => Ok(SchemaVersion::new(1, 0)),
+ _ => Err(GenerationDbError::Unsupported(major)),
+ }
+}
+
+/// Default database schema major version.
+pub const DEFAULT_SCHEMA_MAJOR: VersionComponent = V0_0::MAJOR;
+
+/// Major schema versions supported by this version of Obnam.
+pub const SCHEMA_MAJORS: &[VersionComponent] = &[0, 1];
+
+/// An integer identifier for a file in a generation.
+pub type FileId = DbInt;
+
+/// Possible errors from using generation databases.
+#[derive(Debug, thiserror::Error)]
+pub enum GenerationDbError {
+ /// Duplicate file names.
+ #[error("Generation has more than one file with the name {0}")]
+ TooManyFiles(PathBuf),
+
+ /// No 'meta' table in generation.
+ #[error("Generation does not have a 'meta' table")]
+ NoMeta,
+
+    /// Missing row from 'meta' table.
+ #[error("Generation 'meta' table does not have a row {0}")]
+ NoMetaKey(String),
+
+ /// Bad data in 'meta' table.
+ #[error("Generation 'meta' row {0} has badly formed integer: {1}")]
+ BadMetaInteger(String, std::num::ParseIntError),
+
+ /// A major schema version is unsupported.
+ #[error("Unsupported backup schema major version: {0}")]
+ Unsupported(VersionComponent),
+
+ /// Local generation uses a schema version that this version of
+ /// Obnam isn't compatible with.
+ #[error("Backup is not compatible with this version of Obnam: {0}.{1}")]
+ Incompatible(VersionComponent, VersionComponent),
+
+ /// Error from a database
+ #[error(transparent)]
+ Database(#[from] DatabaseError),
+
+ /// Error from generation metadata.
+ #[error(transparent)]
+ GenerationMeta(#[from] GenerationMetaError),
+
+ /// Error from JSON.
+ #[error(transparent)]
+ SerdeJsonError(#[from] serde_json::Error),
+
+ /// Error from I/O.
+ #[error(transparent)]
+ IoError(#[from] std::io::Error),
+}
+
+/// A database representing a backup generation.
+pub struct GenerationDb {
+ variant: GenerationDbVariant,
+}
+
+enum GenerationDbVariant {
+ V0_0(V0_0),
+ V1_0(V1_0),
+}
+
+impl GenerationDb {
+ /// Create a new generation database in read/write mode.
+ pub fn create<P: AsRef<Path>>(
+ filename: P,
+ schema: SchemaVersion,
+ checksum_kind: LabelChecksumKind,
+ ) -> Result<Self, GenerationDbError> {
+ let meta_table = Self::meta_table();
+ let variant = match schema.version() {
+ (V0_0::MAJOR, V0_0::MINOR) => {
+ GenerationDbVariant::V0_0(V0_0::create(filename, meta_table, checksum_kind)?)
+ }
+ (V1_0::MAJOR, V1_0::MINOR) => {
+ GenerationDbVariant::V1_0(V1_0::create(filename, meta_table, checksum_kind)?)
+ }
+ (major, minor) => return Err(GenerationDbError::Incompatible(major, minor)),
+ };
+ Ok(Self { variant })
+ }
+
+ /// Open an existing generation database in read-only mode.
+ pub fn open<P: AsRef<Path>>(filename: P) -> Result<Self, GenerationDbError> {
+ let filename = filename.as_ref();
+ let meta_table = Self::meta_table();
+ let schema = {
+ let plain_db = Database::open(filename)?;
+ let rows = Self::meta_rows(&plain_db, &meta_table)?;
+ GenerationMeta::from(rows)?.schema_version()
+ };
+ let variant = match schema.version() {
+ (V0_0::MAJOR, V0_0::MINOR) => {
+ GenerationDbVariant::V0_0(V0_0::open(filename, meta_table)?)
+ }
+ (V1_0::MAJOR, V1_0::MINOR) => {
+ GenerationDbVariant::V1_0(V1_0::open(filename, meta_table)?)
+ }
+ (major, minor) => return Err(GenerationDbError::Incompatible(major, minor)),
+ };
+ Ok(Self { variant })
+ }
+
+ fn meta_table() -> Table {
+ Table::new("meta")
+ .column(Column::text("key"))
+ .column(Column::text("value"))
+ .build()
+ }
+
+ fn meta_rows(
+ db: &Database,
+ table: &Table,
+ ) -> Result<HashMap<String, String>, GenerationDbError> {
+ let mut map = HashMap::new();
+ let mut iter = db.all_rows(table, &row_to_kv)?;
+ for kv in iter.iter()? {
+ let (key, value) = kv?;
+ map.insert(key, value);
+ }
+ Ok(map)
+ }
+
+ /// Close a database, commit any changes.
+ pub fn close(self) -> Result<(), GenerationDbError> {
+ match self.variant {
+ GenerationDbVariant::V0_0(v) => v.close(),
+ GenerationDbVariant::V1_0(v) => v.close(),
+ }
+ }
+
+ /// Return contents of "meta" table as a HashMap.
+ pub fn meta(&self) -> Result<HashMap<String, String>, GenerationDbError> {
+ match &self.variant {
+ GenerationDbVariant::V0_0(v) => v.meta(),
+ GenerationDbVariant::V1_0(v) => v.meta(),
+ }
+ }
+
+ /// Insert a file system entry into the database.
+ pub fn insert(
+ &mut self,
+ e: FilesystemEntry,
+ fileid: FileId,
+ ids: &[ChunkId],
+ reason: Reason,
+ is_cachedir_tag: bool,
+ ) -> Result<(), GenerationDbError> {
+ match &mut self.variant {
+ GenerationDbVariant::V0_0(v) => v.insert(e, fileid, ids, reason, is_cachedir_tag),
+ GenerationDbVariant::V1_0(v) => v.insert(e, fileid, ids, reason, is_cachedir_tag),
+ }
+ }
+
+ /// Count number of file system entries.
+ pub fn file_count(&self) -> Result<FileId, GenerationDbError> {
+ match &self.variant {
+ GenerationDbVariant::V0_0(v) => v.file_count(),
+ GenerationDbVariant::V1_0(v) => v.file_count(),
+ }
+ }
+
+ /// Does a path refer to a cache directory?
+ pub fn is_cachedir_tag(&self, filename: &Path) -> Result<bool, GenerationDbError> {
+ match &self.variant {
+ GenerationDbVariant::V0_0(v) => v.is_cachedir_tag(filename),
+ GenerationDbVariant::V1_0(v) => v.is_cachedir_tag(filename),
+ }
+ }
+
+    /// Return all chunk ids for a given file in the database.
+ pub fn chunkids(&self, fileid: FileId) -> Result<SqlResults<ChunkId>, GenerationDbError> {
+ match &self.variant {
+ GenerationDbVariant::V0_0(v) => v.chunkids(fileid),
+ GenerationDbVariant::V1_0(v) => v.chunkids(fileid),
+ }
+ }
+
+ /// Return all file descriptions in database.
+ pub fn files(
+ &self,
+ ) -> Result<SqlResults<(FileId, FilesystemEntry, Reason, bool)>, GenerationDbError> {
+ match &self.variant {
+ GenerationDbVariant::V0_0(v) => v.files(),
+ GenerationDbVariant::V1_0(v) => v.files(),
+ }
+ }
+
+ /// Get a file's information given its path.
+ pub fn get_file(&self, filename: &Path) -> Result<Option<FilesystemEntry>, GenerationDbError> {
+ match &self.variant {
+ GenerationDbVariant::V0_0(v) => v.get_file(filename),
+ GenerationDbVariant::V1_0(v) => v.get_file(filename),
+ }
+ }
+
+    /// Get a file's id in the database given its path.
+ pub fn get_fileno(&self, filename: &Path) -> Result<Option<FileId>, GenerationDbError> {
+ match &self.variant {
+ GenerationDbVariant::V0_0(v) => v.get_fileno(filename),
+ GenerationDbVariant::V1_0(v) => v.get_fileno(filename),
+ }
+ }
+}
+
+struct V0_0 {
+ created: bool,
+ db: Database,
+ meta: Table,
+ files: Table,
+ chunks: Table,
+}
+
+impl V0_0 {
+ const MAJOR: VersionComponent = 0;
+ const MINOR: VersionComponent = 0;
+
+ /// Create a new generation database in read/write mode.
+ pub fn create<P: AsRef<Path>>(
+ filename: P,
+ meta: Table,
+ checksum_kind: LabelChecksumKind,
+ ) -> Result<Self, GenerationDbError> {
+ let db = Database::create(filename.as_ref())?;
+ let mut moi = Self::new(db, meta);
+ moi.created = true;
+ moi.create_tables(checksum_kind)?;
+ Ok(moi)
+ }
+
+ /// Open an existing generation database in read-only mode.
+ pub fn open<P: AsRef<Path>>(filename: P, meta: Table) -> Result<Self, GenerationDbError> {
+ let db = Database::open(filename.as_ref())?;
+ Ok(Self::new(db, meta))
+ }
+
+ fn new(db: Database, meta: Table) -> Self {
+ let files = Table::new("files")
+ .column(Column::primary_key("fileno"))
+ .column(Column::blob("filename"))
+ .column(Column::text("json"))
+ .column(Column::text("reason"))
+ .column(Column::bool("is_cachedir_tag"))
+ .build();
+ let chunks = Table::new("chunks")
+ .column(Column::int("fileno"))
+ .column(Column::text("chunkid"))
+ .build();
+
+ Self {
+ created: false,
+ db,
+ meta,
+ files,
+ chunks,
+ }
+ }
+
+ fn create_tables(&mut self, checksum_kind: LabelChecksumKind) -> Result<(), GenerationDbError> {
+ self.db.create_table(&self.meta)?;
+ self.db.create_table(&self.files)?;
+ self.db.create_table(&self.chunks)?;
+
+ self.db.insert(
+ &self.meta,
+ &[
+ Value::text("key", "schema_version_major"),
+ Value::text("value", &format!("{}", Self::MAJOR)),
+ ],
+ )?;
+ self.db.insert(
+ &self.meta,
+ &[
+ Value::text("key", "schema_version_minor"),
+ Value::text("value", &format!("{}", Self::MINOR)),
+ ],
+ )?;
+ self.db.insert(
+ &self.meta,
+ &[
+ Value::text("key", "checksum_kind"),
+ Value::text("value", checksum_kind.serialize()),
+ ],
+ )?;
+
+ Ok(())
+ }
+
+ /// Close a database, commit any changes.
+ pub fn close(self) -> Result<(), GenerationDbError> {
+ if self.created {
+ self.db
+ .create_index("filenames_idx", &self.files, "filename")?;
+ self.db.create_index("fileid_idx", &self.chunks, "fileno")?;
+ }
+ self.db.close().map_err(GenerationDbError::Database)
+ }
+
+ /// Return contents of "meta" table as a HashMap.
+ pub fn meta(&self) -> Result<HashMap<String, String>, GenerationDbError> {
+ let mut map = HashMap::new();
+ let mut iter = self.db.all_rows(&self.meta, &row_to_kv)?;
+ for kv in iter.iter()? {
+ let (key, value) = kv?;
+ map.insert(key, value);
+ }
+ Ok(map)
+ }
+
+ /// Insert a file system entry into the database.
+ pub fn insert(
+ &mut self,
+ e: FilesystemEntry,
+ fileid: FileId,
+ ids: &[ChunkId],
+ reason: Reason,
+ is_cachedir_tag: bool,
+ ) -> Result<(), GenerationDbError> {
+ let json = serde_json::to_string(&e)?;
+ self.db.insert(
+ &self.files,
+ &[
+ Value::primary_key("fileno", fileid),
+ Value::blob("filename", &path_into_blob(&e.pathbuf())),
+ Value::text("json", &json),
+ Value::text("reason", &format!("{}", reason)),
+ Value::bool("is_cachedir_tag", is_cachedir_tag),
+ ],
+ )?;
+ for id in ids {
+ self.db.insert(
+ &self.chunks,
+ &[
+ Value::int("fileno", fileid),
+ Value::text("chunkid", &format!("{}", id)),
+ ],
+ )?;
+ }
+ Ok(())
+ }
+
+ /// Count number of file system entries.
+ pub fn file_count(&self) -> Result<FileId, GenerationDbError> {
+        // FIXME: this needs to be done using "SELECT count(*) FROM
+ // files", but the Database abstraction doesn't support that
+ // yet.
+ let mut iter = self.db.all_rows(&self.files, &Self::row_to_entry)?;
+ let mut count = 0;
+ for _ in iter.iter()? {
+ count += 1;
+ }
+ Ok(count)
+ }
+
+ /// Does a path refer to a cache directory?
+ pub fn is_cachedir_tag(&self, filename: &Path) -> Result<bool, GenerationDbError> {
+ let filename_vec = path_into_blob(filename);
+ let value = Value::blob("filename", &filename_vec);
+ let mut rows = self
+ .db
+ .some_rows(&self.files, &value, &Self::row_to_entry)?;
+ let mut iter = rows.iter()?;
+
+ if let Some(row) = iter.next() {
+ // Make sure there's only one row for a given filename. A
+ // bug in a previous version, or a maliciously constructed
+ // generation, could result in there being more than one.
+ if iter.next().is_some() {
+ error!("too many files in file lookup");
+ Err(GenerationDbError::TooManyFiles(filename.to_path_buf()))
+ } else {
+ let (_, _, _, is_cachedir_tag) = row?;
+ Ok(is_cachedir_tag)
+ }
+ } else {
+ Ok(false)
+ }
+ }
+
+    /// Return all chunk ids for a given file in the database.
+ pub fn chunkids(&self, fileid: FileId) -> Result<SqlResults<ChunkId>, GenerationDbError> {
+ let fileid = Value::int("fileno", fileid);
+ Ok(self.db.some_rows(&self.chunks, &fileid, &row_to_chunkid)?)
+ }
+
+ /// Return all file descriptions in database.
+ pub fn files(
+ &self,
+ ) -> Result<SqlResults<(FileId, FilesystemEntry, Reason, bool)>, GenerationDbError> {
+ Ok(self.db.all_rows(&self.files, &Self::row_to_fsentry)?)
+ }
+
+ /// Get a file's information given its path.
+ pub fn get_file(&self, filename: &Path) -> Result<Option<FilesystemEntry>, GenerationDbError> {
+ match self.get_file_and_fileno(filename)? {
+ None => Ok(None),
+ Some((_, e, _)) => Ok(Some(e)),
+ }
+ }
+
+    /// Get a file's id in the database given its path.
+ pub fn get_fileno(&self, filename: &Path) -> Result<Option<FileId>, GenerationDbError> {
+ match self.get_file_and_fileno(filename)? {
+ None => Ok(None),
+ Some((id, _, _)) => Ok(Some(id)),
+ }
+ }
+
+ fn get_file_and_fileno(
+ &self,
+ filename: &Path,
+ ) -> Result<Option<(FileId, FilesystemEntry, String)>, GenerationDbError> {
+ let filename_bytes = path_into_blob(filename);
+ let value = Value::blob("filename", &filename_bytes);
+ let mut rows = self
+ .db
+ .some_rows(&self.files, &value, &Self::row_to_entry)?;
+ let mut iter = rows.iter()?;
+
+ if let Some(row) = iter.next() {
+ // Make sure there's only one row for a given filename. A
+ // bug in a previous version, or a maliciously constructed
+ // generation, could result in there being more than one.
+ if iter.next().is_some() {
+ error!("too many files in file lookup");
+ Err(GenerationDbError::TooManyFiles(filename.to_path_buf()))
+ } else {
+ let (fileid, ref json, ref reason, _) = row?;
+ let entry = serde_json::from_str(json)?;
+ Ok(Some((fileid, entry, reason.to_string())))
+ }
+ } else {
+ Ok(None)
+ }
+ }
+
+ fn row_to_entry(row: &rusqlite::Row) -> rusqlite::Result<(FileId, String, String, bool)> {
+ let fileno: FileId = row.get("fileno")?;
+ let json: String = row.get("json")?;
+ let reason: String = row.get("reason")?;
+ let is_cachedir_tag: bool = row.get("is_cachedir_tag")?;
+ Ok((fileno, json, reason, is_cachedir_tag))
+ }
+
+ fn row_to_fsentry(
+ row: &rusqlite::Row,
+ ) -> rusqlite::Result<(FileId, FilesystemEntry, Reason, bool)> {
+ let fileno: FileId = row.get("fileno")?;
+ let json: String = row.get("json")?;
+ let entry = serde_json::from_str(&json).map_err(|err| {
+ rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(err))
+ })?;
+ let reason: String = row.get("reason")?;
+ let reason = Reason::from(&reason);
+ let is_cachedir_tag: bool = row.get("is_cachedir_tag")?;
+ Ok((fileno, entry, reason, is_cachedir_tag))
+ }
+}
+
+struct V1_0 {
+ created: bool,
+ db: Database,
+ meta: Table,
+ files: Table,
+ chunks: Table,
+}
+
+impl V1_0 {
+ const MAJOR: VersionComponent = 1;
+ const MINOR: VersionComponent = 0;
+
+ /// Create a new generation database in read/write mode.
+ pub fn create<P: AsRef<Path>>(
+ filename: P,
+ meta: Table,
+ checksum_kind: LabelChecksumKind,
+ ) -> Result<Self, GenerationDbError> {
+ let db = Database::create(filename.as_ref())?;
+ let mut moi = Self::new(db, meta);
+ moi.created = true;
+ moi.create_tables(checksum_kind)?;
+ Ok(moi)
+ }
+
+ /// Open an existing generation database in read-only mode.
+ pub fn open<P: AsRef<Path>>(filename: P, meta: Table) -> Result<Self, GenerationDbError> {
+ let db = Database::open(filename.as_ref())?;
+ Ok(Self::new(db, meta))
+ }
+
+ fn new(db: Database, meta: Table) -> Self {
+ let files = Table::new("files")
+ .column(Column::primary_key("fileid"))
+ .column(Column::blob("filename"))
+ .column(Column::text("json"))
+ .column(Column::text("reason"))
+ .column(Column::bool("is_cachedir_tag"))
+ .build();
+ let chunks = Table::new("chunks")
+ .column(Column::int("fileid"))
+ .column(Column::text("chunkid"))
+ .build();
+
+ Self {
+ created: false,
+ db,
+ meta,
+ files,
+ chunks,
+ }
+ }
+
+ fn create_tables(&mut self, checksum_kind: LabelChecksumKind) -> Result<(), GenerationDbError> {
+ self.db.create_table(&self.meta)?;
+ self.db.create_table(&self.files)?;
+ self.db.create_table(&self.chunks)?;
+
+ self.db.insert(
+ &self.meta,
+ &[
+ Value::text("key", "schema_version_major"),
+ Value::text("value", &format!("{}", Self::MAJOR)),
+ ],
+ )?;
+ self.db.insert(
+ &self.meta,
+ &[
+ Value::text("key", "schema_version_minor"),
+ Value::text("value", &format!("{}", Self::MINOR)),
+ ],
+ )?;
+ self.db.insert(
+ &self.meta,
+ &[
+ Value::text("key", "checksum_kind"),
+ Value::text("value", checksum_kind.serialize()),
+ ],
+ )?;
+
+ Ok(())
+ }
+
+ /// Close a database, commit any changes.
+ pub fn close(self) -> Result<(), GenerationDbError> {
+ if self.created {
+ self.db
+ .create_index("filenames_idx", &self.files, "filename")?;
+ self.db.create_index("fileid_idx", &self.chunks, "fileid")?;
+ }
+ self.db.close().map_err(GenerationDbError::Database)
+ }
+
+ /// Return contents of "meta" table as a HashMap.
+ pub fn meta(&self) -> Result<HashMap<String, String>, GenerationDbError> {
+ let mut map = HashMap::new();
+ let mut iter = self.db.all_rows(&self.meta, &row_to_kv)?;
+ for kv in iter.iter()? {
+ let (key, value) = kv?;
+ map.insert(key, value);
+ }
+ Ok(map)
+ }
+
+ /// Insert a file system entry into the database.
+ pub fn insert(
+ &mut self,
+ e: FilesystemEntry,
+ fileid: FileId,
+ ids: &[ChunkId],
+ reason: Reason,
+ is_cachedir_tag: bool,
+ ) -> Result<(), GenerationDbError> {
+ let json = serde_json::to_string(&e)?;
+ self.db.insert(
+ &self.files,
+ &[
+ Value::primary_key("fileid", fileid),
+ Value::blob("filename", &path_into_blob(&e.pathbuf())),
+ Value::text("json", &json),
+ Value::text("reason", &format!("{}", reason)),
+ Value::bool("is_cachedir_tag", is_cachedir_tag),
+ ],
+ )?;
+ for id in ids {
+ self.db.insert(
+ &self.chunks,
+ &[
+ Value::int("fileid", fileid),
+ Value::text("chunkid", &format!("{}", id)),
+ ],
+ )?;
+ }
+ Ok(())
+ }
+
+ /// Count number of file system entries.
+ pub fn file_count(&self) -> Result<FileId, GenerationDbError> {
+        // FIXME: this needs to be done using "SELECT count(*) FROM
+ // files", but the Database abstraction doesn't support that
+ // yet.
+ let mut iter = self.db.all_rows(&self.files, &Self::row_to_entry)?;
+ let mut count = 0;
+ for _ in iter.iter()? {
+ count += 1;
+ }
+ Ok(count)
+ }
+
+ /// Does a path refer to a cache directory?
+ pub fn is_cachedir_tag(&self, filename: &Path) -> Result<bool, GenerationDbError> {
+ let filename_vec = path_into_blob(filename);
+ let value = Value::blob("filename", &filename_vec);
+ let mut rows = self
+ .db
+ .some_rows(&self.files, &value, &Self::row_to_entry)?;
+ let mut iter = rows.iter()?;
+
+ if let Some(row) = iter.next() {
+ // Make sure there's only one row for a given filename. A
+ // bug in a previous version, or a maliciously constructed
+ // generation, could result in there being more than one.
+ if iter.next().is_some() {
+ error!("too many files in file lookup");
+ Err(GenerationDbError::TooManyFiles(filename.to_path_buf()))
+ } else {
+ let (_, _, _, is_cachedir_tag) = row?;
+ Ok(is_cachedir_tag)
+ }
+ } else {
+ Ok(false)
+ }
+ }
+
+    /// Return all chunk ids for a given file in the database.
+ pub fn chunkids(&self, fileid: FileId) -> Result<SqlResults<ChunkId>, GenerationDbError> {
+ let fileid = Value::int("fileid", fileid);
+ Ok(self.db.some_rows(&self.chunks, &fileid, &row_to_chunkid)?)
+ }
+
+ /// Return all file descriptions in database.
+ pub fn files(
+ &self,
+ ) -> Result<SqlResults<(FileId, FilesystemEntry, Reason, bool)>, GenerationDbError> {
+ Ok(self.db.all_rows(&self.files, &Self::row_to_fsentry)?)
+ }
+
+ /// Get a file's information given its path.
+ pub fn get_file(&self, filename: &Path) -> Result<Option<FilesystemEntry>, GenerationDbError> {
+ match self.get_file_and_fileno(filename)? {
+ None => Ok(None),
+ Some((_, e, _)) => Ok(Some(e)),
+ }
+ }
+
+    /// Get a file's id in the database given its path.
+ pub fn get_fileno(&self, filename: &Path) -> Result<Option<FileId>, GenerationDbError> {
+ match self.get_file_and_fileno(filename)? {
+ None => Ok(None),
+ Some((id, _, _)) => Ok(Some(id)),
+ }
+ }
+
+ fn get_file_and_fileno(
+ &self,
+ filename: &Path,
+ ) -> Result<Option<(FileId, FilesystemEntry, String)>, GenerationDbError> {
+ let filename_bytes = path_into_blob(filename);
+ let value = Value::blob("filename", &filename_bytes);
+ let mut rows = self
+ .db
+ .some_rows(&self.files, &value, &Self::row_to_entry)?;
+ let mut iter = rows.iter()?;
+
+ if let Some(row) = iter.next() {
+ // Make sure there's only one row for a given filename. A
+ // bug in a previous version, or a maliciously constructed
+ // generation, could result in there being more than one.
+ if iter.next().is_some() {
+ error!("too many files in file lookup");
+ Err(GenerationDbError::TooManyFiles(filename.to_path_buf()))
+ } else {
+ let (fileid, ref json, ref reason, _) = row?;
+ let entry = serde_json::from_str(json)?;
+ Ok(Some((fileid, entry, reason.to_string())))
+ }
+ } else {
+ Ok(None)
+ }
+ }
+
+ fn row_to_entry(row: &rusqlite::Row) -> rusqlite::Result<(FileId, String, String, bool)> {
+ let fileno: FileId = row.get("fileid")?;
+ let json: String = row.get("json")?;
+ let reason: String = row.get("reason")?;
+ let is_cachedir_tag: bool = row.get("is_cachedir_tag")?;
+ Ok((fileno, json, reason, is_cachedir_tag))
+ }
+
+ fn row_to_fsentry(
+ row: &rusqlite::Row,
+ ) -> rusqlite::Result<(FileId, FilesystemEntry, Reason, bool)> {
+ let fileno: FileId = row.get("fileid")?;
+ let json: String = row.get("json")?;
+ let entry = serde_json::from_str(&json).map_err(|err| {
+ rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(err))
+ })?;
+ let reason: String = row.get("reason")?;
+ let reason = Reason::from(&reason);
+ let is_cachedir_tag: bool = row.get("is_cachedir_tag")?;
+ Ok((fileno, entry, reason, is_cachedir_tag))
+ }
+}
+
+fn row_to_kv(row: &rusqlite::Row) -> rusqlite::Result<(String, String)> {
+ let k = row.get("key")?;
+ let v = row.get("value")?;
+ Ok((k, v))
+}
+
+fn path_into_blob(path: &Path) -> Vec<u8> {
+ path.as_os_str().as_bytes().to_vec()
+}
+
+fn row_to_chunkid(row: &rusqlite::Row) -> rusqlite::Result<ChunkId> {
+ let chunkid: String = row.get("chunkid")?;
+ let chunkid = ChunkId::recreate(&chunkid);
+ Ok(chunkid)
+}
+
+#[cfg(test)]
+mod test {
+ use super::Database;
+ use tempfile::tempdir;
+
+ #[test]
+ fn opens_previously_created_db() {
+ let dir = tempdir().unwrap();
+ let filename = dir.path().join("test.db");
+ Database::create(&filename).unwrap();
+ assert!(Database::open(&filename).is_ok());
+ }
+}
diff --git a/src/engine.rs b/src/engine.rs
new file mode 100644
index 0000000..d35281b
--- /dev/null
+++ b/src/engine.rs
@@ -0,0 +1,123 @@
+//! Engine for doing CPU heavy work in the background.
+
+use crate::workqueue::WorkQueue;
+use futures::stream::{FuturesOrdered, StreamExt};
+use tokio::select;
+use tokio::sync::mpsc;
+
+/// Do heavy work in the background.
+///
+/// An engine takes items of work from a work queue, and does the work
+/// in the background, using `tokio` blocking tasks. The background
+/// work can be CPU intensive or block on I/O. The number of active
+/// concurrent tasks is limited to the size of the queue.
+///
+/// The actual work is done in a function or closure passed in as a
+/// parameter to the engine. The worker function is called with a work
+/// item as an argument, in a thread dedicated for that worker
+/// function.
+///
+/// The need to move work items between threads puts some restrictions
+/// on the types used as work items.
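+///
+/// A rough usage sketch, not taken from the crate's call sites: the
+/// `WorkQueue::new` constructor and the worker closure here are
+/// assumptions for illustration only (see `workqueue.rs` for the real
+/// queue API).
+///
+/// ```ignore
+/// // Queue of at most 4 pending work items; the worker measures length.
+/// let queue: WorkQueue<Vec<u8>> = WorkQueue::new(4);
+/// let mut engine = Engine::new(queue, |data: Vec<u8>| data.len());
+/// // Drain results in the order the work items were queued.
+/// while let Some(len) = engine.next().await {
+///     println!("processed {} bytes", len);
+/// }
+/// ```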
+pub struct Engine<T> {
+ rx: mpsc::Receiver<T>,
+}
+
+impl<T: Send + 'static> Engine<T> {
+ /// Create a new engine.
+ ///
+ /// Each engine gets work from a queue, and calls the same worker
+ /// function for each item of work. The results are put into
+ /// another, internal queue.
+ pub fn new<S, F>(queue: WorkQueue<S>, func: F) -> Self
+ where
+ F: Send + Copy + 'static + Fn(S) -> T,
+ S: Send + 'static,
+ {
+ let size = queue.size();
+ let (tx, rx) = mpsc::channel(size);
+ tokio::spawn(manage_workers(queue, size, tx, func));
+ Self { rx }
+ }
+
+ /// Get the oldest result of the worker function, if any.
+ ///
+ /// This will block until there is a result, or it's known that no
+ /// more results will be forthcoming.
+ pub async fn next(&mut self) -> Option<T> {
+ self.rx.recv().await
+ }
+}
+
+// This is a normal (non-blocking) background task that retrieves work
+// items, launches blocking background tasks for work to be done, and
+// waits on those tasks. Care is taken to not launch too many worker
+// tasks.
+async fn manage_workers<S, T, F>(
+ mut queue: WorkQueue<S>,
+ queue_size: usize,
+ tx: mpsc::Sender<T>,
+ func: F,
+) where
+ F: Send + 'static + Copy + Fn(S) -> T,
+ S: Send + 'static,
+ T: Send + 'static,
+{
+ let mut workers = FuturesOrdered::new();
+
+ 'processing: loop {
+ // Wait for first of various concurrent things to finish.
+ select! {
+ biased;
+
+ // Get work to be done.
+ maybe_work = queue.next() => {
+ if let Some(work) = maybe_work {
+ // We got a work item. Launch background task to
+ // work on it.
+ let tx = tx.clone();
+ workers.push_back(do_work(work, tx, func));
+
+ // If queue is full, wait for at least one
+ // background task to finish.
+ while workers.len() >= queue_size {
+ workers.next().await;
+ }
+ } else {
+ // Finished with the input queue. Nothing more to do.
+ break 'processing;
+ }
+ }
+
+ // Wait for background task to finish, if there are any
+ // background tasks currently running.
+ _ = workers.next(), if !workers.is_empty() => {
+ // nothing to do here
+ }
+ }
+ }
+
+ while workers.next().await.is_some() {
+ // Finish the remaining work items.
+ }
+}
+
+// Work on a work item.
+//
+// This launches a `tokio` blocking background task, and waits for it
+// to finish. The caller spawns a normal (non-blocking) async task for
+// this function, so it's OK for this function to wait on the task it
+// launches.
+async fn do_work<S, T, F>(item: S, tx: mpsc::Sender<T>, func: F)
+where
+ F: Send + 'static + Fn(S) -> T,
+ S: Send + 'static,
+ T: Send + 'static,
+{
+ let result = tokio::task::spawn_blocking(move || func(item))
+ .await
+ .unwrap();
+ if let Err(err) = tx.send(result).await {
+ panic!("failed to send result to channel: {}", err);
+ }
+}
diff --git a/src/error.rs b/src/error.rs
index d368763..928f258 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -1,25 +1,99 @@
-use crate::chunkid::ChunkId;
+//! Errors from Obnam client.
+
+use crate::backup_run::BackupError;
+use crate::chunk::ClientTrustError;
+use crate::cipher::CipherError;
+use crate::client::ClientError;
+use crate::cmd::restore::RestoreError;
+use crate::config::ClientConfigError;
+use crate::db::DatabaseError;
+use crate::dbgen::GenerationDbError;
+use crate::generation::{LocalGenerationError, NascentError};
+use crate::genlist::GenerationListError;
+use crate::label::LabelError;
+use crate::passwords::PasswordError;
use std::path::PathBuf;
-use thiserror::Error;
+use std::time::SystemTimeError;
+use tempfile::PersistError;
-/// Define all the kinds of errors any part of this crate can return.
-#[derive(Debug, Error)]
+/// Define all the kinds of errors that functions corresponding to
+/// subcommands of the main program can return.
+///
+/// This collects all kinds of errors the Obnam client may get, for
+/// convenience.
+#[derive(Debug, thiserror::Error)]
pub enum ObnamError {
- #[error("Can't find backup '{0}'")]
- UnknownGeneration(String),
+ /// Error from chunk labels.
+ #[error(transparent)]
+ Label(#[from] LabelError),
+
+ /// Error listing generations on server.
+ #[error(transparent)]
+ GenerationListError(#[from] GenerationListError),
+
+ /// Error about client trust chunks.
+ #[error(transparent)]
+ ClientTrust(#[from] ClientTrustError),
+
+ /// Error saving passwords.
+ #[error("couldn't save passwords to {0}: {1}")]
+ PasswordSave(PathBuf, PasswordError),
+
+ /// Error using server HTTP API.
+ #[error(transparent)]
+ ClientError(#[from] ClientError),
+
+ /// Error in client configuration.
+ #[error(transparent)]
+ ClientConfigError(#[from] ClientConfigError),
+
+ /// Error making a backup.
+ #[error(transparent)]
+ BackupError(#[from] BackupError),
+
+ /// Error making a new backup generation.
+ #[error(transparent)]
+ NascentError(#[from] NascentError),
+
+ /// Error encrypting or decrypting.
+ #[error(transparent)]
+ CipherError(#[from] CipherError),
+
+ /// Error using local copy of existing backup generation.
+ #[error(transparent)]
+ LocalGenerationError(#[from] LocalGenerationError),
+
+ /// Error from generation database.
+ #[error(transparent)]
+ GenerationDb(#[from] GenerationDbError),
+
+ /// Error using a Database.
+ #[error(transparent)]
+ Database(#[from] DatabaseError),
+
+ /// Error restoring a backup.
+ #[error(transparent)]
+ RestoreError(#[from] RestoreError),
- #[error("Generation has more than one file with the name {0}")]
- TooManyFiles(PathBuf),
+ /// Error making temporary file persistent.
+ #[error(transparent)]
+ PersistError(#[from] PersistError),
- #[error("Server response did not have a 'chunk-meta' header for chunk {0}")]
- NoChunkMeta(ChunkId),
+ /// Error doing I/O.
+ #[error(transparent)]
+ IoError(#[from] std::io::Error),
- #[error("Wrong checksum for chunk {0}, got {1}, expected {2}")]
- WrongChecksum(ChunkId, String, String),
+ /// Error reading system clock.
+ #[error(transparent)]
+ SystemTimeError(#[from] SystemTimeError),
- #[error("Chunk is missing: {0}")]
- MissingChunk(ChunkId),
+ /// Error regarding JSON.
+ #[error(transparent)]
+ SerdeJsonError(#[from] serde_json::Error),
- #[error("Chunk is in store too many times: {0}")]
- DuplicateChunk(ChunkId),
+ /// Unexpected cache directories found.
+ #[error(
+ "found CACHEDIR.TAG files that aren't present in the previous backup, might be an attack"
+ )]
+ NewCachedirTagsFound,
}
diff --git a/src/fsentry.rs b/src/fsentry.rs
index eae11b4..f31d6b5 100644
--- a/src/fsentry.rs
+++ b/src/fsentry.rs
@@ -1,10 +1,20 @@
+//! An entry in the file system.
+
+use log::{debug, error};
use serde::{Deserialize, Serialize};
use std::ffi::OsString;
use std::fs::read_link;
use std::fs::{FileType, Metadata};
-use std::os::linux::fs::MetadataExt;
use std::os::unix::ffi::OsStringExt;
+use std::os::unix::fs::FileTypeExt;
use std::path::{Path, PathBuf};
+use users::{Groups, Users, UsersCache};
+
+#[cfg(target_os = "linux")]
+use std::os::linux::fs::MetadataExt;
+
+#[cfg(target_os = "macos")]
+use std::os::macos::fs::MetadataExt;
/// A file system entry.
///
@@ -35,80 +45,245 @@ pub struct FilesystemEntry {
// The target of a symbolic link, if any.
symlink_target: Option<PathBuf>,
+
+ // User and group owning the file. We store them as both the
+ // numeric id and the textual name corresponding to the numeric id
+ // at the time of the backup.
+ uid: u32,
+ gid: u32,
+ user: String,
+ group: String,
+}
+
+/// Possible errors related to file system entries.
+#[derive(Debug, thiserror::Error)]
+pub enum FsEntryError {
+ /// File kind numeric representation is unknown.
+ #[error("Unknown file kind {0}")]
+ UnknownFileKindCode(u8),
+
+ /// Failed to read a symbolic link's target.
+ #[error("failed to read symbolic link target {0}: {1}")]
+ ReadLink(PathBuf, std::io::Error),
}
#[allow(clippy::len_without_is_empty)]
impl FilesystemEntry {
- pub fn from_metadata(path: &Path, meta: &Metadata) -> anyhow::Result<Self> {
+ /// Create an `FsEntry` from a file's metadata.
+ pub fn from_metadata(
+ path: &Path,
+ meta: &Metadata,
+ cache: &mut UsersCache,
+ ) -> Result<Self, FsEntryError> {
let kind = FilesystemKind::from_file_type(meta.file_type());
- Ok(Self {
- path: path.to_path_buf().into_os_string().into_vec(),
- kind: FilesystemKind::from_file_type(meta.file_type()),
- len: meta.len(),
- mode: meta.st_mode(),
- mtime: meta.st_mtime(),
- mtime_ns: meta.st_mtime_nsec(),
- atime: meta.st_atime(),
- atime_ns: meta.st_atime_nsec(),
- symlink_target: if kind == FilesystemKind::Symlink {
- Some(read_link(path)?)
- } else {
- None
- },
- })
+ Ok(EntryBuilder::new(kind)
+ .path(path.to_path_buf())
+ .len(meta.len())
+ .mode(meta.st_mode())
+ .mtime(meta.st_mtime(), meta.st_mtime_nsec())
+ .atime(meta.st_atime(), meta.st_atime_nsec())
+ .user(meta.st_uid(), cache)?
+ .group(meta.st_gid(), cache)?
+ .symlink_target()?
+ .build())
}
+ /// Return the kind of file the entry refers to.
pub fn kind(&self) -> FilesystemKind {
self.kind
}
+ /// Return full path to the entry.
pub fn pathbuf(&self) -> PathBuf {
let path = self.path.clone();
PathBuf::from(OsString::from_vec(path))
}
+ /// Return number of bytes for the entity represented by the entry.
pub fn len(&self) -> u64 {
self.len
}
+ /// Return the entry's mode bits.
pub fn mode(&self) -> u32 {
self.mode
}
+ /// Return the entry's access time, whole seconds.
pub fn atime(&self) -> i64 {
self.atime
}
+ /// Return the entry's access time, nanoseconds since the last full second.
pub fn atime_ns(&self) -> i64 {
self.atime_ns
}
+ /// Return the entry's modification time, whole seconds.
pub fn mtime(&self) -> i64 {
self.mtime
}
+ /// Return the entry's modification time, nanoseconds since the last full second.
pub fn mtime_ns(&self) -> i64 {
self.mtime_ns
}
+ /// Does the entry represent a directory?
pub fn is_dir(&self) -> bool {
self.kind() == FilesystemKind::Directory
}
+ /// Return target of the symlink the entry represents.
pub fn symlink_target(&self) -> Option<PathBuf> {
self.symlink_target.clone()
}
}
+#[derive(Debug)]
+pub(crate) struct EntryBuilder {
+ kind: FilesystemKind,
+ path: PathBuf,
+ len: u64,
+
+ // 16 bits should be enough for a Unix mode_t.
+ // https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/sys_stat.h.html
+ // However, it's 32 bits on Linux, so that's what we store.
+ mode: u32,
+
+ // Linux can store file system time stamps in nanosecond
+ // resolution. We store them as two 64-bit integers.
+ mtime: i64,
+ mtime_ns: i64,
+ atime: i64,
+ atime_ns: i64,
+
+ // The target of a symbolic link, if any.
+ symlink_target: Option<PathBuf>,
+
+ // User and group owning the file. We store them as both the
+ // numeric id and the textual name corresponding to the numeric id
+ // at the time of the backup.
+ uid: u32,
+ gid: u32,
+ user: String,
+ group: String,
+}
+
+impl EntryBuilder {
+ pub(crate) fn new(kind: FilesystemKind) -> Self {
+ Self {
+ kind,
+ path: PathBuf::new(),
+ len: 0,
+ mode: 0,
+ mtime: 0,
+ mtime_ns: 0,
+ atime: 0,
+ atime_ns: 0,
+ symlink_target: None,
+ uid: 0,
+ user: "".to_string(),
+ gid: 0,
+ group: "".to_string(),
+ }
+ }
+
+ pub(crate) fn build(self) -> FilesystemEntry {
+ FilesystemEntry {
+ kind: self.kind,
+ path: self.path.into_os_string().into_vec(),
+ len: self.len,
+ mode: self.mode,
+ mtime: self.mtime,
+ mtime_ns: self.mtime_ns,
+ atime: self.atime,
+ atime_ns: self.atime_ns,
+ symlink_target: self.symlink_target,
+ uid: self.uid,
+ user: self.user,
+ gid: self.gid,
+ group: self.group,
+ }
+ }
+
+ pub(crate) fn path(mut self, path: PathBuf) -> Self {
+ self.path = path;
+ self
+ }
+
+ pub(crate) fn len(mut self, len: u64) -> Self {
+ self.len = len;
+ self
+ }
+
+ pub(crate) fn mode(mut self, mode: u32) -> Self {
+ self.mode = mode;
+ self
+ }
+
+ pub(crate) fn mtime(mut self, secs: i64, nsec: i64) -> Self {
+ self.mtime = secs;
+ self.mtime_ns = nsec;
+ self
+ }
+
+ pub(crate) fn atime(mut self, secs: i64, nsec: i64) -> Self {
+ self.atime = secs;
+ self.atime_ns = nsec;
+ self
+ }
+
+ pub(crate) fn symlink_target(mut self) -> Result<Self, FsEntryError> {
+ self.symlink_target = if self.kind == FilesystemKind::Symlink {
+ debug!("reading symlink target for {:?}", self.path);
+ let target = read_link(&self.path)
+ .map_err(|err| FsEntryError::ReadLink(self.path.clone(), err))?;
+ Some(target)
+ } else {
+ None
+ };
+ Ok(self)
+ }
+
+ pub(crate) fn user(mut self, uid: u32, cache: &mut UsersCache) -> Result<Self, FsEntryError> {
+ self.uid = uid;
+ self.user = if let Some(user) = cache.get_user_by_uid(uid) {
+ user.name().to_string_lossy().to_string()
+ } else {
+ "".to_string()
+ };
+ Ok(self)
+ }
+
+ pub(crate) fn group(mut self, gid: u32, cache: &mut UsersCache) -> Result<Self, FsEntryError> {
+ self.gid = gid;
+ self.group = if let Some(group) = cache.get_group_by_gid(gid) {
+ group.name().to_string_lossy().to_string()
+ } else {
+ "".to_string()
+ };
+ Ok(self)
+ }
+}
+
/// Different types of file system entries.
-#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum FilesystemKind {
+ /// Regular file, including a hard link to one.
Regular,
+ /// A directory.
Directory,
+ /// A symbolic link.
Symlink,
+ /// A UNIX domain socket.
+ Socket,
+ /// A UNIX named pipe.
+ Fifo,
}
impl FilesystemKind {
+ /// Create a kind from a file type.
pub fn from_file_type(file_type: FileType) -> Self {
if file_type.is_file() {
FilesystemKind::Regular
@@ -116,35 +291,39 @@ impl FilesystemKind {
FilesystemKind::Directory
} else if file_type.is_symlink() {
FilesystemKind::Symlink
+ } else if file_type.is_socket() {
+ FilesystemKind::Socket
+ } else if file_type.is_fifo() {
+ FilesystemKind::Fifo
} else {
panic!("unknown file type {:?}", file_type);
}
}
+ /// Represent a kind as a numeric code.
pub fn as_code(&self) -> u8 {
match self {
FilesystemKind::Regular => 0,
FilesystemKind::Directory => 1,
FilesystemKind::Symlink => 2,
+ FilesystemKind::Socket => 3,
+ FilesystemKind::Fifo => 4,
}
}
- pub fn from_code(code: u8) -> anyhow::Result<Self> {
+ /// Create a kind from a numeric code.
+ pub fn from_code(code: u8) -> Result<Self, FsEntryError> {
match code {
0 => Ok(FilesystemKind::Regular),
1 => Ok(FilesystemKind::Directory),
2 => Ok(FilesystemKind::Symlink),
- _ => Err(Error::UnknownFileKindCode(code).into()),
+ 3 => Ok(FilesystemKind::Socket),
+ 4 => Ok(FilesystemKind::Fifo),
+ _ => Err(FsEntryError::UnknownFileKindCode(code)),
}
}
}
-#[derive(Debug, thiserror::Error)]
-pub enum Error {
- #[error("unknown file kind code {0}")]
- UnknownFileKindCode(u8),
-}
-
#[cfg(test)]
mod test {
use super::FilesystemKind;
@@ -153,6 +332,9 @@ mod test {
fn file_kind_regular_round_trips() {
one_file_kind_round_trip(FilesystemKind::Regular);
one_file_kind_round_trip(FilesystemKind::Directory);
+ one_file_kind_round_trip(FilesystemKind::Symlink);
+ one_file_kind_round_trip(FilesystemKind::Socket);
+ one_file_kind_round_trip(FilesystemKind::Fifo);
}
fn one_file_kind_round_trip(kind: FilesystemKind) {
diff --git a/src/fsiter.rs b/src/fsiter.rs
index a40ad34..ef2886d 100644
--- a/src/fsiter.rs
+++ b/src/fsiter.rs
@@ -1,37 +1,157 @@
-use crate::fsentry::FilesystemEntry;
-use log::info;
-use std::path::Path;
-use walkdir::{IntoIter, WalkDir};
+//! Iterate over directory tree.
+
+use crate::fsentry::{FilesystemEntry, FsEntryError};
+use log::warn;
+use std::path::{Path, PathBuf};
+use users::UsersCache;
+use walkdir::{DirEntry, IntoIter, WalkDir};
+
+/// Filesystem entry along with additional info about it.
+pub struct AnnotatedFsEntry {
+ /// The file system entry being annotated.
+ pub inner: FilesystemEntry,
+ /// Is `entry` a valid CACHEDIR.TAG?
+ pub is_cachedir_tag: bool,
+}
/// Iterator over file system entries in a directory tree.
pub struct FsIterator {
- iter: IntoIter,
+ iter: SkipCachedirs,
+}
+
+/// Possible errors from iterating over a directory tree.
+#[derive(Debug, thiserror::Error)]
+pub enum FsIterError {
+ /// Error from the walkdir crate.
+ #[error("walkdir failed: {0}")]
+ WalkDir(walkdir::Error),
+
+ /// Error reading a file's metadata.
+ #[error("failed to get file system metadata for {0}: {1}")]
+ Metadata(PathBuf, std::io::Error),
+
+ /// Error related to file system entries.
+ #[error(transparent)]
+ FsEntryError(#[from] FsEntryError),
}
impl FsIterator {
- pub fn new(root: &Path) -> Self {
+ /// Create a new iterator.
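+ ///
+ /// A minimal sketch of iterating a tree (the path is made up):
+ ///
+ /// ```ignore
+ /// let iter = FsIterator::new(Path::new("/home/user"), true);
+ /// for entry in iter {
+ ///     let entry = entry?;
+ ///     println!("{}", entry.inner.pathbuf().display());
+ /// }
+ /// ```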
+ pub fn new(root: &Path, exclude_cache_tag_directories: bool) -> Self {
Self {
- iter: WalkDir::new(root).into_iter(),
+ iter: SkipCachedirs::new(
+ WalkDir::new(root).into_iter(),
+ exclude_cache_tag_directories,
+ ),
}
}
}
impl Iterator for FsIterator {
- type Item = Result<FilesystemEntry, anyhow::Error>;
+ type Item = Result<AnnotatedFsEntry, FsIterError>;
fn next(&mut self) -> Option<Self::Item> {
- match self.iter.next() {
- None => None,
- Some(Ok(entry)) => {
- info!("found {}", entry.path().display());
- Some(new_entry(&entry))
- }
- Some(Err(err)) => Some(Err(err.into())),
+ self.iter.next()
+ }
+}
+
+/// Cachedir-aware adaptor for WalkDir: it skips the contents of dirs that contain CACHEDIR.TAG,
+/// but still yields entries for the dir and the tag themselves.
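+///
+/// For reference, a directory counts as cached if it contains a regular
+/// file named `CACHEDIR.TAG` whose content starts with the signature
+/// checked for below; a sketch of creating such a tag (path made up):
+///
+/// ```ignore
+/// std::fs::write(
+///     "/home/user/.cache/CACHEDIR.TAG",
+///     "Signature: 8a477f597d28d172789f06886806bc55\n",
+/// )?;
+/// ```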
+struct SkipCachedirs {
+ cache: UsersCache,
+ iter: IntoIter,
+ exclude_cache_tag_directories: bool,
+ // This is the last tag we've found. `next()` will yield it before asking `iter` for more
+ // entries.
+ cachedir_tag: Option<Result<AnnotatedFsEntry, FsIterError>>,
+}
+
+impl SkipCachedirs {
+ fn new(iter: IntoIter, exclude_cache_tag_directories: bool) -> Self {
+ Self {
+ cache: UsersCache::new(),
+ iter,
+ exclude_cache_tag_directories,
+ cachedir_tag: None,
+ }
+ }
+
+ fn try_enqueue_cachedir_tag(&mut self, entry: &DirEntry) {
+ if !self.exclude_cache_tag_directories {
+ return;
+ }
+
+ // If this entry is not a directory, it means we already processed its
+ // parent dir and decided that it's not cached.
+ if !entry.file_type().is_dir() {
+ return;
+ }
+
+ let mut tag_path = entry.path().to_owned();
+ tag_path.push("CACHEDIR.TAG");
+
+ // Tags are required to be regular files -- not even symlinks are allowed.
+ if !tag_path.is_file() {
+ return;
+ };
+
+ const CACHEDIR_TAG: &[u8] = b"Signature: 8a477f597d28d172789f06886806bc55";
+ let mut content = [0u8; CACHEDIR_TAG.len()];
+
+ let mut file = if let Ok(file) = std::fs::File::open(&tag_path) {
+ file
+ } else {
+ return;
+ };
+
+ use std::io::Read;
+ match file.read_exact(&mut content) {
+ Ok(_) => (),
+ // If we can't read the tag file, proceed as if it's not there.
+ Err(_) => return,
+ }
+
+ if content == CACHEDIR_TAG {
+ self.iter.skip_current_dir();
+ self.cachedir_tag = Some(new_entry(&tag_path, true, &mut self.cache));
}
}
}
-fn new_entry(e: &walkdir::DirEntry) -> anyhow::Result<FilesystemEntry> {
- let meta = e.metadata()?;
- let entry = FilesystemEntry::from_metadata(e.path(), &meta)?;
- Ok(entry)
+impl Iterator for SkipCachedirs {
+ type Item = Result<AnnotatedFsEntry, FsIterError>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.cachedir_tag.take().or_else(|| {
+ let next = self.iter.next();
+ match next {
+ None => None,
+ Some(Err(err)) => Some(Err(FsIterError::WalkDir(err))),
+ Some(Ok(entry)) => {
+ self.try_enqueue_cachedir_tag(&entry);
+ Some(new_entry(entry.path(), false, &mut self.cache))
+ }
+ }
+ })
+ }
+}
+
+fn new_entry(
+ path: &Path,
+ is_cachedir_tag: bool,
+ cache: &mut UsersCache,
+) -> Result<AnnotatedFsEntry, FsIterError> {
+ let meta = std::fs::symlink_metadata(path);
+ let meta = match meta {
+ Ok(meta) => meta,
+ Err(err) => {
+ warn!("failed to get metadata for {}: {}", path.display(), err);
+ return Err(FsIterError::Metadata(path.to_path_buf(), err));
+ }
+ };
+ let entry = FilesystemEntry::from_metadata(path, &meta, cache)?;
+ let annotated = AnnotatedFsEntry {
+ inner: entry,
+ is_cachedir_tag,
+ };
+ Ok(annotated)
}
diff --git a/src/generation.rs b/src/generation.rs
index 8a15363..477edc0 100644
--- a/src/generation.rs
+++ b/src/generation.rs
@@ -1,11 +1,43 @@
+//! Backup generations of various kinds.
+
use crate::backup_reason::Reason;
use crate::chunkid::ChunkId;
+use crate::db::{DatabaseError, SqlResults};
+use crate::dbgen::{FileId, GenerationDb, GenerationDbError};
use crate::fsentry::FilesystemEntry;
-use rusqlite::Connection;
-use std::path::Path;
+use crate::genmeta::{GenerationMeta, GenerationMetaError};
+use crate::label::LabelChecksumKind;
+use crate::schema::{SchemaVersion, VersionComponent};
+use serde::Serialize;
+use std::fmt;
+use std::path::{Path, PathBuf};
+
+/// An identifier for a generation.
+#[derive(Debug, Clone, Serialize)]
+pub struct GenId {
+ id: ChunkId,
+}
+
+impl GenId {
+ /// Create a generation identifier from a chunk identifier.
+ pub fn from_chunk_id(id: ChunkId) -> Self {
+ Self { id }
+ }
-/// An identifier for a file in a generation.
-type FileId = i64;
+ /// Convert a generation identifier into a chunk identifier.
+ pub fn as_chunk_id(&self) -> &ChunkId {
+ &self.id
+ }
+}
+
+impl fmt::Display for GenId {
+ /// Format an identifier for display.
+ ///
+ /// The output can be parsed to re-create an identical identifier.
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "{}", self.id)
+ }
+}
/// A nascent backup generation.
///
@@ -13,90 +45,103 @@ type FileId = i64;
/// finished yet, and it's not actually on the server until the upload
/// of its generation chunk.
pub struct NascentGeneration {
- conn: Connection,
+ db: GenerationDb,
fileno: FileId,
}
+/// Possible errors from nascent backup generations.
+#[derive(Debug, thiserror::Error)]
+pub enum NascentError {
+ /// Error backing up a backup root.
+ #[error("Could not back up a backup root directory: {0}: {1}")]
+ BackupRootFailed(PathBuf, crate::fsiter::FsIterError),
+
+ /// Error using a local generation.
+ #[error(transparent)]
+ LocalGenerationError(#[from] LocalGenerationError),
+
+ /// Error from a GenerationDb.
+ #[error(transparent)]
+ GenerationDb(#[from] GenerationDbError),
+
+ /// Error from an SQL transaction.
+ #[error("SQL transaction error: {0}")]
+ Transaction(rusqlite::Error),
+
+ /// Error from committing an SQL transaction.
+ #[error("SQL commit error: {0}")]
+ Commit(rusqlite::Error),
+
+ /// Error creating a temporary file.
+ #[error("Failed to create temporary file: {0}")]
+ TempFile(#[from] std::io::Error),
+}
+
impl NascentGeneration {
- pub fn create<P>(filename: P) -> anyhow::Result<Self>
+ /// Create a new nascent generation.
+ pub fn create<P>(
+ filename: P,
+ schema: SchemaVersion,
+ checksum_kind: LabelChecksumKind,
+ ) -> Result<Self, NascentError>
where
P: AsRef<Path>,
{
- let conn = sql::create_db(filename.as_ref())?;
- Ok(Self { conn, fileno: 0 })
+ let db = GenerationDb::create(filename.as_ref(), schema, checksum_kind)?;
+ Ok(Self { db, fileno: 0 })
+ }
+
+ /// Commit any changes, and close the database.
+ pub fn close(self) -> Result<(), NascentError> {
+ self.db.close().map_err(NascentError::GenerationDb)
}
+ /// How many files are there now in the nascent generation?
pub fn file_count(&self) -> FileId {
self.fileno
}
+ /// Insert a new file system entry into a nascent generation.
pub fn insert(
&mut self,
e: FilesystemEntry,
ids: &[ChunkId],
reason: Reason,
- ) -> anyhow::Result<()> {
- let t = self.conn.transaction()?;
+ is_cachedir_tag: bool,
+ ) -> Result<(), NascentError> {
self.fileno += 1;
- sql::insert_one(&t, e, self.fileno, ids, reason)?;
- t.commit()?;
+ self.db
+ .insert(e, self.fileno, ids, reason, is_cachedir_tag)?;
Ok(())
}
-
- pub fn insert_iter<'a>(
- &mut self,
- entries: impl Iterator<Item = anyhow::Result<(FilesystemEntry, Vec<ChunkId>, Reason)>>,
- ) -> anyhow::Result<()> {
- let t = self.conn.transaction()?;
- for r in entries {
- let (e, ids, reason) = r?;
- self.fileno += 1;
- sql::insert_one(&t, e, self.fileno, &ids[..], reason)?;
- }
- t.commit()?;
- Ok(())
- }
-}
-
-#[cfg(test)]
-mod test {
- use super::NascentGeneration;
- use tempfile::NamedTempFile;
-
- #[test]
- fn empty() {
- let filename = NamedTempFile::new().unwrap().path().to_path_buf();
- {
- let mut _gen = NascentGeneration::create(&filename).unwrap();
- // _gen is dropped here; the connection is close; the file
- // should not be removed.
- }
- assert!(filename.exists());
- }
}
-/// A finished generation.
+/// A finished generation on the server.
///
-/// A generation is finished when it's on the server. It can be restored.
+/// A generation is finished when it's on the server. It can be
+/// fetched so it can be used as a [`LocalGeneration`].
#[derive(Debug, Clone)]
pub struct FinishedGeneration {
- id: ChunkId,
+ id: GenId,
ended: String,
}
impl FinishedGeneration {
+ /// Create a new finished generation.
pub fn new(id: &str, ended: &str) -> Self {
- let id = id.parse().unwrap(); // this never fails
+ let id = GenId::from_chunk_id(id.parse().unwrap()); // this never fails
Self {
id,
ended: ended.to_string(),
}
}
- pub fn id(&self) -> ChunkId {
- self.id.clone()
+ /// Get the generation's identifier.
+ pub fn id(&self) -> &GenId {
+ &self.id
}
+ /// When was the generation finished?
pub fn ended(&self) -> &str {
&self.ended
}
@@ -107,9 +152,51 @@ impl FinishedGeneration {
/// This is for querying an existing generation, and other read-only
/// operations.
pub struct LocalGeneration {
- conn: Connection,
+ db: GenerationDb,
+}
+
+/// Possible errors from using local generations.
+#[derive(Debug, thiserror::Error)]
+pub enum LocalGenerationError {
+ /// Duplicate file names.
+ #[error("Generation has more than one file with the name {0}")]
+ TooManyFiles(PathBuf),
+
+ /// No 'meta' table in generation.
+ #[error("Generation does not have a 'meta' table")]
+ NoMeta,
+
+ /// Local generation uses a schema version that this version of
+ /// Obnam isn't compatible with.
+ #[error("Backup is not compatible with this version of Obnam: {0}.{1}")]
+ Incompatible(VersionComponent, VersionComponent),
+
+ /// Error from generation metadata.
+ #[error(transparent)]
+ GenerationMeta(#[from] GenerationMetaError),
+
+ /// Error from SQL.
+ #[error(transparent)]
+ RusqliteError(#[from] rusqlite::Error),
+
+ /// Error from a GenerationDb.
+ #[error(transparent)]
+ GenerationDb(#[from] GenerationDbError),
+
+ /// Error from a Database.
+ #[error(transparent)]
+ Database(#[from] DatabaseError),
+
+ /// Error from JSON.
+ #[error(transparent)]
+ SerdeJsonError(#[from] serde_json::Error),
+
+ /// Error from I/O.
+ #[error(transparent)]
+ IoError(#[from] std::io::Error),
}
+/// A backed up file in a local generation.
pub struct BackedUpFile {
fileno: FileId,
entry: FilesystemEntry,
@@ -117,8 +204,8 @@ pub struct BackedUpFile {
}
impl BackedUpFile {
- pub fn new(fileno: FileId, entry: FilesystemEntry, reason: &str) -> Self {
- let reason = Reason::from_str(reason);
+ /// Create a new `BackedUpFile`.
+ pub fn new(fileno: FileId, entry: FilesystemEntry, reason: Reason) -> Self {
Self {
fileno,
entry,
@@ -126,179 +213,204 @@ impl BackedUpFile {
}
}
+ /// Return id for file in its local generation.
pub fn fileno(&self) -> FileId {
self.fileno
}
+ /// Return file system entry for file.
pub fn entry(&self) -> &FilesystemEntry {
&self.entry
}
+ /// Return reason why file is in its local generation.
pub fn reason(&self) -> Reason {
self.reason
}
}
impl LocalGeneration {
- pub fn open<P>(filename: P) -> anyhow::Result<Self>
+ fn new(db: GenerationDb) -> Self {
+ Self { db }
+ }
+
+ /// Open a local file as a local generation.
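+ ///
+ /// A rough sketch (the path is made up):
+ ///
+ /// ```ignore
+ /// let gen = LocalGeneration::open("/tmp/gen.db")?;
+ /// println!("{} files in the generation", gen.file_count()?);
+ /// ```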
+ pub fn open<P>(filename: P) -> Result<Self, LocalGenerationError>
where
P: AsRef<Path>,
{
- let conn = sql::open_db(filename.as_ref())?;
- Ok(Self { conn })
+ let db = GenerationDb::open(filename.as_ref())?;
+ let gen = Self::new(db);
+ Ok(gen)
}
- pub fn file_count(&self) -> anyhow::Result<i64> {
- Ok(sql::file_count(&self.conn)?)
+ /// Return generation metadata for local generation.
+ pub fn meta(&self) -> Result<GenerationMeta, LocalGenerationError> {
+ let map = self.db.meta()?;
+ GenerationMeta::from(map).map_err(LocalGenerationError::GenerationMeta)
}
- pub fn files(&self) -> anyhow::Result<Vec<BackedUpFile>> {
- Ok(sql::files(&self.conn)?)
+ /// How many files are there in the local generation?
+ pub fn file_count(&self) -> Result<FileId, LocalGenerationError> {
+ Ok(self.db.file_count()?)
}
- pub fn chunkids(&self, fileno: FileId) -> anyhow::Result<Vec<ChunkId>> {
- Ok(sql::chunkids(&self.conn, fileno)?)
+ /// Return all files in the local generation.
+ pub fn files(
+ &self,
+ ) -> Result<SqlResults<(FileId, FilesystemEntry, Reason, bool)>, LocalGenerationError> {
+ self.db.files().map_err(LocalGenerationError::GenerationDb)
}
- pub fn get_file(&self, filename: &Path) -> anyhow::Result<Option<FilesystemEntry>> {
- Ok(sql::get_file(&self.conn, filename)?)
+ /// Return ids for all chunks of a file in the local generation.
+ pub fn chunkids(&self, fileid: FileId) -> Result<SqlResults<ChunkId>, LocalGenerationError> {
+ self.db
+ .chunkids(fileid)
+ .map_err(LocalGenerationError::GenerationDb)
}
- pub fn get_fileno(&self, filename: &Path) -> anyhow::Result<Option<FileId>> {
- Ok(sql::get_fileno(&self.conn, filename)?)
+ /// Return entry for a file, given its pathname.
+ pub fn get_file(
+ &self,
+ filename: &Path,
+ ) -> Result<Option<FilesystemEntry>, LocalGenerationError> {
+ self.db
+ .get_file(filename)
+ .map_err(LocalGenerationError::GenerationDb)
}
-}
-mod sql {
- use super::BackedUpFile;
- use super::FileId;
- use crate::backup_reason::Reason;
- use crate::chunkid::ChunkId;
- use crate::error::ObnamError;
- use crate::fsentry::FilesystemEntry;
- use rusqlite::{params, Connection, OpenFlags, Row, Transaction};
- use std::os::unix::ffi::OsStrExt;
- use std::path::Path;
-
- pub fn create_db(filename: &Path) -> anyhow::Result<Connection> {
- let flags = OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE;
- let conn = Connection::open_with_flags(filename, flags)?;
- conn.execute(
- "CREATE TABLE files (fileno INTEGER PRIMARY KEY, filename BLOB, json TEXT, reason TEXT)",
- params![],
- )?;
- conn.execute(
- "CREATE TABLE chunks (fileno INTEGER, chunkid TEXT)",
- params![],
- )?;
- conn.execute("CREATE INDEX filenames ON files (filename)", params![])?;
- conn.execute("CREATE INDEX filenos ON chunks (fileno)", params![])?;
- conn.pragma_update(None, "journal_mode", &"WAL")?;
- Ok(conn)
+ /// Get the id in the local generation of a file, given its pathname.
+ pub fn get_fileno(&self, filename: &Path) -> Result<Option<FileId>, LocalGenerationError> {
+ self.db
+ .get_fileno(filename)
+ .map_err(LocalGenerationError::GenerationDb)
}
- pub fn open_db(filename: &Path) -> anyhow::Result<Connection> {
- let flags = OpenFlags::SQLITE_OPEN_READ_WRITE;
- let conn = Connection::open_with_flags(filename, flags)?;
- conn.pragma_update(None, "journal_mode", &"WAL")?;
- Ok(conn)
- }
-
- pub fn insert_one(
- t: &Transaction,
- e: FilesystemEntry,
- fileno: FileId,
- ids: &[ChunkId],
- reason: Reason,
- ) -> anyhow::Result<()> {
- let json = serde_json::to_string(&e)?;
- t.execute(
- "INSERT INTO files (fileno, filename, json, reason) VALUES (?1, ?2, ?3, ?4)",
- params![fileno, path_into_blob(&e.pathbuf()), &json, reason,],
- )?;
- for id in ids {
- t.execute(
- "INSERT INTO chunks (fileno, chunkid) VALUES (?1, ?2)",
- params![fileno, id],
- )?;
- }
- Ok(())
+ /// Does a pathname refer to a file recorded as a CACHEDIR.TAG?
+ pub fn is_cachedir_tag(&self, filename: &Path) -> Result<bool, LocalGenerationError> {
+ self.db
+ .is_cachedir_tag(filename)
+ .map_err(LocalGenerationError::GenerationDb)
}
+}
- fn path_into_blob(path: &Path) -> Vec<u8> {
- path.as_os_str().as_bytes().to_vec()
- }
+#[cfg(test)]
+mod test {
+ use super::{LabelChecksumKind, LocalGeneration, NascentGeneration, Reason, SchemaVersion};
+ use crate::fsentry::EntryBuilder;
+ use crate::fsentry::FilesystemKind;
+ use std::path::PathBuf;
+ use tempfile::{tempdir, NamedTempFile};
- pub fn row_to_entry(row: &Row) -> rusqlite::Result<(FileId, String, String)> {
- let fileno: FileId = row.get(row.column_index("fileno")?)?;
- let json: String = row.get(row.column_index("json")?)?;
- let reason: String = row.get(row.column_index("reason")?)?;
- Ok((fileno, json, reason))
- }
+ #[test]
+ fn round_trips_u64_max() {
+ let tmp = tempdir().unwrap();
+ let filename = tmp.path().join("test.db");
+ let path = PathBuf::from("/");
+ let schema = SchemaVersion::new(0, 0);
+ {
+ let e = EntryBuilder::new(FilesystemKind::Directory)
+ .path(path.clone())
+ .len(u64::MAX)
+ .build();
+ let mut gen =
+ NascentGeneration::create(&filename, schema, LabelChecksumKind::Sha256).unwrap();
+ gen.insert(e, &[], Reason::IsNew, false).unwrap();
+ gen.close().unwrap();
+ }
- pub fn file_count(conn: &Connection) -> anyhow::Result<FileId> {
- let mut stmt = conn.prepare("SELECT count(*) FROM files")?;
- let mut iter = stmt.query_map(params![], |row| row.get(0))?;
- let count = iter.next().expect("SQL count result (1)");
- let count = count?;
- Ok(count)
+ let db = LocalGeneration::open(&filename).unwrap();
+ let e = db.get_file(&path).unwrap().unwrap();
+ assert_eq!(e.len(), u64::MAX);
}
- pub fn files(conn: &Connection) -> anyhow::Result<Vec<BackedUpFile>> {
- let mut stmt = conn.prepare("SELECT * FROM files")?;
- let iter = stmt.query_map(params![], |row| row_to_entry(row))?;
- let mut files = vec![];
- for x in iter {
- let (fileno, json, reason) = x?;
- let entry = serde_json::from_str(&json)?;
- files.push(BackedUpFile::new(fileno, entry, &reason));
+ #[test]
+ fn empty() {
+ let filename = NamedTempFile::new().unwrap().path().to_path_buf();
+ let schema = SchemaVersion::new(0, 0);
+ {
+ let mut _gen =
+ NascentGeneration::create(&filename, schema, LabelChecksumKind::Sha256).unwrap();
+ // _gen is dropped here; the connection is closed; the file
+ // should not be removed.
}
- Ok(files)
+ assert!(filename.exists());
}
- pub fn chunkids(conn: &Connection, fileno: FileId) -> anyhow::Result<Vec<ChunkId>> {
- let mut stmt = conn.prepare("SELECT chunkid FROM chunks WHERE fileno = ?1")?;
- let iter = stmt.query_map(params![fileno], |row| Ok(row.get(0)?))?;
- let mut ids: Vec<ChunkId> = vec![];
- for x in iter {
- let fileno: String = x?;
- ids.push(ChunkId::from(&fileno));
+ // FIXME: This is way too complicated a test function. It should
+ // be simplified, possibly by re-thinking the abstractions of the
+ // code it calls.
+ #[test]
+ fn remembers_cachedir_tags() {
+ use crate::{
+ backup_reason::Reason, backup_run::FsEntryBackupOutcome, fsentry::FilesystemEntry,
+ };
+ use std::{fs::metadata, path::Path};
+
+ // Create a `Metadata` structure to pass to other functions (we don't care about the
+ // contents)
+ let src_file = NamedTempFile::new().unwrap();
+ let metadata = metadata(src_file.path()).unwrap();
+
+ let dbfile = NamedTempFile::new().unwrap().path().to_path_buf();
+
+ let nontag_path1 = Path::new("/nontag1");
+ let nontag_path2 = Path::new("/dir/nontag2");
+ let tag_path1 = Path::new("/a_tag");
+ let tag_path2 = Path::new("/another_dir/a_tag");
+
+ let schema = SchemaVersion::new(0, 0);
+ let mut gen =
+ NascentGeneration::create(&dbfile, schema, LabelChecksumKind::Sha256).unwrap();
+ let mut cache = users::UsersCache::new();
+
+ gen.insert(
+ FilesystemEntry::from_metadata(nontag_path1, &metadata, &mut cache).unwrap(),
+ &[],
+ Reason::IsNew,
+ false,
+ )
+ .unwrap();
+ gen.insert(
+ FilesystemEntry::from_metadata(tag_path1, &metadata, &mut cache).unwrap(),
+ &[],
+ Reason::IsNew,
+ true,
+ )
+ .unwrap();
+
+ let entries = vec![
+ FsEntryBackupOutcome {
+ entry: FilesystemEntry::from_metadata(nontag_path2, &metadata, &mut cache).unwrap(),
+ ids: vec![],
+ reason: Reason::IsNew,
+ is_cachedir_tag: false,
+ },
+ FsEntryBackupOutcome {
+ entry: FilesystemEntry::from_metadata(tag_path2, &metadata, &mut cache).unwrap(),
+ ids: vec![],
+ reason: Reason::IsNew,
+ is_cachedir_tag: true,
+ },
+ ];
+
+ for o in entries {
+ gen.insert(o.entry, &o.ids, o.reason, o.is_cachedir_tag)
+ .unwrap();
}
- Ok(ids)
- }
- pub fn get_file(conn: &Connection, filename: &Path) -> anyhow::Result<Option<FilesystemEntry>> {
- match get_file_and_fileno(conn, filename)? {
- None => Ok(None),
- Some((_, e, _)) => Ok(Some(e)),
- }
- }
+ gen.close().unwrap();
- pub fn get_fileno(conn: &Connection, filename: &Path) -> anyhow::Result<Option<FileId>> {
- match get_file_and_fileno(conn, filename)? {
- None => Ok(None),
- Some((id, _, _)) => Ok(Some(id)),
- }
- }
+ let gen = LocalGeneration::open(dbfile).unwrap();
+ assert!(!gen.is_cachedir_tag(nontag_path1).unwrap());
+ assert!(!gen.is_cachedir_tag(nontag_path2).unwrap());
+ assert!(gen.is_cachedir_tag(tag_path1).unwrap());
+ assert!(gen.is_cachedir_tag(tag_path2).unwrap());
- fn get_file_and_fileno(
- conn: &Connection,
- filename: &Path,
- ) -> anyhow::Result<Option<(FileId, FilesystemEntry, String)>> {
- let mut stmt = conn.prepare("SELECT * FROM files WHERE filename = ?1")?;
- let mut iter =
- stmt.query_map(params![path_into_blob(filename)], |row| row_to_entry(row))?;
- match iter.next() {
- None => Ok(None),
- Some(Err(e)) => Err(e.into()),
- Some(Ok((fileno, json, reason))) => {
- let entry = serde_json::from_str(&json)?;
- if iter.next() == None {
- Ok(Some((fileno, entry, reason)))
- } else {
- Err(ObnamError::TooManyFiles(filename.to_path_buf()).into())
- }
- }
- }
+ // Nonexistent files are not cachedir tags
+ assert!(!gen.is_cachedir_tag(Path::new("/hello/world")).unwrap());
+ assert!(!gen
+ .is_cachedir_tag(Path::new("/different path/to/another file.txt"))
+ .unwrap());
}
}
diff --git a/src/genlist.rs b/src/genlist.rs
index 10c614e..3a0d81a 100644
--- a/src/genlist.rs
+++ b/src/genlist.rs
@@ -1,33 +1,51 @@
+//! A list of generations on the server.
+
use crate::chunkid::ChunkId;
-use crate::generation::FinishedGeneration;
+use crate::generation::{FinishedGeneration, GenId};
+/// A list of generations on the server.
pub struct GenerationList {
list: Vec<FinishedGeneration>,
}
+/// Possible errors from listing generations.
+#[derive(Debug, thiserror::Error)]
+pub enum GenerationListError {
+ /// Server doesn't know about a generation.
+ #[error("Unknown generation: {0}")]
+ UnknownGeneration(ChunkId),
+}
+
impl GenerationList {
+ /// Create a new list of generations.
pub fn new(gens: Vec<FinishedGeneration>) -> Self {
- let mut list = gens.clone();
+ let mut list = gens;
list.sort_by_cached_key(|gen| gen.ended().to_string());
Self { list }
}
+ /// Return an iterator over the generations.
pub fn iter(&self) -> impl Iterator<Item = &FinishedGeneration> {
self.list.iter()
}
- pub fn resolve(&self, genref: &str) -> Option<String> {
+ /// Resolve a symbolic name of a generation into its identifier.
+ ///
+ /// For example, "latest" refers to the latest backup, but needs
+ /// to be resolved into an actual, immutable id to actually be
+ /// restored.
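+ ///
+ /// A hedged sketch of the intended use; the identifier and timestamp
+ /// below are made up:
+ ///
+ /// ```ignore
+ /// let gens = GenerationList::new(vec![FinishedGeneration::new("abc123", "2021-01-01")]);
+ /// // "latest" resolves to the most recently ended generation.
+ /// let gen_id = gens.resolve("latest")?;
+ /// ```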
+ pub fn resolve(&self, genref: &str) -> Result<GenId, GenerationListError> {
let gen = if self.list.is_empty() {
None
} else if genref == "latest" {
let i = self.list.len() - 1;
Some(self.list[i].clone())
} else {
- let genref: ChunkId = genref.parse().unwrap();
+ let genref = GenId::from_chunk_id(genref.parse().unwrap());
let hits: Vec<FinishedGeneration> = self
.iter()
- .filter(|gen| gen.id() == genref)
- .map(|gen| gen.clone())
+ .filter(|gen| gen.id().as_chunk_id() == genref.as_chunk_id())
+ .cloned()
.collect();
if hits.len() == 1 {
Some(hits[0].clone())
@@ -36,8 +54,10 @@ impl GenerationList {
}
};
match gen {
- None => None,
- Some(gen) => Some(gen.id().to_string()),
+ None => Err(GenerationListError::UnknownGeneration(ChunkId::recreate(
+ genref,
+ ))),
+ Some(gen) => Ok(gen.id().clone()),
}
}
}
diff --git a/src/genmeta.rs b/src/genmeta.rs
new file mode 100644
index 0000000..d5b14a3
--- /dev/null
+++ b/src/genmeta.rs
@@ -0,0 +1,62 @@
+//! Backup generations metadata.
+
+use crate::schema::{SchemaVersion, VersionComponent};
+use serde::Serialize;
+use std::collections::HashMap;
+
+/// Metadata about the local generation.
+#[derive(Debug, Serialize)]
+pub struct GenerationMeta {
+ schema_version: SchemaVersion,
+ extras: HashMap<String, String>,
+}
+
+impl GenerationMeta {
+ /// Create from a hash map.
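+ ///
+ /// A minimal sketch of the expected input, using the key names the
+ /// helpers below require; anything else in the map is kept in
+ /// `extras`:
+ ///
+ /// ```ignore
+ /// let mut map = std::collections::HashMap::new();
+ /// map.insert("schema_version_major".to_string(), "0".to_string());
+ /// map.insert("schema_version_minor".to_string(), "0".to_string());
+ /// let meta = GenerationMeta::from(map)?;
+ /// ```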
+ pub fn from(mut map: HashMap<String, String>) -> Result<Self, GenerationMetaError> {
+ let major: VersionComponent = metaint(&mut map, "schema_version_major")?;
+ let minor: VersionComponent = metaint(&mut map, "schema_version_minor")?;
+ Ok(Self {
+ schema_version: SchemaVersion::new(major, minor),
+ extras: map,
+ })
+ }
+
+ /// Return schema version of local generation.
+ pub fn schema_version(&self) -> SchemaVersion {
+ self.schema_version
+ }
+
+ /// Get a value corresponding to a key in the meta table.
+ pub fn get(&self, key: &str) -> Option<&String> {
+ self.extras.get(key)
+ }
+}
+
+fn metastr(map: &mut HashMap<String, String>, key: &str) -> Result<String, GenerationMetaError> {
+ if let Some(v) = map.remove(key) {
+ Ok(v)
+ } else {
+ Err(GenerationMetaError::NoMetaKey(key.to_string()))
+ }
+}
+
+fn metaint(map: &mut HashMap<String, String>, key: &str) -> Result<u32, GenerationMetaError> {
+ let v = metastr(map, key)?;
+ let v = v
+ .parse()
+ .map_err(|err| GenerationMetaError::BadMetaInteger(key.to_string(), err))?;
+ Ok(v)
+}
+
+/// Possible errors from getting generation metadata.
+#[derive(Debug, thiserror::Error)]
+pub enum GenerationMetaError {
+ /// Missing row in the 'meta' table.
+ #[error("Generation 'meta' table does not have a row {0}")]
+ NoMetaKey(String),
+
+ /// Bad data in 'meta' table.
+ #[error("Generation 'meta' row {0} has badly formed integer: {1}")]
+ BadMetaInteger(String, std::num::ParseIntError),
+}
diff --git a/src/index.rs b/src/index.rs
index d527839..42f1a95 100644
--- a/src/index.rs
+++ b/src/index.rs
@@ -1,65 +1,81 @@
+//! An on-disk index of chunks for the server.
+
use crate::chunkid::ChunkId;
use crate::chunkmeta::ChunkMeta;
+use crate::label::Label;
use rusqlite::Connection;
-use std::collections::HashMap;
-use std::path::{Path, PathBuf};
+use std::path::Path;
-/// A chunk index.
+/// A chunk index stored on the disk.
///
/// A chunk index lets the server quickly find chunks based on a
-/// string key/value pair, or whether they are generations.
+/// string key/value pair.
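+///
+/// A rough sketch of intended use, mirroring the tests at the end of
+/// this file (the directory path and label data are made up):
+///
+/// ```ignore
+/// let mut index = Index::new("/srv/obnam/chunks")?;
+/// let id: ChunkId = "id001".parse().unwrap();
+/// let meta = ChunkMeta::new(&Label::sha256(b"chunk data"));
+/// index.insert_meta(id.clone(), meta)?;
+/// let hits = index.find_by_label(&Label::sha256(b"chunk data").serialize())?;
+/// assert_eq!(hits, vec![id]);
+/// ```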
#[derive(Debug)]
pub struct Index {
- filename: PathBuf,
conn: Connection,
- map: HashMap<(String, String), Vec<ChunkId>>,
- generations: Vec<ChunkId>,
- metas: HashMap<ChunkId, ChunkMeta>,
+}
+
+/// All the errors that may be returned for `Index`.
+#[derive(Debug, thiserror::Error)]
+pub enum IndexError {
+ /// Index does not have a chunk.
+ #[error("The repository index does not have chunk {0}")]
+ MissingChunk(ChunkId),
+
+ /// Index has chunk more than once.
+ #[error("The repository index duplicates chunk {0}")]
+ DuplicateChunk(ChunkId),
+
+ /// An error from SQLite.
+ #[error(transparent)]
+ SqlError(#[from] rusqlite::Error),
}
impl Index {
- pub fn new<P: AsRef<Path>>(dirname: P) -> anyhow::Result<Self> {
+ /// Create a new index.
+ pub fn new<P: AsRef<Path>>(dirname: P) -> Result<Self, IndexError> {
let filename = dirname.as_ref().join("meta.db");
let conn = if filename.exists() {
sql::open_db(&filename)?
} else {
sql::create_db(&filename)?
};
- Ok(Self {
- filename,
- conn,
- map: HashMap::new(),
- generations: vec![],
- metas: HashMap::new(),
- })
+ Ok(Self { conn })
}
- pub fn insert_meta(&mut self, id: ChunkId, meta: ChunkMeta) -> anyhow::Result<()> {
+ /// Insert metadata for a new chunk into index.
+ pub fn insert_meta(&mut self, id: ChunkId, meta: ChunkMeta) -> Result<(), IndexError> {
let t = self.conn.transaction()?;
sql::insert(&t, &id, &meta)?;
t.commit()?;
Ok(())
}
- pub fn get_meta(&self, id: &ChunkId) -> anyhow::Result<ChunkMeta> {
+ /// Look up metadata for a chunk, given its id.
+ pub fn get_meta(&self, id: &ChunkId) -> Result<ChunkMeta, IndexError> {
sql::lookup(&self.conn, id)
}
- pub fn remove_meta(&mut self, id: &ChunkId) -> anyhow::Result<()> {
+ /// Remove a chunk's metadata.
+ pub fn remove_meta(&mut self, id: &ChunkId) -> Result<(), IndexError> {
sql::remove(&self.conn, id)
}
- pub fn find_by_sha256(&self, sha256: &str) -> anyhow::Result<Vec<ChunkId>> {
- sql::find_by_256(&self.conn, sha256)
+ /// Find chunks with a client-assigned label.
+ pub fn find_by_label(&self, label: &str) -> Result<Vec<ChunkId>, IndexError> {
+ sql::find_by_label(&self.conn, label)
}
- pub fn find_generations(&self) -> anyhow::Result<Vec<ChunkId>> {
- sql::find_generations(&self.conn)
+ /// Find all chunks.
+ pub fn all_chunks(&self) -> Result<Vec<ChunkId>, IndexError> {
+ sql::find_chunk_ids(&self.conn)
}
}
#[cfg(test)]
mod test {
+ use super::Label;
+
use super::{ChunkId, ChunkMeta, Index};
use std::path::Path;
use tempfile::tempdir;
@@ -71,140 +87,113 @@ mod test {
#[test]
fn remembers_inserted() {
let id: ChunkId = "id001".parse().unwrap();
- let meta = ChunkMeta::new("abc");
+ let sum = Label::sha256(b"abc");
+ let meta = ChunkMeta::new(&sum);
let dir = tempdir().unwrap();
let mut idx = new_index(dir.path());
idx.insert_meta(id.clone(), meta.clone()).unwrap();
assert_eq!(idx.get_meta(&id).unwrap(), meta);
- let ids = idx.find_by_sha256("abc").unwrap();
+ let ids = idx.find_by_label(&sum.serialize()).unwrap();
assert_eq!(ids, vec![id]);
}
#[test]
fn does_not_find_uninserted() {
let id: ChunkId = "id001".parse().unwrap();
- let meta = ChunkMeta::new("abc");
+ let sum = Label::sha256(b"abc");
+ let meta = ChunkMeta::new(&sum);
let dir = tempdir().unwrap();
let mut idx = new_index(dir.path());
idx.insert_meta(id, meta).unwrap();
- assert_eq!(idx.find_by_sha256("def").unwrap().len(), 0)
+ assert_eq!(idx.find_by_label("def").unwrap().len(), 0)
}
#[test]
fn removes_inserted() {
let id: ChunkId = "id001".parse().unwrap();
- let meta = ChunkMeta::new("abc");
+ let sum = Label::sha256(b"abc");
+ let meta = ChunkMeta::new(&sum);
let dir = tempdir().unwrap();
let mut idx = new_index(dir.path());
idx.insert_meta(id.clone(), meta).unwrap();
idx.remove_meta(&id).unwrap();
- let ids: Vec<ChunkId> = idx.find_by_sha256("abc").unwrap();
+ let ids: Vec<ChunkId> = idx.find_by_label(&sum.serialize()).unwrap();
assert_eq!(ids, vec![]);
}
-
- #[test]
- fn has_no_generations_initially() {
- let dir = tempdir().unwrap();
- let idx = new_index(dir.path());
- assert_eq!(idx.find_generations().unwrap(), vec![]);
- }
-
- #[test]
- fn remembers_generation() {
- let id: ChunkId = "id001".parse().unwrap();
- let meta = ChunkMeta::new_generation("abc", "timestamp");
- let dir = tempdir().unwrap();
- let mut idx = new_index(dir.path());
- idx.insert_meta(id.clone(), meta.clone()).unwrap();
- assert_eq!(idx.find_generations().unwrap(), vec![id]);
- }
-
- #[test]
- fn removes_generation() {
- let id: ChunkId = "id001".parse().unwrap();
- let meta = ChunkMeta::new_generation("abc", "timestamp");
- let dir = tempdir().unwrap();
- let mut idx = new_index(dir.path());
- idx.insert_meta(id.clone(), meta.clone()).unwrap();
- idx.remove_meta(&id).unwrap();
- assert_eq!(idx.find_generations().unwrap(), vec![]);
- }
}
mod sql {
+ use super::{IndexError, Label};
use crate::chunkid::ChunkId;
use crate::chunkmeta::ChunkMeta;
- use crate::error::ObnamError;
use log::error;
use rusqlite::{params, Connection, OpenFlags, Row, Transaction};
use std::path::Path;
- pub fn create_db(filename: &Path) -> anyhow::Result<Connection> {
+ /// Create a database in a file.
+ pub fn create_db(filename: &Path) -> Result<Connection, IndexError> {
let flags = OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE;
let conn = Connection::open_with_flags(filename, flags)?;
conn.execute(
- "CREATE TABLE chunks (id TEXT PRIMARY KEY, sha256 TEXT, generation INT, ended TEXT)",
+ "CREATE TABLE chunks (id TEXT PRIMARY KEY, label TEXT)",
params![],
)?;
- conn.execute("CREATE INDEX sha256_idx ON chunks (sha256)", params![])?;
- conn.execute(
- "CREATE INDEX generation_idx ON chunks (generation)",
- params![],
- )?;
- conn.pragma_update(None, "journal_mode", &"WAL")?;
+ conn.execute("CREATE INDEX label_idx ON chunks (label)", params![])?;
+ conn.pragma_update(None, "journal_mode", "WAL")?;
Ok(conn)
}
- pub fn open_db(filename: &Path) -> anyhow::Result<Connection> {
+ /// Open an existing database in a file.
+ pub fn open_db(filename: &Path) -> Result<Connection, IndexError> {
let flags = OpenFlags::SQLITE_OPEN_READ_WRITE;
let conn = Connection::open_with_flags(filename, flags)?;
- conn.pragma_update(None, "journal_mode", &"WAL")?;
+ conn.pragma_update(None, "journal_mode", "WAL")?;
Ok(conn)
}
- pub fn insert(t: &Transaction, chunkid: &ChunkId, meta: &ChunkMeta) -> anyhow::Result<()> {
+ /// Insert a new chunk's metadata into database.
+ pub fn insert(t: &Transaction, chunkid: &ChunkId, meta: &ChunkMeta) -> Result<(), IndexError> {
let chunkid = format!("{}", chunkid);
- let sha256 = meta.sha256();
- let generation = if meta.is_generation() { 1 } else { 0 };
- let ended = meta.ended();
+ let label = meta.label();
t.execute(
- "INSERT INTO chunks (id, sha256, generation, ended) VALUES (?1, ?2, ?3, ?4)",
- params![chunkid, sha256, generation, ended],
+ "INSERT INTO chunks (id, label) VALUES (?1, ?2)",
+ params![chunkid, label],
)?;
Ok(())
}
- pub fn remove(conn: &Connection, chunkid: &ChunkId) -> anyhow::Result<()> {
+ /// Remove a chunk's metadata from the database.
+ pub fn remove(conn: &Connection, chunkid: &ChunkId) -> Result<(), IndexError> {
conn.execute("DELETE FROM chunks WHERE id IS ?1", params![chunkid])?;
Ok(())
}
- pub fn lookup(conn: &Connection, id: &ChunkId) -> anyhow::Result<ChunkMeta> {
+ /// Look up a chunk using its id.
+ pub fn lookup(conn: &Connection, id: &ChunkId) -> Result<ChunkMeta, IndexError> {
let mut stmt = conn.prepare("SELECT * FROM chunks WHERE id IS ?1")?;
- let iter = stmt.query_map(params![id], |row| row_to_meta(row))?;
+ let iter = stmt.query_map(params![id], row_to_meta)?;
let mut metas: Vec<ChunkMeta> = vec![];
for meta in iter {
let meta = meta?;
if metas.is_empty() {
- eprintln!("lookup: meta={:?}", meta);
metas.push(meta);
} else {
- let err = ObnamError::DuplicateChunk(id.clone());
+ let err = IndexError::DuplicateChunk(id.clone());
error!("{}", err);
- return Err(err.into());
+ return Err(err);
}
}
- if metas.len() == 0 {
- eprintln!("lookup: no hits");
- return Err(ObnamError::MissingChunk(id.clone()).into());
+ if metas.is_empty() {
+ return Err(IndexError::MissingChunk(id.clone()));
}
let r = metas[0].clone();
Ok(r)
}
- pub fn find_by_256(conn: &Connection, sha256: &str) -> anyhow::Result<Vec<ChunkId>> {
- let mut stmt = conn.prepare("SELECT id FROM chunks WHERE sha256 IS ?1")?;
- let iter = stmt.query_map(params![sha256], |row| row_to_id(row))?;
+ /// Find chunks with a given checksum.
+ pub fn find_by_label(conn: &Connection, label: &str) -> Result<Vec<ChunkId>, IndexError> {
+ let mut stmt = conn.prepare("SELECT id FROM chunks WHERE label IS ?1")?;
+ let iter = stmt.query_map(params![label], row_to_id)?;
let mut ids = vec![];
for x in iter {
let x = x?;
@@ -213,9 +202,10 @@ mod sql {
Ok(ids)
}
- pub fn find_generations(conn: &Connection) -> anyhow::Result<Vec<ChunkId>> {
- let mut stmt = conn.prepare("SELECT id FROM chunks WHERE generation IS 1")?;
- let iter = stmt.query_map(params![], |row| row_to_id(row))?;
+ /// Find ids of all chunks.
+ pub fn find_chunk_ids(conn: &Connection) -> Result<Vec<ChunkId>, IndexError> {
+ let mut stmt = conn.prepare("SELECT id FROM chunks")?;
+ let iter = stmt.query_map(params![], row_to_id)?;
let mut ids = vec![];
for x in iter {
let x = x?;
@@ -224,20 +214,14 @@ mod sql {
Ok(ids)
}
- pub fn row_to_meta(row: &Row) -> rusqlite::Result<ChunkMeta> {
- let sha256: String = row.get(row.column_index("sha256")?)?;
- let generation: i32 = row.get(row.column_index("generation")?)?;
- let meta = if generation == 0 {
- ChunkMeta::new(&sha256)
- } else {
- let ended: String = row.get(row.column_index("ended")?)?;
- ChunkMeta::new_generation(&sha256, &ended)
- };
- Ok(meta)
+ fn row_to_meta(row: &Row) -> rusqlite::Result<ChunkMeta> {
+ let hash: String = row.get("label")?;
+ let sha256 = Label::deserialize(&hash).expect("deserialize checksum from database");
+ Ok(ChunkMeta::new(&sha256))
}
- pub fn row_to_id(row: &Row) -> rusqlite::Result<ChunkId> {
- let id: String = row.get(row.column_index("id")?)?;
- Ok(ChunkId::from_str(&id))
+ fn row_to_id(row: &Row) -> rusqlite::Result<ChunkId> {
+ let id: String = row.get("id")?;
+ Ok(ChunkId::recreate(&id))
}
}
diff --git a/src/indexedstore.rs b/src/indexedstore.rs
deleted file mode 100644
index 0366013..0000000
--- a/src/indexedstore.rs
+++ /dev/null
@@ -1,57 +0,0 @@
-use crate::chunk::DataChunk;
-use crate::chunkid::ChunkId;
-use crate::chunkmeta::ChunkMeta;
-use crate::index::Index;
-use crate::store::Store;
-use std::path::Path;
-
-/// A store for chunks and their metadata.
-///
-/// This combines Store and Index into one interface to make it easier
-/// to handle the server side storage of chunks.
-pub struct IndexedStore {
- store: Store,
- index: Index,
-}
-
-impl IndexedStore {
- pub fn new(dirname: &Path) -> anyhow::Result<Self> {
- let store = Store::new(dirname);
- let index = Index::new(dirname)?;
- Ok(Self { store, index })
- }
-
- pub fn save(&mut self, meta: &ChunkMeta, chunk: &DataChunk) -> anyhow::Result<ChunkId> {
- let id = ChunkId::new();
- self.store.save(&id, meta, chunk)?;
- self.insert_meta(&id, meta)?;
- Ok(id)
- }
-
- fn insert_meta(&mut self, id: &ChunkId, meta: &ChunkMeta) -> anyhow::Result<()> {
- self.index.insert_meta(id.clone(), meta.clone())?;
- Ok(())
- }
-
- pub fn load(&self, id: &ChunkId) -> anyhow::Result<(DataChunk, ChunkMeta)> {
- Ok((self.store.load(id)?, self.load_meta(id)?))
- }
-
- pub fn load_meta(&self, id: &ChunkId) -> anyhow::Result<ChunkMeta> {
- self.index.get_meta(id)
- }
-
- pub fn find_by_sha256(&self, sha256: &str) -> anyhow::Result<Vec<ChunkId>> {
- self.index.find_by_sha256(sha256)
- }
-
- pub fn find_generations(&self) -> anyhow::Result<Vec<ChunkId>> {
- self.index.find_generations()
- }
-
- pub fn remove(&mut self, id: &ChunkId) -> anyhow::Result<()> {
- self.index.remove_meta(id).unwrap();
- self.store.delete(id)?;
- Ok(())
- }
-}
diff --git a/src/label.rs b/src/label.rs
new file mode 100644
index 0000000..19d270a
--- /dev/null
+++ b/src/label.rs
@@ -0,0 +1,138 @@
+//! A chunk label.
+//!
+//! De-duplication of backed up data in Obnam relies on cryptographic
+//! checksums. They are implemented in this module. Note that Obnam
+//! does not aim to make these algorithms configurable, so only a very
+//! small number of carefully chosen algorithms are supported here.
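+//!
+//! A short sketch of the round trip the rest of the crate relies on
+//! (the input bytes are made up):
+//!
+//! ```ignore
+//! let label = Label::sha256(b"some chunk data");
+//! let text = label.serialize(); // "1" followed by the hex digest
+//! let again = Label::deserialize(&text)?;
+//! assert_eq!(text, again.serialize());
+//! ```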
+
+use blake2::Blake2s256;
+use sha2::{Digest, Sha256};
+
+const LITERAL: char = '0';
+const SHA256: char = '1';
+const BLAKE2: char = '2';
+
+/// A checksum of some data.
+#[derive(Debug, Clone)]
+pub enum Label {
+ /// An arbitrary, literal string.
+ Literal(String),
+
+ /// A SHA256 checksum.
+ Sha256(String),
+
+ /// A BLAKE2s checksum.
+ Blake2(String),
+}
+
+impl Label {
+ /// Construct a literal string.
+ pub fn literal(s: &str) -> Self {
+ Self::Literal(s.to_string())
+ }
+
+ /// Compute a SHA256 checksum for a block of data.
+ pub fn sha256(data: &[u8]) -> Self {
+ let mut hasher = Sha256::new();
+ hasher.update(data);
+ let hash = hasher.finalize();
+ Self::Sha256(format!("{:x}", hash))
+ }
+
+ /// Compute a BLAKE2s checksum for a block of data.
+ pub fn blake2(data: &[u8]) -> Self {
+ let mut hasher = Blake2s256::new();
+ hasher.update(data);
+ let hash = hasher.finalize();
+ Self::Sha256(format!("{:x}", hash))
+ }
+
+ /// Serialize a label into a string representation.
+ pub fn serialize(&self) -> String {
+ match self {
+ Self::Literal(s) => format!("{}{}", LITERAL, s),
+ Self::Sha256(hash) => format!("{}{}", SHA256, hash),
+ Self::Blake2(hash) => format!("{}{}", BLAKE2, hash),
+ }
+ }
+
+ /// De-serialize a label from its string representation.
+ pub fn deserialize(s: &str) -> Result<Self, LabelError> {
+ if s.starts_with(LITERAL) {
+ Ok(Self::Literal(s[1..].to_string()))
+ } else if s.starts_with(SHA256) {
+ Ok(Self::Sha256(s[1..].to_string()))
+ } else if s.starts_with(BLAKE2) {
+ Ok(Self::Blake2(s[1..].to_string()))
+ } else {
+ Err(LabelError::UnknownType(s.to_string()))
+ }
+ }
+}
+
+/// Kinds of checksum labels.
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub enum LabelChecksumKind {
+ /// Use a Blake2 checksum.
+ Blake2,
+
+ /// Use a SHA256 checksum.
+ Sha256,
+}
+
+impl LabelChecksumKind {
+ /// Parse a string into a label checksum kind.
+ pub fn from(s: &str) -> Result<Self, LabelError> {
+ if s == "sha256" {
+ Ok(Self::Sha256)
+ } else if s == "blake2" {
+ Ok(Self::Blake2)
+ } else {
+ Err(LabelError::UnknownType(s.to_string()))
+ }
+ }
+
+ /// Serialize a checksum kind into a string.
+ pub fn serialize(self) -> &'static str {
+ match self {
+ Self::Sha256 => "sha256",
+ Self::Blake2 => "blake2",
+ }
+ }
+}
+
+/// Possible errors from dealing with chunk labels.
+#[derive(Debug, thiserror::Error)]
+pub enum LabelError {
+ /// Serialized label didn't start with a known type prefix.
+ #[error("Unknown label: {0:?}")]
+ UnknownType(String),
+}
+
+#[cfg(test)]
+mod test {
+ use super::{Label, LabelChecksumKind};
+
+ #[test]
+ fn roundtrip_literal() {
+ let label = Label::literal("dummy data");
+ let serialized = label.serialize();
+ let de = Label::deserialize(&serialized).unwrap();
+ let seri2 = de.serialize();
+ assert_eq!(serialized, seri2);
+ }
+
+ #[test]
+ fn roundtrip_sha256() {
+ let label = Label::sha256(b"dummy data");
+ let serialized = label.serialize();
+ let de = Label::deserialize(&serialized).unwrap();
+ let seri2 = de.serialize();
+ assert_eq!(serialized, seri2);
+ }
+
+ #[test]
+ fn roundtrip_checksum_kind() {
+ for kind in [LabelChecksumKind::Sha256, LabelChecksumKind::Blake2] {
+ assert_eq!(LabelChecksumKind::from(kind.serialize()).unwrap(), kind);
+ }
+ }
+}
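
For orientation, here is a minimal editorial sketch (not part of the diff) of how the one-character type prefix behaves, assuming the module is used as `obnam::label` per the lib.rs change below:

    use obnam::label::Label;

    fn main() {
        // SHA256 labels serialize with the '1' type prefix, literal labels with '0'.
        let label = Label::sha256(b"hello, world");
        let serialized = label.serialize();
        assert!(serialized.starts_with('1'));
        assert!(Label::literal("test").serialize().starts_with('0'));

        // Deserializing and re-serializing preserves the string form.
        let back = Label::deserialize(&serialized).unwrap();
        assert_eq!(back.serialize(), serialized);
    }
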
diff --git a/src/lib.rs b/src/lib.rs
index a12b8a3..8894966 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,21 +1,38 @@
+//! Encrypted backups.
+//!
+//! Obnam is a backup program that encrypts the backups. This crate
+//! provides access to all the functionality of Obnam as a library.
+
+#![deny(missing_docs)]
+
+pub mod accumulated_time;
pub mod backup_progress;
pub mod backup_reason;
pub mod backup_run;
-pub mod benchmark;
-pub mod checksummer;
pub mod chunk;
pub mod chunker;
pub mod chunkid;
pub mod chunkmeta;
+pub mod chunkstore;
+pub mod cipher;
pub mod client;
pub mod cmd;
+pub mod config;
+pub mod db;
+pub mod dbgen;
+pub mod engine;
pub mod error;
pub mod fsentry;
pub mod fsiter;
pub mod generation;
pub mod genlist;
+pub mod genmeta;
pub mod index;
-pub mod indexedstore;
+pub mod label;
+pub mod passwords;
+pub mod performance;
pub mod policy;
+pub mod schema;
pub mod server;
pub mod store;
+pub mod workqueue;
diff --git a/src/passwords.rs b/src/passwords.rs
new file mode 100644
index 0000000..efc3f96
--- /dev/null
+++ b/src/passwords.rs
@@ -0,0 +1,102 @@
+//! Passwords for encryption.
+
+use pbkdf2::{
+ password_hash::{PasswordHasher, SaltString},
+ Pbkdf2,
+};
+use rand::rngs::OsRng;
+use serde::{Deserialize, Serialize};
+use std::io::prelude::Write;
+use std::os::unix::fs::PermissionsExt;
+use std::path::{Path, PathBuf};
+
+const KEY_LEN: usize = 32; // Only size accepted by aead crate?
+
+/// Encryption password.
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct Passwords {
+ encryption: String,
+}
+
+impl Passwords {
+ /// Create a new encryption password from a user-supplied passphrase.
+ pub fn new(passphrase: &str) -> Self {
+ let mut key = derive_password(passphrase);
+ let _ = key.split_off(KEY_LEN);
+ assert_eq!(key.len(), KEY_LEN);
+ Self { encryption: key }
+ }
+
+ /// Get encryption key.
+ pub fn encryption_key(&self) -> &[u8] {
+ self.encryption.as_bytes()
+ }
+
+ /// Load passwords from file.
+ pub fn load(filename: &Path) -> Result<Self, PasswordError> {
+ let data = std::fs::read(filename)
+ .map_err(|err| PasswordError::Read(filename.to_path_buf(), err))?;
+ serde_yaml::from_slice(&data)
+ .map_err(|err| PasswordError::Parse(filename.to_path_buf(), err))
+ }
+
+ /// Save passwords to file.
+ pub fn save(&self, filename: &Path) -> Result<(), PasswordError> {
+ let data = serde_yaml::to_string(&self).map_err(PasswordError::Serialize)?;
+
+ let mut file = std::fs::File::create(filename)
+ .map_err(|err| PasswordError::Write(filename.to_path_buf(), err))?;
+ let metadata = file
+ .metadata()
+ .map_err(|err| PasswordError::Write(filename.to_path_buf(), err))?;
+ let mut permissions = metadata.permissions();
+
+ // Make readable by owner only. We still have the open file
+ // handle, so we can write the content.
+ permissions.set_mode(0o400);
+ std::fs::set_permissions(filename, permissions)
+ .map_err(|err| PasswordError::Write(filename.to_path_buf(), err))?;
+
+ // Write actual content.
+ file.write_all(data.as_bytes())
+ .map_err(|err| PasswordError::Write(filename.to_path_buf(), err))?;
+
+ Ok(())
+ }
+}
+
+/// Return the name of the password file, in the same directory as the configuration file.
+pub fn passwords_filename(config_filename: &Path) -> PathBuf {
+ let mut filename = config_filename.to_path_buf();
+ filename.set_file_name("passwords.yaml");
+ filename
+}
+
+fn derive_password(passphrase: &str) -> String {
+ let salt = SaltString::generate(&mut OsRng);
+
+ Pbkdf2
+ .hash_password(passphrase.as_bytes(), salt.as_salt())
+ .unwrap()
+ .to_string()
+}
+
+/// Possible errors from passwords.
+#[derive(Debug, thiserror::Error)]
+pub enum PasswordError {
+ /// Failed to make YAML when saving passwords.
+ #[error("failed to serialize passwords for saving: {0}")]
+ Serialize(serde_yaml::Error),
+
+ /// Failed to save to file.
+ #[error("failed to save passwords to {0}: {1}")]
+ Write(PathBuf, std::io::Error),
+
+ /// Failed to read the passwords file.
+ #[error("failed to read passwords from {0}: {1}")]
+ Read(PathBuf, std::io::Error),
+
+ /// Failed to parse passwords file.
+ #[error("failed to parse saved passwords from {0}: {1}")]
+ Parse(PathBuf, serde_yaml::Error),
+}
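
A hedged usage sketch for the new passwords module; the configuration path is invented for illustration and error handling is minimal:

    use obnam::passwords::{passwords_filename, PasswordError, Passwords};
    use std::path::Path;

    fn main() -> Result<(), PasswordError> {
        // passwords.yaml lives next to the (hypothetical) configuration file.
        let config = Path::new("/home/user/.config/obnam/obnam.yaml");
        let filename = passwords_filename(config);

        // Derive a key from a passphrase and store it, readable by owner only.
        let pw = Passwords::new("correct horse battery staple");
        pw.save(&filename)?;

        // Later runs load the same file; the derived key is 32 bytes long.
        let loaded = Passwords::load(&filename)?;
        assert_eq!(loaded.encryption_key().len(), 32);
        Ok(())
    }
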
diff --git a/src/performance.rs b/src/performance.rs
new file mode 100644
index 0000000..29c2328
--- /dev/null
+++ b/src/performance.rs
@@ -0,0 +1,97 @@
+//! Performance measurements from an Obnam run.
+
+use crate::accumulated_time::AccumulatedTime;
+use log::info;
+
+/// The kinds of clocks we have.
+#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
+pub enum Clock {
+ /// The complete runtime of the program.
+ RunTime,
+
+ /// Time spent downloading previous backup generations.
+ GenerationDownload,
+
+ /// Time spent uploading backup generations.
+ GenerationUpload,
+}
+
+/// Collected measurements from this Obnam run.
+#[derive(Debug)]
+pub struct Performance {
+ args: Vec<String>,
+ time: AccumulatedTime<Clock>,
+ live_files: u64,
+ files_backed_up: u64,
+ chunks_uploaded: u64,
+ chunks_reused: u64,
+}
+
+impl Default for Performance {
+ fn default() -> Self {
+ Self {
+ args: std::env::args().collect(),
+ time: AccumulatedTime::<Clock>::new(),
+ live_files: 0,
+ files_backed_up: 0,
+ chunks_reused: 0,
+ chunks_uploaded: 0,
+ }
+ }
+}
+
+impl Performance {
+ /// Log all performance measurements to the log file.
+ pub fn log(&self) {
+ info!("Performance measurements for this Obnam run");
+ for (i, arg) in self.args.iter().enumerate() {
+ info!("argv[{}]={:?}", i, arg);
+ }
+ info!("Live files found: {}", self.live_files);
+ info!("Files backed up: {}", self.files_backed_up);
+ info!("Chunks uploaded: {}", self.chunks_uploaded);
+ info!("Chunks reused: {}", self.chunks_reused);
+ info!(
+ "Downloading previous generation (seconds): {}",
+ self.time.secs(Clock::GenerationDownload)
+ );
+ info!(
+ "Uploading new generation (seconds): {}",
+ self.time.secs(Clock::GenerationUpload)
+ );
+ info!(
+ "Complete run time (seconds): {}",
+ self.time.secs(Clock::RunTime)
+ );
+ }
+
+ /// Start a specific clock.
+ pub fn start(&mut self, clock: Clock) {
+ self.time.start(clock)
+ }
+
+ /// Stop a specific clock.
+ pub fn stop(&mut self, clock: Clock) {
+ self.time.stop(clock)
+ }
+
+ /// Increment number of live files.
+ pub fn found_live_files(&mut self, n: u64) {
+ self.live_files += n;
+ }
+
+ /// Increment number of files backed up this run.
+ pub fn back_up_file(&mut self) {
+ self.files_backed_up += 1;
+ }
+
+ /// Increment number of reused chunks.
+ pub fn reuse_chunk(&mut self) {
+ self.chunks_reused += 1;
+ }
+
+ /// Increment number of uploaded chunks.
+ pub fn upload_chunk(&mut self) {
+ self.chunks_uploaded += 1;
+ }
+}
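
A small sketch of how these counters and clocks might be driven by a caller, assuming a logger has been initialized so that `log()` has somewhere to write; the numbers are invented:

    use obnam::performance::{Clock, Performance};

    fn main() {
        let mut perf = Performance::default();
        perf.start(Clock::RunTime);

        // Pretend we scanned three live files, backed one up, uploaded one
        // new chunk, and reused another.
        perf.found_live_files(3);
        perf.back_up_file();
        perf.upload_chunk();
        perf.reuse_chunk();

        perf.stop(Clock::RunTime);
        perf.log(); // emits the measurements at info level
    }
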
diff --git a/src/policy.rs b/src/policy.rs
index 032b851..8cdbd76 100644
--- a/src/policy.rs
+++ b/src/policy.rs
@@ -1,24 +1,40 @@
+//! Policy for what gets backed up.
+
use crate::backup_reason::Reason;
use crate::fsentry::FilesystemEntry;
use crate::generation::LocalGeneration;
-use log::{debug, warn};
+use log::warn;
+/// Policy for what gets backed up.
+///
+/// The policy allows two aspects to be controlled:
+///
+/// * should new files (files that didn't exist in the previous
+/// backup) be included in the new backup?
+/// * should files that haven't been changed since the previous backup
+/// be included in the new backup?
+///
+/// If policy doesn't allow a file to be included, it's skipped.
pub struct BackupPolicy {
new: bool,
old_if_changed: bool,
}
-impl BackupPolicy {
- pub fn new() -> Self {
+impl Default for BackupPolicy {
+ /// Create a default policy.
+ fn default() -> Self {
Self {
new: true,
old_if_changed: true,
}
}
+}
+impl BackupPolicy {
+ /// Does a given file need to be backed up?
pub fn needs_backup(&self, old: &LocalGeneration, new_entry: &FilesystemEntry) -> Reason {
let new_name = new_entry.pathbuf();
- let reason = match old.get_file(&new_name) {
+ match old.get_file(&new_name) {
Ok(None) => {
if self.new {
Reason::IsNew
@@ -42,14 +58,9 @@ impl BackupPolicy {
"needs_backup: lookup in old generation returned error, ignored: {:?}: {}",
new_name, err
);
- Reason::Error
+ Reason::GenerationLookupError
}
- };
- debug!(
- "needs_backup: file {:?}: policy decision: {}",
- new_name, reason
- );
- reason
+ }
}
}
diff --git a/src/schema.rs b/src/schema.rs
new file mode 100644
index 0000000..ae8c00b
--- /dev/null
+++ b/src/schema.rs
@@ -0,0 +1,173 @@
+//! Database schema versions.
+
+use serde::Serialize;
+
+/// The type of schema version components.
+pub type VersionComponent = u32;
+
+/// Schema version of the database storing the generation.
+///
+/// An Obnam client can restore a generation using schema version
+/// (x,y), if the client supports a schema version (x,z). If z < y,
+/// the client knows it may not be able to restore the generation faithfully,
+/// and should warn the user about this. If z >= y, the client knows
+/// it can restore the generation faithfully. If the client does not
+/// support any schema version x, it knows it can't restore the backup
+/// at all.
+#[derive(Debug, Clone, Copy, Serialize)]
+pub struct SchemaVersion {
+ /// Major version.
+ pub major: VersionComponent,
+ /// Minor version.
+ pub minor: VersionComponent,
+}
+
+impl SchemaVersion {
+ /// Create a new schema version object.
+ pub fn new(major: VersionComponent, minor: VersionComponent) -> Self {
+ Self { major, minor }
+ }
+
+ /// Return the major and minor version number component of a schema version.
+ pub fn version(&self) -> (VersionComponent, VersionComponent) {
+ (self.major, self.minor)
+ }
+
+ /// Is this schema version compatible with another schema version?
+ pub fn is_compatible_with(&self, other: &Self) -> bool {
+ self.major == other.major && self.minor >= other.minor
+ }
+}
+
+impl std::fmt::Display for SchemaVersion {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}.{}", self.major, self.minor)
+ }
+}
+
+impl std::str::FromStr for SchemaVersion {
+ type Err = SchemaVersionError;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ if let Some(pos) = s.find('.') {
+ let major = parse_int(&s[..pos])?;
+ let minor = parse_int(&s[pos + 1..])?;
+ Ok(SchemaVersion::new(major, minor))
+ } else {
+ Err(SchemaVersionError::Invalid(s.to_string()))
+ }
+ }
+}
+
+fn parse_int(s: &str) -> Result<VersionComponent, SchemaVersionError> {
+ if let Ok(i) = s.parse() {
+ Ok(i)
+ } else {
+ Err(SchemaVersionError::InvalidComponent(s.to_string()))
+ }
+}
+
+/// Possible errors from parsing schema versions.
+#[derive(Debug, thiserror::Error, PartialEq, Eq)]
+pub enum SchemaVersionError {
+ /// Failed to parse a string as a schema version.
+ #[error("Invalid schema version {0:?}")]
+ Invalid(String),
+
+ /// Failed to parse a string as a schema version component.
+ #[error("Invalid schema version component {0:?}")]
+ InvalidComponent(String),
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use std::str::FromStr;
+
+ #[test]
+ fn from_string() {
+ let v = SchemaVersion::from_str("1.2").unwrap();
+ assert_eq!(v.version(), (1, 2));
+ }
+
+ #[test]
+ fn from_string_fails_if_empty() {
+ match SchemaVersion::from_str("") {
+ Err(SchemaVersionError::Invalid(s)) => assert_eq!(s, ""),
+ _ => unreachable!(),
+ }
+ }
+
+ #[test]
+ fn from_string_fails_if_empty_major() {
+ match SchemaVersion::from_str(".2") {
+ Err(SchemaVersionError::InvalidComponent(s)) => assert_eq!(s, ""),
+ _ => unreachable!(),
+ }
+ }
+
+ #[test]
+ fn from_string_fails_if_empty_minor() {
+ match SchemaVersion::from_str("1.") {
+ Err(SchemaVersionError::InvalidComponent(s)) => assert_eq!(s, ""),
+ _ => unreachable!(),
+ }
+ }
+
+ #[test]
+ fn from_string_fails_if_just_major() {
+ match SchemaVersion::from_str("1") {
+ Err(SchemaVersionError::Invalid(s)) => assert_eq!(s, "1"),
+ _ => unreachable!(),
+ }
+ }
+
+ #[test]
+ fn from_string_fails_if_nonnumeric_major() {
+ match SchemaVersion::from_str("a.2") {
+ Err(SchemaVersionError::InvalidComponent(s)) => assert_eq!(s, "a"),
+ _ => unreachable!(),
+ }
+ }
+
+ #[test]
+ fn from_string_fails_if_nonnumeric_minor() {
+ match SchemaVersion::from_str("1.a") {
+ Err(SchemaVersionError::InvalidComponent(s)) => assert_eq!(s, "a"),
+ _ => unreachable!(),
+ }
+ }
+
+ #[test]
+ fn compatible_with_self() {
+ let v = SchemaVersion::new(1, 2);
+ assert!(v.is_compatible_with(&v));
+ }
+
+ #[test]
+ fn compatible_with_older_minor_version() {
+ let old = SchemaVersion::new(1, 2);
+ let new = SchemaVersion::new(1, 3);
+ assert!(new.is_compatible_with(&old));
+ }
+
+ #[test]
+ fn not_compatible_with_newer_minor_version() {
+ let old = SchemaVersion::new(1, 2);
+ let new = SchemaVersion::new(1, 3);
+ assert!(!old.is_compatible_with(&new));
+ }
+
+ #[test]
+ fn not_compatible_with_older_major_version() {
+ let old = SchemaVersion::new(1, 2);
+ let new = SchemaVersion::new(2, 0);
+ assert!(!new.is_compatible_with(&old));
+ }
+
+ #[test]
+ fn not_compatible_with_newer_major_version() {
+ let old = SchemaVersion::new(1, 2);
+ let new = SchemaVersion::new(2, 0);
+ assert!(!old.is_compatible_with(&new));
+ }
+}
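
To make the compatibility rule in the module documentation concrete, here is an illustrative check (editorial, not part of the diff): a client supporting schema 1.3 can restore a 1.2 generation, but not a 1.4 or 2.0 one:

    use obnam::schema::SchemaVersion;

    fn main() {
        let client = SchemaVersion::new(1, 3);
        assert!(client.is_compatible_with(&SchemaVersion::new(1, 2)));
        assert!(!client.is_compatible_with(&SchemaVersion::new(1, 4)));
        assert!(!client.is_compatible_with(&SchemaVersion::new(2, 0)));
    }
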
diff --git a/src/server.rs b/src/server.rs
index 4d5880e..ffd4009 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,9 +1,81 @@
+//! Stuff related to the Obnam chunk server.
+
use crate::chunk::DataChunk;
use crate::chunkid::ChunkId;
use crate::chunkmeta::ChunkMeta;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::default::Default;
+use std::path::{Path, PathBuf};
+
+/// Server configuration.
+#[derive(Debug, Deserialize, Clone)]
+#[serde(deny_unknown_fields)]
+pub struct ServerConfig {
+ /// Path to directory where chunks are stored.
+ pub chunks: PathBuf,
+ /// Address where server is to listen.
+ pub address: String,
+ /// Path to TLS key.
+ pub tls_key: PathBuf,
+ /// Path to TLS certificate.
+ pub tls_cert: PathBuf,
+}
+
+/// Possible errors with the server configuration.
+#[derive(Debug, thiserror::Error)]
+pub enum ServerConfigError {
+ /// The chunks directory doesn't exist.
+ #[error("Directory for chunks {0} does not exist")]
+ ChunksDirNotFound(PathBuf),
+
+ /// The TLS certificate doesn't exist.
+ #[error("TLS certificate {0} does not exist")]
+ TlsCertNotFound(PathBuf),
+
+ /// The TLS key doesn't exist.
+ #[error("TLS key {0} does not exist")]
+ TlsKeyNotFound(PathBuf),
+
+ /// Server address is wrong.
+ #[error("server address can't be resolved")]
+ BadServerAddress,
+
+ /// Failed to read configuration file.
+ #[error("failed to read configuration file {0}: {1}")]
+ Read(PathBuf, std::io::Error),
+
+ /// Failed to parse configuration file as YAML.
+ #[error("failed to parse configuration file as YAML: {0}")]
+ YamlParse(serde_yaml::Error),
+}
+
+impl ServerConfig {
+ /// Read, parse, and check the server configuration file.
+ pub fn read_config(filename: &Path) -> Result<Self, ServerConfigError> {
+ let config = match std::fs::read_to_string(filename) {
+ Ok(config) => config,
+ Err(err) => return Err(ServerConfigError::Read(filename.to_path_buf(), err)),
+ };
+ let config: Self = serde_yaml::from_str(&config).map_err(ServerConfigError::YamlParse)?;
+ config.check()?;
+ Ok(config)
+ }
+
+ /// Check the configuration.
+ pub fn check(&self) -> Result<(), ServerConfigError> {
+ if !self.chunks.exists() {
+ return Err(ServerConfigError::ChunksDirNotFound(self.chunks.clone()));
+ }
+ if !self.tls_cert.exists() {
+ return Err(ServerConfigError::TlsCertNotFound(self.tls_cert.clone()));
+ }
+ if !self.tls_key.exists() {
+ return Err(ServerConfigError::TlsKeyNotFound(self.tls_key.clone()));
+ }
+ Ok(())
+ }
+}
/// Result of creating a chunk.
#[derive(Debug, Serialize)]
@@ -12,17 +84,18 @@ pub struct Created {
}
impl Created {
+ /// Create a new result for a created chunk.
pub fn new(id: ChunkId) -> Self {
Created { id }
}
+ /// Convert to JSON.
pub fn to_json(&self) -> String {
serde_json::to_string(&self).unwrap()
}
}
/// Result of retrieving a chunk.
-
#[derive(Debug, Serialize)]
pub struct Fetched {
id: ChunkId,
@@ -30,31 +103,36 @@ pub struct Fetched {
}
impl Fetched {
+ /// Create a new result for a fetched chunk.
pub fn new(id: ChunkId, chunk: DataChunk) -> Self {
Fetched { id, chunk }
}
+ /// Convert to JSON.
pub fn to_json(&self) -> String {
serde_json::to_string(&self).unwrap()
}
}
/// Result of a search.
-#[derive(Debug, Default, PartialEq, Deserialize, Serialize)]
+#[derive(Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
pub struct SearchHits {
map: HashMap<String, ChunkMeta>,
}
impl SearchHits {
+ /// Insert a new chunk id to search results.
pub fn insert(&mut self, id: ChunkId, meta: ChunkMeta) {
self.map.insert(id.to_string(), meta);
}
- pub fn from_json(s: &str) -> anyhow::Result<Self> {
+ /// Convert from JSON.
+ pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
let map = serde_json::from_str(s)?;
Ok(SearchHits { map })
}
+ /// Convert to JSON.
pub fn to_json(&self) -> String {
serde_json::to_string(&self.map).unwrap()
}
@@ -63,6 +141,7 @@ impl SearchHits {
#[cfg(test)]
mod test_search_hits {
use super::{ChunkMeta, SearchHits};
+ use crate::label::Label;
#[test]
fn no_search_hits() {
@@ -73,7 +152,8 @@ mod test_search_hits {
#[test]
fn one_search_hit() {
let id = "abc".parse().unwrap();
- let meta = ChunkMeta::new("123");
+ let sum = Label::sha256(b"123");
+ let meta = ChunkMeta::new(&sum);
let mut hits = SearchHits::default();
hits.insert(id, meta);
eprintln!("hits: {:?}", hits);
diff --git a/src/store.rs b/src/store.rs
index e6cc71f..185370e 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -1,7 +1,7 @@
+//! Store chunks on-disk on server.
+
use crate::chunk::DataChunk;
use crate::chunkid::ChunkId;
-use crate::chunkmeta::ChunkMeta;
-use anyhow::Context;
use std::path::{Path, PathBuf};
/// Store chunks, with metadata, persistently.
@@ -13,6 +13,9 @@ pub struct Store {
dir: PathBuf,
}
+/// An error from a `Store` operation.
+pub type StoreError = std::io::Error;
+
impl Store {
/// Create a new Store to represent on-disk storage of chunks.
pub fn new(dir: &Path) -> Self {
@@ -38,34 +41,34 @@ impl Store {
}
/// Save a chunk into a store.
- pub fn save(&self, id: &ChunkId, meta: &ChunkMeta, chunk: &DataChunk) -> anyhow::Result<()> {
+ pub fn save(&self, id: &ChunkId, chunk: &DataChunk) -> Result<(), StoreError> {
let (dir, metaname, dataname) = &self.filenames(id);
if !dir.exists() {
- let res = std::fs::create_dir_all(dir).into();
- if let Err(_) = res {
- return res.with_context(|| format!("creating directory {}", dir.display()));
- }
+ std::fs::create_dir_all(dir)?;
}
- std::fs::write(&metaname, meta.to_json())?;
- std::fs::write(&dataname, chunk.data())?;
+ std::fs::write(metaname, chunk.meta().to_json())?;
+ std::fs::write(dataname, chunk.data())?;
Ok(())
}
/// Load a chunk from a store.
- pub fn load(&self, id: &ChunkId) -> anyhow::Result<DataChunk> {
- let (_, _, dataname) = &self.filenames(id);
- let data = std::fs::read(&dataname)?;
- let data = DataChunk::new(data);
+ pub fn load(&self, id: &ChunkId) -> Result<DataChunk, StoreError> {
+ let (_, metaname, dataname) = &self.filenames(id);
+ let meta = std::fs::read(metaname)?;
+ let meta = serde_json::from_slice(&meta)?;
+
+ let data = std::fs::read(dataname)?;
+ let data = DataChunk::new(data, meta);
Ok(data)
}
/// Delete a chunk from a store.
- pub fn delete(&self, id: &ChunkId) -> anyhow::Result<()> {
+ pub fn delete(&self, id: &ChunkId) -> Result<(), StoreError> {
let (_, metaname, dataname) = &self.filenames(id);
- std::fs::remove_file(&metaname)?;
- std::fs::remove_file(&dataname)?;
+ std::fs::remove_file(metaname)?;
+ std::fs::remove_file(dataname)?;
Ok(())
}
}
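
A hedged round-trip sketch for the reworked Store API; the directory is invented, and the ChunkId::new, DataChunk, and ChunkMeta constructors are assumed to exist as used elsewhere in this diff:

    use obnam::chunk::DataChunk;
    use obnam::chunkid::ChunkId;
    use obnam::chunkmeta::ChunkMeta;
    use obnam::label::Label;
    use obnam::store::{Store, StoreError};
    use std::path::Path;

    fn main() -> Result<(), StoreError> {
        let store = Store::new(Path::new("/srv/obnam/chunks"));

        let data = b"some chunk data".to_vec();
        let meta = ChunkMeta::new(&Label::sha256(&data));
        let chunk = DataChunk::new(data, meta);

        // Save, reload, and delete a chunk under a fresh id.
        let id = ChunkId::new();
        store.save(&id, &chunk)?;
        let loaded = store.load(&id)?;
        assert_eq!(loaded.data(), chunk.data());
        store.delete(&id)?;
        Ok(())
    }
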
diff --git a/src/workqueue.rs b/src/workqueue.rs
new file mode 100644
index 0000000..6b3ce80
--- /dev/null
+++ b/src/workqueue.rs
@@ -0,0 +1,62 @@
+//! A queue of work for [`crate::engine::Engine`].
+
+use tokio::sync::mpsc;
+
+/// A queue of work items.
+///
+/// An abstraction for producing items of work. For example, chunks of
+/// data in a file. The work items are put into an ordered queue to be
+/// worked on by another task. The queue is limited in size so that it
+/// doesn't grow impossibly large. This acts as a load-limiting
+/// synchronization mechanism.
+///
+/// One async task produces work items and puts them into the queue,
+/// another consumes them from the queue. If the producer is too fast,
+/// the queue fills up, and the producer blocks when putting an item
+/// into the queue. If the queue is empty, the consumer blocks until
+/// there is something added to the queue.
+///
+/// The work items need to be abstracted as a type, and that type is
+/// given as a type parameter.
+pub struct WorkQueue<T> {
+ rx: mpsc::Receiver<T>,
+ tx: Option<mpsc::Sender<T>>,
+ size: usize,
+}
+
+impl<T> WorkQueue<T> {
+ /// Create a new work queue of a given maximum size.
+ pub fn new(queue_size: usize) -> Self {
+ let (tx, rx) = mpsc::channel(queue_size);
+ Self {
+ rx,
+ tx: Some(tx),
+ size: queue_size,
+ }
+ }
+
+ /// Get maximum size of queue.
+ pub fn size(&self) -> usize {
+ self.size
+ }
+
+ /// Get a sender that can be used to add work items to the queue.
+ pub fn push(&self) -> mpsc::Sender<T> {
+ self.tx.as_ref().unwrap().clone()
+ }
+
+ /// Signal that no more work items will be added to the queue.
+ ///
+ /// You **must** call this, as otherwise the `next` function will
+ /// wait indefinitely.
+ pub fn close(&mut self) {
+ // println!("Chunkify::close closing sender");
+ self.tx = None;
+ }
+
+ /// Get the oldest work item from the queue, if any.
+ pub async fn next(&mut self) -> Option<T> {
+ // println!("next called");
+ self.rx.recv().await
+ }
+}
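
Finally, a hedged producer/consumer sketch for WorkQueue, assuming a tokio runtime; the item type and counts are arbitrary:

    use obnam::workqueue::WorkQueue;

    #[tokio::main]
    async fn main() {
        let mut queue: WorkQueue<u64> = WorkQueue::new(4);
        let tx = queue.push();

        // Producer task: blocks whenever the bounded queue is full.
        let producer = tokio::spawn(async move {
            for i in 0..10 {
                tx.send(i).await.unwrap();
            }
            // The sender clone is dropped when this task finishes.
        });

        // Drop the queue's own sender so `next` can return None once the
        // producer is done.
        queue.close();

        // Consumer: drain the queue until all senders are gone.
        while let Some(item) = queue.next().await {
            println!("got {}", item);
        }

        producer.await.unwrap();
    }
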