diff options
author | Lars Wirzenius <liw@liw.fi> | 2020-11-08 16:53:41 +0000 |
---|---|---|
committer | Lars Wirzenius <liw@liw.fi> | 2020-11-08 16:53:41 +0000 |
commit | 374d19b74d99b9cd9d2a39f80b1cd8fb5e799860 (patch) | |
tree | 2a62d4e7aac29317ab7243a8332fe44699d2f749 /src | |
parent | e28c0947b8e46cb830737fdb3f7c5f3e5237aa02 (diff) | |
parent | b7dfe8e4a95ec6a50e51e9b33460db8c63c0fab5 (diff) | |
download | obnam2-374d19b74d99b9cd9d2a39f80b1cd8fb5e799860.tar.gz |
Merge branch 'sqlite-client' into 'main'
Sqlite client
See merge request larswirzenius/obnam!5
Diffstat (limited to 'src')
-rw-r--r-- | src/bin/obnam-backup.rs | 225 | ||||
-rw-r--r-- | src/chunker.rs | 60 | ||||
-rw-r--r-- | src/chunkid.rs | 10 | ||||
-rw-r--r-- | src/client.rs | 166 | ||||
-rw-r--r-- | src/fsentry.rs | 118 | ||||
-rw-r--r-- | src/fsiter.rs | 37 | ||||
-rw-r--r-- | src/generation.rs | 94 | ||||
-rw-r--r-- | src/lib.rs | 5 |
8 files changed, 513 insertions, 202 deletions
diff --git a/src/bin/obnam-backup.rs b/src/bin/obnam-backup.rs index 9370611..f5d6b43 100644 --- a/src/bin/obnam-backup.rs +++ b/src/bin/obnam-backup.rs @@ -1,216 +1,37 @@ -// Read stdin, split into chunks, upload new chunks to chunk server. - -use indicatif::{ProgressBar, ProgressStyle}; -use log::{debug, error, info, trace}; -use obnam::chunk::{DataChunk, GenerationChunk}; -use obnam::chunkid::ChunkId; -use obnam::chunkmeta::ChunkMeta; -use serde::Deserialize; -use sha2::{Digest, Sha256}; -use std::collections::HashMap; -use std::io::prelude::*; -use std::io::BufReader; -use std::path::{Path, PathBuf}; +use log::{debug, info}; +use obnam::client::{BackupClient, ClientConfig}; +use obnam::fsiter::FsIterator; +use obnam::generation::Generation; +use std::path::PathBuf; use structopt::StructOpt; const BUFFER_SIZE: usize = 1024 * 1024; -#[derive(Debug, thiserror::Error)] -enum ClientError { - #[error("Server successful response to creating chunk lacked chunk id")] - NoCreatedChunkId, -} - -#[derive(Debug, StructOpt)] -#[structopt(name = "obnam-backup", about = "Simplistic backup client")] -struct Opt { - #[structopt(parse(from_os_str))] - config: PathBuf, -} - fn main() -> anyhow::Result<()> { pretty_env_logger::init(); let opt = Opt::from_args(); - let config = Config::read_config(&opt.config).unwrap(); - // let pb = ProgressBar::new_spinner(); - let pb = ProgressBar::new_spinner(); - pb.set_style( - ProgressStyle::default_bar() - .template("backing up:\n{bytes} ({bytes_per_sec}) {elapsed} {msg} {spinner}"), - ); - - info!("obnam-backup starts up"); - info!("config: {:?}", config); - - let client = reqwest::blocking::Client::builder() - .danger_accept_invalid_certs(true) - .build()?; - - let mut chunk_ids = vec![]; - let mut total_bytes = 0; - let mut new_chunks = 0; - let mut dup_chunks = 0; - let mut new_bytes = 0; - let mut dup_bytes = 0; - - let stdin = std::io::stdin(); - let mut stdin = BufReader::new(stdin); - loop { - match read_chunk(&mut stdin)? { - None => break, - Some((meta, chunk)) => { - let n = chunk.data().len() as u64; - debug!("read {} bytes", n); - total_bytes += n; - pb.inc(n); - if let Some(chunk_id) = has_chunk(&client, &config, &meta)? { - debug!("dup chunk: {}", chunk_id); - chunk_ids.push(chunk_id); - dup_chunks += 1; - dup_bytes += n; - } else { - let chunk_id = upload_chunk(&client, &config, meta, chunk)?; - debug!("new chunk: {}", chunk_id); - chunk_ids.push(chunk_id); - new_chunks += 1; - new_bytes += n; - } - } - } + info!("obnam-backup starts"); + debug!("opt: {:?}", opt); + let config = ClientConfig::read_config(&opt.config)?; + let client = BackupClient::new(&config.server_name, config.server_port)?; + + { + let mut gen = Generation::new(&config.dbname)?; + gen.insert_iter(FsIterator::new(&config.root).map(|entry| match entry { + Err(err) => Err(err), + Ok(entry) => client.upload_filesystem_entry(entry, BUFFER_SIZE), + }))?; } + let gen_id = client.upload_generation(&config.dbname, BUFFER_SIZE)?; + println!("gen id: {}", gen_id); - let gen = GenerationChunk::new(chunk_ids); - let gen_id = upload_gen(&client, &config, &gen)?; - - pb.finish(); - info!("read total {} bytes from stdin", total_bytes); - info!("duplicate bytes: {}", dup_bytes); - info!("duplicate chunks: {}", dup_chunks); - info!("new bytes: {}", new_bytes); - info!("new chunks: {}", new_chunks); - info!("total chunks: {}", gen.len()); - info!("generation id: {}", gen_id); - info!("obnam-backup finished OK"); - println!("backup OK: generation id: {}", gen_id); Ok(()) } -#[derive(Debug, Deserialize, Clone)] -pub struct Config { - pub server_name: String, - pub server_port: u16, -} - -impl Config { - pub fn read_config(filename: &Path) -> anyhow::Result<Config> { - let config = std::fs::read_to_string(filename)?; - let config: Config = serde_yaml::from_str(&config)?; - Ok(config) - } -} - -fn read_chunk<H>(handle: &mut H) -> anyhow::Result<Option<(ChunkMeta, DataChunk)>> -where - H: Read + BufRead, -{ - let mut buffer = [0; BUFFER_SIZE]; - let mut used = 0; - - loop { - let n = handle.read(&mut buffer[used..])?; - used += n; - if n == 0 || used == BUFFER_SIZE { - break; - } - } - - if used == 0 { - return Ok(None); - } - - let buffer = &buffer[..used]; - let mut hasher = Sha256::new(); - hasher.update(buffer); - let hash = hasher.finalize(); - let hash = format!("{:x}", hash); - let meta = ChunkMeta::new(&hash); - - let chunk = DataChunk::new(buffer.to_vec()); - Ok(Some((meta, chunk))) -} - -fn upload_chunk( - client: &reqwest::blocking::Client, - config: &Config, - meta: ChunkMeta, - chunk: DataChunk, -) -> anyhow::Result<ChunkId> { - let url = format!( - "http://{}:{}/chunks", - config.server_name, config.server_port - ); - - let res = client - .post(&url) - .header("chunk-meta", meta.to_json()) - .body(chunk.data().to_vec()) - .send()?; - debug!("upload_chunk: res={:?}", res); - let res: HashMap<String, String> = res.json()?; - let chunk_id = if let Some(chunk_id) = res.get("chunk_id") { - debug!("upload_chunk: id={}", chunk_id); - chunk_id.parse().unwrap() - } else { - return Err(ClientError::NoCreatedChunkId.into()); - }; - Ok(chunk_id) -} - -fn upload_gen( - client: &reqwest::blocking::Client, - config: &Config, - gen: &GenerationChunk, -) -> anyhow::Result<ChunkId> { - let meta = ChunkMeta::new_generation("metasha", "ended-sometime"); - let chunk = gen.to_data_chunk()?; - upload_chunk(client, config, meta, chunk) -} - -fn has_chunk( - client: &reqwest::blocking::Client, - config: &Config, - meta: &ChunkMeta, -) -> anyhow::Result<Option<ChunkId>> { - let url = format!( - "http://{}:{}/chunks", - config.server_name, config.server_port, - ); - - trace!("has_chunk: url={:?}", url); - let req = client - .get(&url) - .query(&[("sha256", meta.sha256())]) - .build()?; - - let res = client.execute(req)?; - debug!("has_chunk: status={}", res.status()); - let has = if res.status() != 200 { - debug!("has_chunk: error from server"); - None - } else { - let text = res.text()?; - debug!("has_chunk: text={:?}", text); - let hits: HashMap<String, ChunkMeta> = serde_json::from_str(&text)?; - debug!("has_chunk: hits={:?}", hits); - let mut iter = hits.iter(); - if let Some((chunk_id, _)) = iter.next() { - debug!("has_chunk: chunk_id={:?}", chunk_id); - Some(chunk_id.into()) - } else { - None - } - }; - - Ok(has) +#[derive(Debug, StructOpt)] +#[structopt(name = "obnam-backup", about = "Simplistic backup client")] +struct Opt { + #[structopt(parse(from_os_str))] + config: PathBuf, } diff --git a/src/chunker.rs b/src/chunker.rs new file mode 100644 index 0000000..f4ca74c --- /dev/null +++ b/src/chunker.rs @@ -0,0 +1,60 @@ +use crate::chunk::DataChunk; +use crate::chunkmeta::ChunkMeta; +use sha2::{Digest, Sha256}; +use std::io::prelude::*; + +pub struct Chunker { + chunk_size: usize, + buf: Vec<u8>, + handle: std::fs::File, +} + +impl Chunker { + pub fn new(chunk_size: usize, handle: std::fs::File) -> Self { + let mut buf = vec![]; + buf.resize(chunk_size, 0); + Self { + chunk_size, + buf, + handle, + } + } + + pub fn read_chunk(&mut self) -> anyhow::Result<Option<(ChunkMeta, DataChunk)>> { + let mut used = 0; + + loop { + let n = self.handle.read(&mut self.buf.as_mut_slice()[used..])?; + used += n; + if n == 0 || used == self.chunk_size { + break; + } + } + + if used == 0 { + return Ok(None); + } + + let buffer = &self.buf.as_slice()[..used]; + let mut hasher = Sha256::new(); + hasher.update(buffer); + let hash = hasher.finalize(); + let hash = format!("{:x}", hash); + let meta = ChunkMeta::new(&hash); + + let chunk = DataChunk::new(buffer.to_vec()); + Ok(Some((meta, chunk))) + } +} + +impl Iterator for Chunker { + type Item = anyhow::Result<(ChunkMeta, DataChunk)>; + + fn next(&mut self) -> Option<anyhow::Result<(ChunkMeta, DataChunk)>> { + match self.read_chunk() { + Ok(None) => None, + Ok(Some((meta, chunk))) => Some(Ok((meta, chunk))), + Err(e) => Some(Err(e)), + } + } +} diff --git a/src/chunkid.rs b/src/chunkid.rs index e9582b5..781e497 100644 --- a/src/chunkid.rs +++ b/src/chunkid.rs @@ -1,3 +1,5 @@ +use rusqlite::types::ToSqlOutput; +use rusqlite::ToSql; use serde::{Deserialize, Serialize}; use std::fmt; use std::hash::Hash; @@ -34,6 +36,14 @@ impl ChunkId { } } +impl ToSql for ChunkId { + fn to_sql(&self) -> rusqlite::Result<ToSqlOutput> { + Ok(ToSqlOutput::Owned(rusqlite::types::Value::Text( + self.id.clone(), + ))) + } +} + impl fmt::Display for ChunkId { /// Format an identifier for display. /// diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..e7b5a3e --- /dev/null +++ b/src/client.rs @@ -0,0 +1,166 @@ +use crate::chunk::DataChunk; +use crate::chunk::GenerationChunk; +use crate::chunker::Chunker; +use crate::chunkid::ChunkId; +use crate::chunkmeta::ChunkMeta; +use crate::fsentry::{FilesystemEntry, FilesystemKind}; +use log::{debug, error, info, trace}; +use reqwest::blocking::Client; +use serde::Deserialize; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; + +#[derive(Debug, Deserialize, Clone)] +pub struct ClientConfig { + pub server_name: String, + pub server_port: u16, + pub dbname: PathBuf, + pub root: PathBuf, +} + +impl ClientConfig { + pub fn read_config(filename: &Path) -> anyhow::Result<Self> { + trace!("read_config: filename={:?}", filename); + let config = std::fs::read_to_string(filename)?; + let config = serde_yaml::from_str(&config)?; + Ok(config) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ClientError { + #[error("Server successful response to creating chunk lacked chunk id")] + NoCreatedChunkId, +} + +pub struct BackupClient { + client: Client, + base_url: String, +} + +impl BackupClient { + pub fn new(host: &str, port: u16) -> anyhow::Result<Self> { + let client = Client::builder() + .danger_accept_invalid_certs(true) + .build()?; + let base_url = format!("http://{}:{}/chunks", host, port,); + Ok(Self { client, base_url }) + } + + pub fn upload_filesystem_entry( + &self, + e: FilesystemEntry, + size: usize, + ) -> anyhow::Result<(FilesystemEntry, Vec<ChunkId>)> { + let ids = match e.kind() { + FilesystemKind::Regular => self.read_file(e.path(), size)?, + FilesystemKind::Directory => vec![], + }; + Ok((e, ids)) + } + + pub fn upload_generation(&self, filename: &Path, size: usize) -> anyhow::Result<ChunkId> { + let ids = self.read_file(filename, size)?; + let gen = GenerationChunk::new(ids); + let meta = ChunkMeta::new_generation("checksum", "endtime"); + let gen_id = self.upload_gen_chunk(meta, gen)?; + Ok(gen_id) + } + + fn read_file(&self, filename: &Path, size: usize) -> anyhow::Result<Vec<ChunkId>> { + info!("uploading {}", filename.display()); + let file = std::fs::File::open(filename)?; + let chunker = Chunker::new(size, file); + let chunk_ids = self.upload_new_file_chunks(chunker)?; + Ok(chunk_ids) + } + + fn base_url(&self) -> &str { + &self.base_url + } + + pub fn has_chunk(&self, meta: &ChunkMeta) -> anyhow::Result<Option<ChunkId>> { + trace!("has_chunk: url={:?}", self.base_url()); + let req = self + .client + .get(self.base_url()) + .query(&[("sha256", meta.sha256())]) + .build()?; + + let res = self.client.execute(req)?; + debug!("has_chunk: status={}", res.status()); + let has = if res.status() != 200 { + debug!("has_chunk: error from server"); + None + } else { + let text = res.text()?; + debug!("has_chunk: text={:?}", text); + let hits: HashMap<String, ChunkMeta> = serde_json::from_str(&text)?; + debug!("has_chunk: hits={:?}", hits); + let mut iter = hits.iter(); + if let Some((chunk_id, _)) = iter.next() { + debug!("has_chunk: chunk_id={:?}", chunk_id); + Some(chunk_id.into()) + } else { + None + } + }; + + Ok(has) + } + + pub fn upload_chunk(&self, meta: ChunkMeta, chunk: DataChunk) -> anyhow::Result<ChunkId> { + let res = self + .client + .post(self.base_url()) + .header("chunk-meta", meta.to_json()) + .body(chunk.data().to_vec()) + .send()?; + debug!("upload_chunk: res={:?}", res); + let res: HashMap<String, String> = res.json()?; + let chunk_id = if let Some(chunk_id) = res.get("chunk_id") { + debug!("upload_chunk: id={}", chunk_id); + chunk_id.parse().unwrap() + } else { + return Err(ClientError::NoCreatedChunkId.into()); + }; + Ok(chunk_id) + } + + pub fn upload_gen_chunk( + &self, + meta: ChunkMeta, + gen: GenerationChunk, + ) -> anyhow::Result<ChunkId> { + let res = self + .client + .post(self.base_url()) + .header("chunk-meta", meta.to_json()) + .body(serde_json::to_string(&gen)?) + .send()?; + debug!("upload_chunk: res={:?}", res); + let res: HashMap<String, String> = res.json()?; + let chunk_id = if let Some(chunk_id) = res.get("chunk_id") { + debug!("upload_chunk: id={}", chunk_id); + chunk_id.parse().unwrap() + } else { + return Err(ClientError::NoCreatedChunkId.into()); + }; + Ok(chunk_id) + } + + pub fn upload_new_file_chunks(&self, chunker: Chunker) -> anyhow::Result<Vec<ChunkId>> { + let mut chunk_ids = vec![]; + for item in chunker { + let (meta, chunk) = item?; + if let Some(chunk_id) = self.has_chunk(&meta)? { + chunk_ids.push(chunk_id); + } else { + let chunk_id = self.upload_chunk(meta, chunk)?; + chunk_ids.push(chunk_id); + } + } + + Ok(chunk_ids) + } +} diff --git a/src/fsentry.rs b/src/fsentry.rs new file mode 100644 index 0000000..2a5b153 --- /dev/null +++ b/src/fsentry.rs @@ -0,0 +1,118 @@ +use std::path::{Path, PathBuf}; + +/// A file system entry. +/// +/// Represent all backup-relevant the metadata about a file system +/// object: fully qualified pathname, type, length (if applicable), +/// etc. Everything except the content of a regular file or the +/// contents of a directory. +/// +/// This is everything Obnam cares about each file system object, when +/// making a backup. +#[derive(Debug)] +pub struct FilesystemEntry { + kind: FilesystemKind, + path: PathBuf, + len: u64, +} + +impl FilesystemEntry { + fn new(kind: FilesystemKind, path: &Path, len: u64) -> Self { + Self { + path: path.to_path_buf(), + kind, + len, + } + } + + pub fn regular<P>(path: P, len: u64) -> Self + where + P: AsRef<Path>, + { + Self::new(FilesystemKind::Regular, path.as_ref(), len) + } + + pub fn directory<P>(path: P) -> Self + where + P: AsRef<Path>, + { + Self::new(FilesystemKind::Directory, path.as_ref(), 0) + } + + pub fn kind(&self) -> FilesystemKind { + self.kind + } + + pub fn path(&self) -> &Path { + &self.path + } + + pub fn len(&self) -> u64 { + self.len + } +} + +/// Different types of file system entries. +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum FilesystemKind { + Regular, + Directory, +} + +impl FilesystemKind { + pub fn as_code(&self) -> u8 { + match self { + FilesystemKind::Regular => 0, + FilesystemKind::Directory => 1, + } + } + + pub fn from_code(code: u8) -> anyhow::Result<Self> { + match code { + 0 => Ok(FilesystemKind::Regular), + 1 => Ok(FilesystemKind::Directory), + _ => Err(Error::UnknownFileKindCode(code).into()), + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("unknown file kind code {0}")] + UnknownFileKindCode(u8), +} + +#[cfg(test)] +mod test { + use super::{FilesystemEntry, FilesystemKind}; + use std::path::Path; + + #[test] + fn regular_file() { + let filename = Path::new("foo.dat"); + let len = 123; + let e = FilesystemEntry::regular(filename, len); + assert_eq!(e.kind(), FilesystemKind::Regular); + assert_eq!(e.path(), filename); + assert_eq!(e.len(), len); + } + + #[test] + fn directory() { + let filename = Path::new("foo.dat"); + let e = FilesystemEntry::directory(filename); + assert_eq!(e.kind(), FilesystemKind::Directory); + assert_eq!(e.path(), filename); + assert_eq!(e.len(), 0); + } + + #[test] + fn file_kind_regular_round_trips() { + one_file_kind_round_trip(FilesystemKind::Regular); + one_file_kind_round_trip(FilesystemKind::Directory); + } + + fn one_file_kind_round_trip(kind: FilesystemKind) { + assert_eq!(kind, FilesystemKind::from_code(kind.as_code()).unwrap()); + } +} diff --git a/src/fsiter.rs b/src/fsiter.rs new file mode 100644 index 0000000..929c81e --- /dev/null +++ b/src/fsiter.rs @@ -0,0 +1,37 @@ +use crate::fsentry::FilesystemEntry; +use std::path::Path; +use walkdir::{IntoIter, WalkDir}; + +/// Iterator over file system entries in a directory tree. +pub struct FsIterator { + iter: IntoIter, +} + +impl FsIterator { + pub fn new(root: &Path) -> Self { + Self { + iter: WalkDir::new(root).into_iter(), + } + } +} + +impl Iterator for FsIterator { + type Item = Result<FilesystemEntry, anyhow::Error>; + fn next(&mut self) -> Option<Self::Item> { + match self.iter.next() { + None => None, + Some(Ok(entry)) => Some(new_entry(&entry)), + Some(Err(err)) => Some(Err(err.into())), + } + } +} + +fn new_entry(e: &walkdir::DirEntry) -> anyhow::Result<FilesystemEntry> { + let meta = e.metadata()?; + let kind = if meta.is_dir() { + FilesystemEntry::directory(e.path()) + } else { + FilesystemEntry::regular(e.path(), meta.len()) + }; + Ok(kind) +} diff --git a/src/generation.rs b/src/generation.rs new file mode 100644 index 0000000..2cae478 --- /dev/null +++ b/src/generation.rs @@ -0,0 +1,94 @@ +use crate::fsentry::FilesystemEntry; +//use crate::fsiter::FsIterator; +use crate::chunkid::ChunkId; +use rusqlite::{params, Connection, OpenFlags, Transaction}; +use std::os::unix::ffi::OsStringExt; +use std::path::Path; + +/// A backup generation. +pub struct Generation { + conn: Connection, + fileno: u64, +} + +impl Generation { + pub fn new<P>(filename: P) -> anyhow::Result<Self> + where + P: AsRef<Path>, + { + let flags = OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE; + let conn = Connection::open_with_flags(filename, flags)?; + conn.execute( + "CREATE TABLE files (fileid INTEGER PRIMARY KEY, path BLOB, kind INTEGER, len INTEGER)", + params![], + )?; + conn.execute( + "CREATE TABLE chunks (fileid INTEGER, chunkid TEXT)", + params![], + )?; + conn.pragma_update(None, "journal_mode", &"WAL")?; + Ok(Self { conn, fileno: 0 }) + } + + pub fn insert(&mut self, e: FilesystemEntry, ids: &[ChunkId]) -> anyhow::Result<()> { + let t = self.conn.transaction()?; + insert_one(&t, e, self.fileno, ids)?; + self.fileno += 1; + t.commit()?; + Ok(()) + } + + pub fn insert_iter<'a>( + &mut self, + entries: impl Iterator<Item = anyhow::Result<(FilesystemEntry, Vec<ChunkId>)>>, + ) -> anyhow::Result<()> { + let t = self.conn.transaction()?; + for r in entries { + let (e, ids) = r?; + insert_one(&t, e, self.fileno, &ids[..])?; + self.fileno += 1; + } + t.commit()?; + Ok(()) + } +} + +fn insert_one( + t: &Transaction, + e: FilesystemEntry, + fileno: u64, + ids: &[ChunkId], +) -> anyhow::Result<()> { + let path = e.path().as_os_str().to_os_string().into_vec(); + let kind = e.kind().as_code(); + let len = e.len() as i64; + let fileno = fileno as i64; + t.execute( + "INSERT INTO files (fileid, path, kind, len) VALUES (?1, ?2, ?3, ?4)", + params![fileno, path, kind, len], + )?; + for id in ids { + t.execute( + "INSERT INTO chunks (fileid, chunkid) VALUES (?1, ?2)", + params![fileno, id], + )?; + } + Ok(()) +} + +#[cfg(test)] +mod test { + use super::Generation; + use tempfile::NamedTempFile; + + #[test] + fn empty() { + let filename = NamedTempFile::new().unwrap().path().to_path_buf(); + { + let mut _gen = Generation::new(&filename).unwrap(); + // _gen is dropped here; the connection is close; the file + // should not be removed. + } + assert!(filename.exists()); + } +} @@ -1,6 +1,11 @@ pub mod chunk; +pub mod chunker; pub mod chunkid; pub mod chunkmeta; +pub mod client; +pub mod fsentry; +pub mod fsiter; +pub mod generation; pub mod index; pub mod server; pub mod store; |