From 74751e82695b4b0dbed3466a2caf120a020b4229 Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Fri, 16 Oct 2020 07:49:02 +0300 Subject: feat: obnam-restore restores generation to stdout --- src/bin/obnam-backup.rs | 100 ++++++++++++++++++++++++++++++++------- src/bin/obnam-restore.rs | 119 +++++++++++++++++++++++++++++++++++++++++++++++ src/chunk.rs | 34 +++++++++++++- src/chunkid.rs | 11 +++-- 4 files changed, 241 insertions(+), 23 deletions(-) create mode 100644 src/bin/obnam-restore.rs (limited to 'src') diff --git a/src/bin/obnam-backup.rs b/src/bin/obnam-backup.rs index d2de289..9370611 100644 --- a/src/bin/obnam-backup.rs +++ b/src/bin/obnam-backup.rs @@ -1,7 +1,9 @@ // Read stdin, split into chunks, upload new chunks to chunk server. use indicatif::{ProgressBar, ProgressStyle}; -use obnam::chunk::DataChunk; +use log::{debug, error, info, trace}; +use obnam::chunk::{DataChunk, GenerationChunk}; +use obnam::chunkid::ChunkId; use obnam::chunkmeta::ChunkMeta; use serde::Deserialize; use sha2::{Digest, Sha256}; @@ -13,6 +15,12 @@ use structopt::StructOpt; const BUFFER_SIZE: usize = 1024 * 1024; +#[derive(Debug, thiserror::Error)] +enum ClientError { + #[error("Server successful response to creating chunk lacked chunk id")] + NoCreatedChunkId, +} + #[derive(Debug, StructOpt)] #[structopt(name = "obnam-backup", about = "Simplistic backup client")] struct Opt { @@ -21,43 +29,70 @@ struct Opt { } fn main() -> anyhow::Result<()> { + pretty_env_logger::init(); + let opt = Opt::from_args(); let config = Config::read_config(&opt.config).unwrap(); + // let pb = ProgressBar::new_spinner(); let pb = ProgressBar::new_spinner(); pb.set_style( ProgressStyle::default_bar() .template("backing up:\n{bytes} ({bytes_per_sec}) {elapsed} {msg} {spinner}"), ); - println!("config: {:?}", config); + info!("obnam-backup starts up"); + info!("config: {:?}", config); let client = reqwest::blocking::Client::builder() .danger_accept_invalid_certs(true) .build()?; + let mut chunk_ids = vec![]; + let mut total_bytes = 0; + let mut new_chunks = 0; + let mut dup_chunks = 0; + let mut new_bytes = 0; + let mut dup_bytes = 0; + let stdin = std::io::stdin(); let mut stdin = BufReader::new(stdin); - let mut dup = 0; loop { match read_chunk(&mut stdin)? { None => break, Some((meta, chunk)) => { let n = chunk.data().len() as u64; - if !has_chunk(&client, &config, &meta)? { - pb.inc(n); - upload_chunk(&client, &config, meta, chunk)?; + debug!("read {} bytes", n); + total_bytes += n; + pb.inc(n); + if let Some(chunk_id) = has_chunk(&client, &config, &meta)? { + debug!("dup chunk: {}", chunk_id); + chunk_ids.push(chunk_id); + dup_chunks += 1; + dup_bytes += n; } else { - dup += n; + let chunk_id = upload_chunk(&client, &config, meta, chunk)?; + debug!("new chunk: {}", chunk_id); + chunk_ids.push(chunk_id); + new_chunks += 1; + new_bytes += n; } } } } + + let gen = GenerationChunk::new(chunk_ids); + let gen_id = upload_gen(&client, &config, &gen)?; + pb.finish(); - println!( - "read total {} bytes from stdin ({} dup)", - pb.position(), - dup - ); + info!("read total {} bytes from stdin", total_bytes); + info!("duplicate bytes: {}", dup_bytes); + info!("duplicate chunks: {}", dup_chunks); + info!("new bytes: {}", new_bytes); + info!("new chunks: {}", new_chunks); + info!("total chunks: {}", gen.len()); + info!("generation id: {}", gen_id); + info!("obnam-backup finished OK"); + println!("backup OK: generation id: {}", gen_id); Ok(()) } @@ -110,42 +145,71 @@ fn upload_chunk( config: &Config, meta: ChunkMeta, chunk: DataChunk, -) -> anyhow::Result<()> { +) -> anyhow::Result { let url = format!( "http://{}:{}/chunks", config.server_name, config.server_port ); - client + let res = client .post(&url) .header("chunk-meta", meta.to_json()) .body(chunk.data().to_vec()) .send()?; - Ok(()) + debug!("upload_chunk: res={:?}", res); + let res: HashMap = res.json()?; + let chunk_id = if let Some(chunk_id) = res.get("chunk_id") { + debug!("upload_chunk: id={}", chunk_id); + chunk_id.parse().unwrap() + } else { + return Err(ClientError::NoCreatedChunkId.into()); + }; + Ok(chunk_id) +} + +fn upload_gen( + client: &reqwest::blocking::Client, + config: &Config, + gen: &GenerationChunk, +) -> anyhow::Result { + let meta = ChunkMeta::new_generation("metasha", "ended-sometime"); + let chunk = gen.to_data_chunk()?; + upload_chunk(client, config, meta, chunk) } fn has_chunk( client: &reqwest::blocking::Client, config: &Config, meta: &ChunkMeta, -) -> anyhow::Result { +) -> anyhow::Result> { let url = format!( "http://{}:{}/chunks", config.server_name, config.server_port, ); + trace!("has_chunk: url={:?}", url); let req = client .get(&url) .query(&[("sha256", meta.sha256())]) .build()?; let res = client.execute(req)?; + debug!("has_chunk: status={}", res.status()); let has = if res.status() != 200 { - false + debug!("has_chunk: error from server"); + None } else { let text = res.text()?; + debug!("has_chunk: text={:?}", text); let hits: HashMap = serde_json::from_str(&text)?; - !hits.is_empty() + debug!("has_chunk: hits={:?}", hits); + let mut iter = hits.iter(); + if let Some((chunk_id, _)) = iter.next() { + debug!("has_chunk: chunk_id={:?}", chunk_id); + Some(chunk_id.into()) + } else { + None + } }; Ok(has) diff --git a/src/bin/obnam-restore.rs b/src/bin/obnam-restore.rs new file mode 100644 index 0000000..2014ef8 --- /dev/null +++ b/src/bin/obnam-restore.rs @@ -0,0 +1,119 @@ +// Fetch a backup generation's chunks, write to stdout. + +use log::{debug, info, trace}; +use obnam::chunk::{DataChunk, GenerationChunk}; +use obnam::chunkid::ChunkId; +//use obnam::chunkmeta::ChunkMeta; +use serde::Deserialize; +use std::io::{stdout, Write}; +use std::path::{Path, PathBuf}; +use structopt::StructOpt; + +#[derive(Debug, thiserror::Error)] +enum ClientError { + #[error("Server does not have generation {0}")] + GenerationNotFound(String), + + #[error("Server does not have chunk {0}")] + ChunkNotFound(String), +} + +#[derive(Debug, StructOpt)] +#[structopt(name = "obnam-backup", about = "Simplistic backup client")] +struct Opt { + #[structopt(parse(from_os_str))] + config: PathBuf, + + #[structopt()] + gen_id: String, +} + +fn main() -> anyhow::Result<()> { + pretty_env_logger::init(); + + let opt = Opt::from_args(); + let config = Config::read_config(&opt.config).unwrap(); + + info!("obnam-restore starts up"); + info!("opt: {:?}", opt); + info!("config: {:?}", config); + + let client = reqwest::blocking::Client::builder() + .danger_accept_invalid_certs(true) + .build()?; + let mut stdout = stdout(); + + let gen = fetch_generation(&client, &config, &opt.gen_id)?; + debug!("gen: {:?}", gen); + for id in gen.chunk_ids() { + let chunk = fetch_chunk(&client, &config, id)?; + debug!("got chunk: {}", id); + stdout.write_all(chunk.data())?; + } + + Ok(()) +} + +#[derive(Debug, Deserialize, Clone)] +pub struct Config { + pub server_name: String, + pub server_port: u16, +} + +impl Config { + pub fn read_config(filename: &Path) -> anyhow::Result { + let config = std::fs::read_to_string(filename)?; + let config: Config = serde_yaml::from_str(&config)?; + Ok(config) + } +} + +fn fetch_generation( + client: &reqwest::blocking::Client, + config: &Config, + gen_id: &str, +) -> anyhow::Result { + let url = format!( + "http://{}:{}/chunks/{}", + config.server_name, config.server_port, gen_id, + ); + + trace!("fetch_generation: url={:?}", url); + let req = client.get(&url).build()?; + + let res = client.execute(req)?; + debug!("fetch_generation: status={}", res.status()); + if res.status() != 200 { + debug!("fetch_generation: error from server"); + return Err(ClientError::GenerationNotFound(gen_id.to_string()).into()); + } + + let text = res.text()?; + debug!("fetch_generation: text={:?}", text); + let gen = serde_json::from_str(&text)?; + Ok(gen) +} + +fn fetch_chunk( + client: &reqwest::blocking::Client, + config: &Config, + chunk_id: &ChunkId, +) -> anyhow::Result { + let url = format!( + "http://{}:{}/chunks/{}", + config.server_name, config.server_port, chunk_id, + ); + + trace!("fetch_chunk: url={:?}", url); + let req = client.get(&url).build()?; + + let res = client.execute(req)?; + debug!("fetch_chunk: status={}", res.status()); + if res.status() != 200 { + debug!("fetch_chunk: error from server"); + return Err(ClientError::ChunkNotFound(chunk_id.to_string()).into()); + } + + let body = res.bytes()?; + Ok(DataChunk::new(body.to_vec())) +} diff --git a/src/chunk.rs b/src/chunk.rs index 17159e0..e1358ee 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -1,4 +1,6 @@ -use serde::Serialize; +use crate::chunkid::ChunkId; +use serde::{Deserialize, Serialize}; +use std::default::Default; /// Store an arbitrary chunk of data. /// @@ -6,7 +8,7 @@ use serde::Serialize; /// /// A chunk also contains its associated metadata, except its /// identifier. -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct DataChunk { data: Vec, } @@ -22,3 +24,31 @@ impl DataChunk { &self.data } } + +#[derive(Default, Debug, Serialize, Deserialize)] +pub struct GenerationChunk { + chunk_ids: Vec, +} + +impl GenerationChunk { + pub fn new(chunk_ids: Vec) -> Self { + Self { chunk_ids } + } + + pub fn is_empty(&self) -> bool { + self.chunk_ids.is_empty() + } + + pub fn len(&self) -> usize { + self.chunk_ids.len() + } + + pub fn chunk_ids(&self) -> impl Iterator { + self.chunk_ids.iter() + } + + pub fn to_data_chunk(&self) -> anyhow::Result { + let json = serde_json::to_string(self)?; + Ok(DataChunk::new(json.as_bytes().to_vec())) + } +} diff --git a/src/chunkid.rs b/src/chunkid.rs index 44a76d6..e9582b5 100644 --- a/src/chunkid.rs +++ b/src/chunkid.rs @@ -1,4 +1,4 @@ -use serde::Serialize; +use serde::{Deserialize, Serialize}; use std::fmt; use std::hash::Hash; use std::str::FromStr; @@ -19,7 +19,7 @@ use uuid::Uuid; /// /// Because every identifier is meant to be different, there is no /// default value, since default values should be identical. -#[derive(Debug, Eq, PartialEq, Clone, Hash, Serialize)] +#[derive(Debug, Eq, PartialEq, Clone, Hash, Serialize, Deserialize)] pub struct ChunkId { id: String, } @@ -43,10 +43,15 @@ impl fmt::Display for ChunkId { } } +impl From<&String> for ChunkId { + fn from(s: &String) -> Self { + ChunkId { id: s.to_string() } + } +} + impl FromStr for ChunkId { type Err = (); - /// Parse a string representation of an identifier. fn from_str(s: &str) -> Result { Ok(ChunkId { id: s.to_string() }) } -- cgit v1.2.1