From 383cff243c8937e31c48145a2422a99ff2109f6e Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Thu, 12 May 2022 19:26:08 +0300 Subject: feat: add a new ChunkStore to store chunks locally or remotely Sponsored-by: author --- src/chunkstore.rs | 304 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 2 files changed, 305 insertions(+) create mode 100644 src/chunkstore.rs diff --git a/src/chunkstore.rs b/src/chunkstore.rs new file mode 100644 index 0000000..f421805 --- /dev/null +++ b/src/chunkstore.rs @@ -0,0 +1,304 @@ +//! Access local and remote chunk stores. +//! +//! A chunk store may be local and accessed via the file system, or +//! remote and accessed over HTTP. This module implements both. This +//! module only handles encrypted chunks. + +use crate::chunkid::ChunkId; +use crate::chunkmeta::ChunkMeta; +use crate::cipher::EncryptedChunk; +use crate::config::{ClientConfig, ClientConfigError}; +use crate::index::{Index, IndexError}; + +use log::{debug, error, info}; +use reqwest::header::HeaderMap; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; + +/// A chunk store. +/// +/// The store may be local or remote. +pub enum ChunkStore { + /// A local chunk store. + Local(LocalStore), + + /// A remote chunk store. + Remote(RemoteStore), +} + +impl ChunkStore { + /// Open a local chunk store. + pub fn local>(path: P) -> Result { + let store = LocalStore::new(path.as_ref())?; + Ok(Self::Local(store)) + } + + /// Open a remote chunk store. + pub fn remote(config: &ClientConfig) -> Result { + let store = RemoteStore::new(config)?; + Ok(Self::Remote(store)) + } + + /// Does the store have a chunk with a given label? + pub async fn find_by_label(&self, meta: &ChunkMeta) -> Result, StoreError> { + match self { + Self::Local(store) => store.find_by_label(meta), + Self::Remote(store) => store.find_by_label(meta).await, + } + } + + /// Store a chunk in the store. + /// + /// The store chooses an id for the chunk. 
+ pub async fn put(&mut self, chunk: EncryptedChunk, meta: &ChunkMeta) -> Result { + match self { + Self::Local(store) => store.put(chunk, meta), + Self::Remote(store) => store.put(chunk, meta).await, + } + } + + /// Get a chunk given its id. + pub async fn get(&self, id: &ChunkId) -> Result<(Vec, ChunkMeta), StoreError> { + match self { + Self::Local(store) => store.get(id), + Self::Remote(store) => store.get(id).await, + } + } +} + +/// A local chunk store. +pub struct LocalStore { + path: PathBuf, + index: Index, +} + +impl LocalStore { + fn new(path: &Path) -> Result { + Ok(Self { + path: path.to_path_buf(), + index: Index::new(path)?, + }) + } + + fn find_by_label(&self, meta: &ChunkMeta) -> Result, StoreError> { + self.index + .find_by_label(meta.label()) + .map_err(StoreError::Index) + } + + fn put(&mut self, chunk: EncryptedChunk, meta: &ChunkMeta) -> Result { + let id = ChunkId::new(); + let (dir, filename) = self.filename(&id); + + if !dir.exists() { + std::fs::create_dir_all(&dir).map_err(|err| StoreError::ChunkMkdir(dir, err))?; + } + + let data = chunk.ciphertext().to_vec(); + std::fs::write(&filename, &data) + .map_err(|err| StoreError::WriteChunk(filename.clone(), err))?; + self.index + .insert_meta(id.clone(), meta.clone()) + .map_err(StoreError::Index)?; + Ok(id) + } + + fn get(&self, id: &ChunkId) -> Result<(Vec, ChunkMeta), StoreError> { + let meta = self.index.get_meta(id)?; + + let (_, filename) = &self.filename(id); + + let raw = + std::fs::read(&filename).map_err(|err| StoreError::ReadChunk(filename.clone(), err))?; + + Ok((raw, meta)) + } + + fn filename(&self, id: &ChunkId) -> (PathBuf, PathBuf) { + let bytes = id.as_bytes(); + assert!(bytes.len() > 3); + let a = bytes[0]; + let b = bytes[1]; + let c = bytes[2]; + let dir = self.path.join(format!("{}/{}/{}", a, b, c)); + let filename = dir.join(format!("{}", id)); + (dir, filename) + } +} + +/// A remote chunk store. 
+pub struct RemoteStore { + client: reqwest::Client, + base_url: String, +} + +impl RemoteStore { + fn new(config: &ClientConfig) -> Result { + info!("creating remote store with config: {:#?}", config); + + let client = reqwest::Client::builder() + .danger_accept_invalid_certs(!config.verify_tls_cert) + .build() + .map_err(StoreError::ReqwestError)?; + Ok(Self { + client, + base_url: config.server_url.to_string(), + }) + } + + async fn find_by_label(&self, meta: &ChunkMeta) -> Result, StoreError> { + let body = match self.get_helper("", &[("label", meta.label())]).await { + Ok((_, body)) => body, + Err(err) => return Err(err), + }; + + let hits: HashMap = + serde_json::from_slice(&body).map_err(StoreError::JsonParse)?; + let ids = hits.iter().map(|(id, _)| ChunkId::recreate(id)).collect(); + Ok(ids) + } + + async fn put(&self, chunk: EncryptedChunk, meta: &ChunkMeta) -> Result { + let res = self + .client + .post(&self.chunks_url()) + .header("chunk-meta", meta.to_json()) + .body(chunk.ciphertext().to_vec()) + .send() + .await + .map_err(StoreError::ReqwestError)?; + let res: HashMap = res.json().await.map_err(StoreError::ReqwestError)?; + debug!("upload_chunk: res={:?}", res); + let chunk_id = if let Some(chunk_id) = res.get("chunk_id") { + debug!("upload_chunk: id={}", chunk_id); + chunk_id.parse().unwrap() + } else { + return Err(StoreError::NoCreatedChunkId); + }; + info!("uploaded_chunk {}", chunk_id); + Ok(chunk_id) + } + + async fn get(&self, id: &ChunkId) -> Result<(Vec, ChunkMeta), StoreError> { + let (headers, body) = self.get_helper(&format!("/{}", id), &[]).await?; + let meta = self.get_chunk_meta_header(id, &headers)?; + Ok((body, meta)) + } + + fn base_url(&self) -> &str { + &self.base_url + } + + fn chunks_url(&self) -> String { + format!("{}/v1/chunks", self.base_url()) + } + + async fn get_helper( + &self, + path: &str, + query: &[(&str, &str)], + ) -> Result<(HeaderMap, Vec), StoreError> { + let url = format!("{}{}", &self.chunks_url(), path); + 
info!("GET {}", url); + + // Build HTTP request structure. + let req = self + .client + .get(&url) + .query(query) + .build() + .map_err(StoreError::ReqwestError)?; + + // Make HTTP request. + let res = self + .client + .execute(req) + .await + .map_err(StoreError::ReqwestError)?; + + // Did it work? + if res.status() != 200 { + return Err(StoreError::NotFound(path.to_string())); + } + + // Return headers and body. + let headers = res.headers().clone(); + let body = res.bytes().await.map_err(StoreError::ReqwestError)?; + let body = body.to_vec(); + Ok((headers, body)) + } + + fn get_chunk_meta_header( + &self, + chunk_id: &ChunkId, + headers: &HeaderMap, + ) -> Result { + let meta = headers.get("chunk-meta"); + + if meta.is_none() { + let err = StoreError::NoChunkMeta(chunk_id.clone()); + error!("fetching chunk {} failed: {}", chunk_id, err); + return Err(err); + } + + let meta = meta + .unwrap() + .to_str() + .map_err(StoreError::MetaHeaderToString)?; + let meta: ChunkMeta = serde_json::from_str(meta).map_err(StoreError::JsonParse)?; + + Ok(meta) + } +} + +/// Possible errors from using a ChunkStore. +#[derive(Debug, thiserror::Error)] +pub enum StoreError { + /// FIXME + #[error("FIXME")] + FIXME, + + /// Error from a chunk index. + #[error(transparent)] + Index(#[from] IndexError), + + /// An error from the HTTP library. + #[error("error from reqwest library: {0}")] + ReqwestError(reqwest::Error), + + /// Client configuration is wrong. + #[error(transparent)] + ClientConfigError(#[from] ClientConfigError), + + /// Server claims to not have an entity. + #[error("Server does not have {0}")] + NotFound(String), + + /// Server didn't give us a chunk's metadata. + #[error("Server response did not have a 'chunk-meta' header for chunk {0}")] + NoChunkMeta(ChunkId), + + /// An error with the `chunk-meta` header. + #[error("couldn't convert response chunk-meta header to string: {0}")] + MetaHeaderToString(reqwest::header::ToStrError), + + /// Error parsing JSON. 
+ #[error("failed to parse JSON: {0}")] + JsonParse(serde_json::Error), + + /// An error creating chunk directory. + #[error("Failed to create chunk directory {0}")] + ChunkMkdir(PathBuf, #[source] std::io::Error), + + /// An error writing a chunk file. + #[error("Failed to write chunk {0}")] + WriteChunk(PathBuf, #[source] std::io::Error), + + /// An error reading a chunk file. + #[error("Failed to read chunk {0}")] + ReadChunk(PathBuf, #[source] std::io::Error), + + /// No chunk id for uploaded chunk. + #[error("Server response claimed it had created a chunk, but lacked chunk id")] + NoCreatedChunkId, +} diff --git a/src/lib.rs b/src/lib.rs index fbbea15..a54c5ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,7 @@ pub mod chunk; pub mod chunker; pub mod chunkid; pub mod chunkmeta; +pub mod chunkstore; pub mod cipher; pub mod client; pub mod cmd; -- cgit v1.2.1 From 69f2b3d9495cb4e8cff8eff4a49ac944dafee1ad Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Thu, 12 May 2022 21:21:19 +0300 Subject: use new ChunkStore for remote has_chunk Sponsored-by: author --- src/chunkstore.rs | 6 +++++- src/client.rs | 24 +++++++++--------------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/src/chunkstore.rs b/src/chunkstore.rs index f421805..1f67e2f 100644 --- a/src/chunkstore.rs +++ b/src/chunkstore.rs @@ -50,7 +50,11 @@ impl ChunkStore { /// Store a chunk in the store. /// /// The store chooses an id for the chunk. 
- pub async fn put(&mut self, chunk: EncryptedChunk, meta: &ChunkMeta) -> Result { + pub async fn put( + &mut self, + chunk: EncryptedChunk, + meta: &ChunkMeta, + ) -> Result { match self { Self::Local(store) => store.put(chunk, meta), Self::Remote(store) => store.put(chunk, meta).await, diff --git a/src/client.rs b/src/client.rs index bed5f1e..6b89991 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,6 +5,7 @@ use crate::chunk::{ }; use crate::chunkid::ChunkId; use crate::chunkmeta::ChunkMeta; +use crate::chunkstore::{ChunkStore, StoreError}; use crate::cipher::{CipherEngine, CipherError}; use crate::config::{ClientConfig, ClientConfigError}; use crate::generation::{FinishedGeneration, GenId, LocalGeneration, LocalGenerationError}; @@ -100,10 +101,15 @@ pub enum ClientError { /// Failed to write a file. #[error("failed to write to file {0}: {1}")] FileWrite(PathBuf, std::io::Error), + + /// Error from a chunk store. + #[error(transparent)] + ChunkStore(#[from] StoreError), } /// Client for the Obnam server HTTP API. pub struct BackupClient { + store: ChunkStore, client: reqwest::Client, base_url: String, cipher: CipherEngine, @@ -121,6 +127,7 @@ impl BackupClient { .build() .map_err(ClientError::ReqwestError)?; Ok(Self { + store: ChunkStore::remote(config)?, client, base_url: config.server_url.to_string(), cipher: CipherEngine::new(&pass), @@ -137,21 +144,8 @@ impl BackupClient { /// Does the server have a chunk? pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result, ClientError> { - let body = match self.get("", &[("label", meta.label())]).await { - Ok((_, body)) => body, - Err(err) => return Err(err), - }; - - let hits: HashMap = - serde_json::from_slice(&body).map_err(ClientError::JsonParse)?; - let mut iter = hits.iter(); - let has = if let Some((chunk_id, _)) = iter.next() { - Some(chunk_id.into()) - } else { - None - }; - - Ok(has) + let mut ids = self.store.find_by_label(meta).await?; + Ok(ids.pop()) } /// Upload a data chunk to the server. 
-- cgit v1.2.1 From 68ed0be9153a923185cebfc7fd88ee5aff012db8 Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Thu, 12 May 2022 21:41:49 +0300 Subject: use new chunk store for backup client fetch_chunk Sponsored-by: author --- src/client.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/client.rs b/src/client.rs index 6b89991..21e7cdb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -214,9 +214,7 @@ impl BackupClient { /// Fetch a data chunk from the server, given the chunk identifier. pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result { - let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?; - let meta = self.get_chunk_meta_header(chunk_id, &headers)?; - + let (body, meta) = self.store.get(chunk_id).await?; let meta_bytes = meta.to_json_vec(); let chunk = self.cipher.decrypt_chunk(&body, &meta_bytes)?; -- cgit v1.2.1 From 6d0e95d34a87bb11a2b62588e75d282d4de51095 Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Fri, 13 May 2022 07:41:18 +0300 Subject: impl backup client find client trusts with new chunk store Sponsored-by: author --- src/client.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/client.rs b/src/client.rs index 21e7cdb..93bb08a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -190,15 +190,9 @@ impl BackupClient { } async fn find_client_trusts(&self) -> Result, ClientError> { - let label = Label::literal("client-trust").serialize(); - let body = match self.get("", &[("label", &label)]).await { - Ok((_, body)) => body, - Err(err) => return Err(err), - }; - - let hits: HashMap = - serde_json::from_slice(&body).map_err(ClientError::JsonParse)?; - let ids = hits.iter().map(|(id, _)| id.into()).collect(); + let label = Label::literal("client-trust"); + let meta = ChunkMeta::new(&label); + let ids = self.store.find_by_label(&meta).await?; Ok(ids) } -- cgit v1.2.1 From 0ec25b98ee875d84ae8760e63149cc90634e45c0 Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: 
Fri, 13 May 2022 08:02:43 +0300 Subject: drop dead code Sponsored-by: author --- src/client.rs | 58 ---------------------------------------------------------- 1 file changed, 58 deletions(-) diff --git a/src/client.rs b/src/client.rs index 93bb08a..18e1f72 100644 --- a/src/client.rs +++ b/src/client.rs @@ -13,7 +13,6 @@ use crate::genlist::GenerationList; use crate::label::Label; use log::{debug, error, info}; -use reqwest::header::HeaderMap; use std::collections::HashMap; use std::fs::File; use std::io::prelude::*; @@ -243,61 +242,4 @@ impl BackupClient { let gen = LocalGeneration::open(dbname)?; Ok(gen) } - - async fn get( - &self, - path: &str, - query: &[(&str, &str)], - ) -> Result<(HeaderMap, Vec), ClientError> { - let url = format!("{}{}", &self.chunks_url(), path); - info!("GET {}", url); - - // Build HTTP request structure. - let req = self - .client - .get(&url) - .query(query) - .build() - .map_err(ClientError::ReqwestError)?; - - // Make HTTP request. - let res = self - .client - .execute(req) - .await - .map_err(ClientError::ReqwestError)?; - - // Did it work? - if res.status() != 200 { - return Err(ClientError::NotFound(path.to_string())); - } - - // Return headers and body. 
- let headers = res.headers().clone(); - let body = res.bytes().await.map_err(ClientError::ReqwestError)?; - let body = body.to_vec(); - Ok((headers, body)) - } - - fn get_chunk_meta_header( - &self, - chunk_id: &ChunkId, - headers: &HeaderMap, - ) -> Result { - let meta = headers.get("chunk-meta"); - - if meta.is_none() { - let err = ClientError::NoChunkMeta(chunk_id.clone()); - error!("fetching chunk {} failed: {}", chunk_id, err); - return Err(err); - } - - let meta = meta - .unwrap() - .to_str() - .map_err(ClientError::MetaHeaderToString)?; - let meta: ChunkMeta = serde_json::from_str(meta).map_err(ClientError::JsonParse)?; - - Ok(meta) - } } -- cgit v1.2.1 From c76569bbfdd3c40e427c99fa1a88158344cdfa67 Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Fri, 13 May 2022 08:13:55 +0300 Subject: use new chunk store for all of client.rs Sponsored-by: author --- src/backup_run.rs | 25 ++++++++++++++----------- src/client.rs | 43 ++++--------------------------------------- src/cmd/backup.rs | 6 +++--- 3 files changed, 21 insertions(+), 53 deletions(-) diff --git a/src/backup_run.rs b/src/backup_run.rs index 516e172..372ef65 100644 --- a/src/backup_run.rs +++ b/src/backup_run.rs @@ -31,7 +31,7 @@ const SQLITE_CHUNK_SIZE: usize = MIB as usize; /// A running backup. pub struct BackupRun<'a> { checksum_kind: Option, - client: &'a BackupClient, + client: &'a mut BackupClient, policy: BackupPolicy, buffer_size: usize, progress: Option, @@ -106,7 +106,10 @@ pub struct RootsBackupOutcome { impl<'a> BackupRun<'a> { /// Create a new run for an initial backup. - pub fn initial(config: &ClientConfig, client: &'a BackupClient) -> Result { + pub fn initial( + config: &ClientConfig, + client: &'a mut BackupClient, + ) -> Result { Ok(Self { checksum_kind: Some(DEFAULT_CHECKSUM_KIND), client, @@ -119,7 +122,7 @@ impl<'a> BackupRun<'a> { /// Create a new run for an incremental backup. 
pub fn incremental( config: &ClientConfig, - client: &'a BackupClient, + client: &'a mut BackupClient, ) -> Result { Ok(Self { checksum_kind: None, @@ -189,7 +192,7 @@ impl<'a> BackupRun<'a> { /// Back up all the roots for this run. pub async fn backup_roots( - &self, + &mut self, config: &ClientConfig, old: &LocalGeneration, newpath: &Path, @@ -236,7 +239,7 @@ impl<'a> BackupRun<'a> { } async fn backup_one_root( - &self, + &mut self, config: &ClientConfig, old: &LocalGeneration, new: &mut NascentGeneration, @@ -287,7 +290,7 @@ impl<'a> BackupRun<'a> { } async fn backup_if_needed( - &self, + &mut self, entry: AnnotatedFsEntry, old: &LocalGeneration, ) -> Result, BackupError> { @@ -322,7 +325,7 @@ impl<'a> BackupRun<'a> { } async fn backup_one_entry( - &self, + &mut self, entry: &AnnotatedFsEntry, path: &Path, reason: Reason, @@ -351,7 +354,7 @@ impl<'a> BackupRun<'a> { /// Upload any file content for a file system entry. pub async fn upload_filesystem_entry( - &self, + &mut self, e: &FilesystemEntry, size: usize, ) -> Result, BackupError> { @@ -370,7 +373,7 @@ impl<'a> BackupRun<'a> { /// Upload the metadata for the backup of this run. 
pub async fn upload_generation( - &self, + &mut self, filename: &Path, size: usize, ) -> Result { @@ -384,7 +387,7 @@ impl<'a> BackupRun<'a> { } async fn upload_regular_file( - &self, + &mut self, filename: &Path, size: usize, ) -> Result, BackupError> { @@ -407,7 +410,7 @@ impl<'a> BackupRun<'a> { Ok(chunk_ids) } - async fn upload_nascent_generation(&self, filename: &Path) -> Result { + async fn upload_nascent_generation(&mut self, filename: &Path) -> Result { let progress = BackupProgress::upload_generation(); let gen_id = self.upload_generation(filename, SQLITE_CHUNK_SIZE).await?; progress.finish(); diff --git a/src/client.rs b/src/client.rs index 18e1f72..7b15bba 100644 --- a/src/client.rs +++ b/src/client.rs @@ -12,8 +12,7 @@ use crate::generation::{FinishedGeneration, GenId, LocalGeneration, LocalGenerat use crate::genlist::GenerationList; use crate::label::Label; -use log::{debug, error, info}; -use std::collections::HashMap; +use log::{error, info}; use std::fs::File; use std::io::prelude::*; use std::path::{Path, PathBuf}; @@ -109,8 +108,6 @@ pub enum ClientError { /// Client for the Obnam server HTTP API. pub struct BackupClient { store: ChunkStore, - client: reqwest::Client, - base_url: String, cipher: CipherEngine, } @@ -118,29 +115,13 @@ impl BackupClient { /// Create a new backup client. pub fn new(config: &ClientConfig) -> Result { info!("creating backup client with config: {:#?}", config); - let pass = config.passwords()?; - - let client = reqwest::Client::builder() - .danger_accept_invalid_certs(!config.verify_tls_cert) - .build() - .map_err(ClientError::ReqwestError)?; Ok(Self { store: ChunkStore::remote(config)?, - client, - base_url: config.server_url.to_string(), cipher: CipherEngine::new(&pass), }) } - fn base_url(&self) -> &str { - &self.base_url - } - - fn chunks_url(&self) -> String { - format!("{}/v1/chunks", self.base_url()) - } - /// Does the server have a chunk? 
pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result, ClientError> { let mut ids = self.store.find_by_label(meta).await?; @@ -148,26 +129,10 @@ impl BackupClient { } /// Upload a data chunk to the server. - pub async fn upload_chunk(&self, chunk: DataChunk) -> Result { + pub async fn upload_chunk(&mut self, chunk: DataChunk) -> Result { let enc = self.cipher.encrypt_chunk(&chunk)?; - let res = self - .client - .post(&self.chunks_url()) - .header("chunk-meta", chunk.meta().to_json()) - .body(enc.ciphertext().to_vec()) - .send() - .await - .map_err(ClientError::ReqwestError)?; - debug!("upload_chunk: res={:?}", res); - let res: HashMap = res.json().await.map_err(ClientError::ReqwestError)?; - let chunk_id = if let Some(chunk_id) = res.get("chunk_id") { - debug!("upload_chunk: id={}", chunk_id); - chunk_id.parse().unwrap() - } else { - return Err(ClientError::NoCreatedChunkId); - }; - info!("uploaded_chunk {}", chunk_id); - Ok(chunk_id) + let id = self.store.put(enc, chunk.meta()).await?; + Ok(id) } /// Get current client trust chunk from repository, if there is one. diff --git a/src/cmd/backup.rs b/src/cmd/backup.rs index a18027b..70e9eac 100644 --- a/src/cmd/backup.rs +++ b/src/cmd/backup.rs @@ -45,7 +45,7 @@ impl Backup { let major = self.backup_version.unwrap_or(DEFAULT_SCHEMA_MAJOR); let schema = schema_version(major)?; - let client = BackupClient::new(config)?; + let mut client = BackupClient::new(config)?; let trust = client .get_client_trust() .await? 
@@ -68,7 +68,7 @@ impl Backup { let (is_incremental, outcome) = if let Some(old_id) = old_id { info!("incremental backup based on {}", old_id); - let mut run = BackupRun::incremental(config, &client)?; + let mut run = BackupRun::incremental(config, &mut client)?; let old = run.start(Some(&old_id), &oldtemp, perf).await?; ( true, @@ -77,7 +77,7 @@ impl Backup { ) } else { info!("fresh backup without a previous generation"); - let mut run = BackupRun::initial(config, &client)?; + let mut run = BackupRun::initial(config, &mut client)?; let old = run.start(None, &oldtemp, perf).await?; ( false, -- cgit v1.2.1 From 5cc1dc560ff605a6d582ea0d94ed65b0295cf324 Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Fri, 13 May 2022 08:25:34 +0300 Subject: change put to take a vector of bytes Sponsored-by: author --- src/chunkstore.rs | 16 +++++----------- src/client.rs | 3 ++- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/src/chunkstore.rs b/src/chunkstore.rs index 1f67e2f..cb78891 100644 --- a/src/chunkstore.rs +++ b/src/chunkstore.rs @@ -6,7 +6,6 @@ use crate::chunkid::ChunkId; use crate::chunkmeta::ChunkMeta; -use crate::cipher::EncryptedChunk; use crate::config::{ClientConfig, ClientConfigError}; use crate::index::{Index, IndexError}; @@ -50,11 +49,7 @@ impl ChunkStore { /// Store a chunk in the store. /// /// The store chooses an id for the chunk. 
- pub async fn put( - &mut self, - chunk: EncryptedChunk, - meta: &ChunkMeta, - ) -> Result { + pub async fn put(&mut self, chunk: Vec, meta: &ChunkMeta) -> Result { match self { Self::Local(store) => store.put(chunk, meta), Self::Remote(store) => store.put(chunk, meta).await, @@ -90,7 +85,7 @@ impl LocalStore { .map_err(StoreError::Index) } - fn put(&mut self, chunk: EncryptedChunk, meta: &ChunkMeta) -> Result { + fn put(&mut self, chunk: Vec, meta: &ChunkMeta) -> Result { let id = ChunkId::new(); let (dir, filename) = self.filename(&id); @@ -98,8 +93,7 @@ impl LocalStore { std::fs::create_dir_all(&dir).map_err(|err| StoreError::ChunkMkdir(dir, err))?; } - let data = chunk.ciphertext().to_vec(); - std::fs::write(&filename, &data) + std::fs::write(&filename, &chunk) .map_err(|err| StoreError::WriteChunk(filename.clone(), err))?; self.index .insert_meta(id.clone(), meta.clone()) @@ -162,12 +156,12 @@ impl RemoteStore { Ok(ids) } - async fn put(&self, chunk: EncryptedChunk, meta: &ChunkMeta) -> Result { + async fn put(&self, chunk: Vec, meta: &ChunkMeta) -> Result { let res = self .client .post(&self.chunks_url()) .header("chunk-meta", meta.to_json()) - .body(chunk.ciphertext().to_vec()) + .body(chunk) .send() .await .map_err(StoreError::ReqwestError)?; diff --git a/src/client.rs b/src/client.rs index 7b15bba..7ae6581 100644 --- a/src/client.rs +++ b/src/client.rs @@ -131,7 +131,8 @@ impl BackupClient { /// Upload a data chunk to the server. 
pub async fn upload_chunk(&mut self, chunk: DataChunk) -> Result { let enc = self.cipher.encrypt_chunk(&chunk)?; - let id = self.store.put(enc, chunk.meta()).await?; + let data = enc.ciphertext().to_vec(); + let id = self.store.put(data, chunk.meta()).await?; Ok(id) } -- cgit v1.2.1 From be9275342cde83ee51840e5859bf9fa0b2cee4eb Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Sun, 15 May 2022 09:42:49 +0300 Subject: stash Sponsored-by: author --- src/bin/obnam-server.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/bin/obnam-server.rs b/src/bin/obnam-server.rs index cfa2cb5..c9e93bc 100644 --- a/src/bin/obnam-server.rs +++ b/src/bin/obnam-server.rs @@ -1,10 +1,9 @@ use anyhow::Context; use clap::Parser; use log::{debug, error, info}; -use obnam::chunk::DataChunk; use obnam::chunkid::ChunkId; use obnam::chunkmeta::ChunkMeta; -use obnam::indexedstore::IndexedStore; +use obnam::chunkstore::ChunkStore; use obnam::server::{ServerConfig, ServerConfigError}; use serde::Serialize; use std::collections::HashMap; @@ -37,7 +36,7 @@ async fn main() -> anyhow::Result<()> { return Err(ServerConfigError::BadServerAddress.into()); } - let store = IndexedStore::new(&config.chunks)?; + let store = ChunkStore::local(&config.chunks)?; let store = Arc::new(Mutex::new(store)); let store = warp::any().map(move || Arc::clone(&store)); @@ -102,7 +101,7 @@ fn load_config(filename: &Path) -> Result { } pub async fn create_chunk( - store: Arc>, + store: Arc>, meta: String, data: Bytes, ) -> Result { @@ -116,9 +115,7 @@ pub async fn create_chunk( } }; - let chunk = DataChunk::new(data.to_vec(), meta); - - let id = match store.save(&chunk) { + let id = match store.put(data.to_vec(), &meta).await { Ok(id) => id, Err(e) => { error!("couldn't save: {}", e); @@ -132,11 +129,11 @@ pub async fn create_chunk( pub async fn fetch_chunk( id: String, - store: Arc>, + store: Arc>, ) -> Result { let store = store.lock().await; let id: ChunkId = 
id.parse().unwrap(); - match store.load(&id) { + match store.get(&id).await { Ok((data, meta)) => { info!("found chunk {}: {:?}", id, meta); Ok(ChunkResult::Fetched(meta, data)) @@ -233,7 +230,7 @@ pub async fn delete_chunk( enum ChunkResult { Created(ChunkId), - Fetched(ChunkMeta, DataChunk), + Fetched(ChunkMeta, Vec), Found(SearchHits), Deleted, NotFound, @@ -264,7 +261,7 @@ impl warp::Reply for ChunkResult { ); into_response( StatusCode::OK, - chunk.data(), + &chunk, "application/octet-stream", Some(headers), ) -- cgit v1.2.1 From 892df3d488f2df04566f9c19285971f2954d3361 Mon Sep 17 00:00:00 2001 From: Alexander Batischev Date: Fri, 24 Jun 2022 22:40:03 +0300 Subject: Make LocalStore Sync (provide interior mutability) --- src/chunkstore.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/chunkstore.rs b/src/chunkstore.rs index cb78891..85d5007 100644 --- a/src/chunkstore.rs +++ b/src/chunkstore.rs @@ -13,6 +13,7 @@ use log::{debug, error, info}; use reqwest::header::HeaderMap; use std::collections::HashMap; use std::path::{Path, PathBuf}; +use tokio::sync::Mutex; /// A chunk store. /// @@ -41,7 +42,7 @@ impl ChunkStore { /// Does the store have a chunk with a given label? pub async fn find_by_label(&self, meta: &ChunkMeta) -> Result, StoreError> { match self { - Self::Local(store) => store.find_by_label(meta), + Self::Local(store) => store.find_by_label(meta).await, Self::Remote(store) => store.find_by_label(meta).await, } } @@ -49,9 +50,9 @@ impl ChunkStore { /// Store a chunk in the store. /// /// The store chooses an id for the chunk. - pub async fn put(&mut self, chunk: Vec, meta: &ChunkMeta) -> Result { + pub async fn put(&self, chunk: Vec, meta: &ChunkMeta) -> Result { match self { - Self::Local(store) => store.put(chunk, meta), + Self::Local(store) => store.put(chunk, meta).await, Self::Remote(store) => store.put(chunk, meta).await, } } @@ -59,7 +60,7 @@ impl ChunkStore { /// Get a chunk given its id. 
pub async fn get(&self, id: &ChunkId) -> Result<(Vec, ChunkMeta), StoreError> { match self { - Self::Local(store) => store.get(id), + Self::Local(store) => store.get(id).await, Self::Remote(store) => store.get(id).await, } } @@ -68,24 +69,26 @@ impl ChunkStore { /// A local chunk store. pub struct LocalStore { path: PathBuf, - index: Index, + index: Mutex, } impl LocalStore { fn new(path: &Path) -> Result { Ok(Self { path: path.to_path_buf(), - index: Index::new(path)?, + index: Mutex::new(Index::new(path)?), }) } - fn find_by_label(&self, meta: &ChunkMeta) -> Result, StoreError> { + async fn find_by_label(&self, meta: &ChunkMeta) -> Result, StoreError> { self.index + .lock() + .await .find_by_label(meta.label()) .map_err(StoreError::Index) } - fn put(&mut self, chunk: Vec, meta: &ChunkMeta) -> Result { + async fn put(&self, chunk: Vec, meta: &ChunkMeta) -> Result { let id = ChunkId::new(); let (dir, filename) = self.filename(&id); @@ -96,13 +99,15 @@ impl LocalStore { std::fs::write(&filename, &chunk) .map_err(|err| StoreError::WriteChunk(filename.clone(), err))?; self.index + .lock() + .await .insert_meta(id.clone(), meta.clone()) .map_err(StoreError::Index)?; Ok(id) } - fn get(&self, id: &ChunkId) -> Result<(Vec, ChunkMeta), StoreError> { - let meta = self.index.get_meta(id)?; + async fn get(&self, id: &ChunkId) -> Result<(Vec, ChunkMeta), StoreError> { + let meta = self.index.lock().await.get_meta(id)?; let (_, filename) = &self.filename(id); -- cgit v1.2.1 From 14ab788c930846aeb2c472b421a9030b0e9e95f3 Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Wed, 26 Oct 2022 17:33:52 +0300 Subject: feat! finish chunk store abstraction This builds on Alexander's work to show me how to get past the problem I had. There's additional changes to finish off the changes. Drop chunk deletion from server: it's not a good idea to have it until the server API is authenticated. 
Sponsored-by: author --- obnam.md | 26 -------------------------- src/bin/obnam-server.rs | 45 +++++++++++---------------------------------- src/chunkstore.rs | 2 +- subplot/server.py | 6 ++++-- 4 files changed, 16 insertions(+), 63 deletions(-) diff --git a/obnam.md b/obnam.md index 0c36a8b..ef6daea 100644 --- a/obnam.md +++ b/obnam.md @@ -1259,21 +1259,6 @@ and content-type is application/json and the JSON body matches {"":{"label":"0abc"}} ~~~ -Finally, we must be able to delete it. After that, we must not be able -to retrieve it, or find it using metadata. - -~~~scenario -when I DELETE /v1/chunks/ -then HTTP status code is 200 - -when I GET /v1/chunks/ -then HTTP status code is 404 - -when I GET /v1/chunks?label=0abc -then HTTP status code is 200 -and content-type is application/json -and the JSON body matches {} -~~~ ## Retrieve a chunk that does not exist @@ -1298,17 +1283,6 @@ and content-type is application/json and the JSON body matches {} ~~~ -## Delete chunk that does not exist - -We must get the right error when deleting a chunk that doesn't exist. 
- -~~~scenario -given a working Obnam system -when I try to DELETE /v1/chunks/any.random.string -then HTTP status code is 404 -~~~ - - ## Persistent across restarts Chunk storage, and the index of chunk metadata for searches, needs to diff --git a/src/bin/obnam-server.rs b/src/bin/obnam-server.rs index c9e93bc..102e8b6 100644 --- a/src/bin/obnam-server.rs +++ b/src/bin/obnam-server.rs @@ -4,6 +4,7 @@ use log::{debug, error, info}; use obnam::chunkid::ChunkId; use obnam::chunkmeta::ChunkMeta; use obnam::chunkstore::ChunkStore; +use obnam::label::Label; use obnam::server::{ServerConfig, ServerConfigError}; use serde::Serialize; use std::collections::HashMap; @@ -69,16 +70,8 @@ async fn main() -> anyhow::Result<()> { .and(store.clone()) .and_then(search_chunks); - let delete = warp::delete() - .and(warp::path("v1")) - .and(warp::path("chunks")) - .and(warp::path::param()) - .and(warp::path::end()) - .and(store.clone()) - .and_then(delete_chunk); - let log = warp::log("obnam"); - let webroot = create.or(fetch).or(search).or(delete).with(log); + let webroot = create.or(fetch).or(search).with(log); debug!("starting warp"); warp::serve(webroot) @@ -105,7 +98,7 @@ pub async fn create_chunk( meta: String, data: Bytes, ) -> Result { - let mut store = store.lock().await; + let store = store.lock().await; let meta: ChunkMeta = match meta.parse() { Ok(s) => s, @@ -147,7 +140,7 @@ pub async fn fetch_chunk( pub async fn search_chunks( query: HashMap, - store: Arc>, + store: Arc>, ) -> Result { let store = store.lock().await; @@ -158,7 +151,12 @@ pub async fn search_chunks( return Ok(ChunkResult::BadRequest); } if key == "label" { - store.find_by_label(value).expect("SQL lookup failed") + let label = Label::deserialize(value).unwrap(); + let label = ChunkMeta::new(&label); + store + .find_by_label(&label) + .await + .expect("SQL lookup failed") } else { error!("unknown search key {:?}", key); return Ok(ChunkResult::BadRequest); @@ -170,7 +168,7 @@ pub async fn search_chunks( let 
mut hits = SearchHits::default(); for chunk_id in found { - let meta = match store.load_meta(&chunk_id) { + let (_, meta) = match store.get(&chunk_id).await { Ok(meta) => { info!("search found chunk {}", chunk_id); meta @@ -209,30 +207,10 @@ impl SearchHits { } } -pub async fn delete_chunk( - id: String, - store: Arc>, -) -> Result { - let mut store = store.lock().await; - let id: ChunkId = id.parse().unwrap(); - - match store.remove(&id) { - Ok(_) => { - info!("chunk deleted: {}", id); - Ok(ChunkResult::Deleted) - } - Err(e) => { - error!("could not delete chunk {}: {:?}", id, e); - Ok(ChunkResult::NotFound) - } - } -} - enum ChunkResult { Created(ChunkId), Fetched(ChunkMeta, Vec), Found(SearchHits), - Deleted, NotFound, BadRequest, InternalServerError, @@ -267,7 +245,6 @@ impl warp::Reply for ChunkResult { ) } ChunkResult::Found(hits) => json_response(StatusCode::OK, hits.to_json(), None), - ChunkResult::Deleted => status_response(StatusCode::OK), ChunkResult::BadRequest => status_response(StatusCode::BAD_REQUEST), ChunkResult::NotFound => status_response(StatusCode::NOT_FOUND), ChunkResult::InternalServerError => status_response(StatusCode::INTERNAL_SERVER_ERROR), diff --git a/src/chunkstore.rs b/src/chunkstore.rs index 85d5007..2b93720 100644 --- a/src/chunkstore.rs +++ b/src/chunkstore.rs @@ -124,7 +124,7 @@ impl LocalStore { let b = bytes[1]; let c = bytes[2]; let dir = self.path.join(format!("{}/{}/{}", a, b, c)); - let filename = dir.join(format!("{}", id)); + let filename = dir.join(format!("{}.data", id)); (dir, filename) } } diff --git a/subplot/server.py b/subplot/server.py index 1f4506f..a604733 100644 --- a/subplot/server.py +++ b/subplot/server.py @@ -87,7 +87,9 @@ def delete_chunk_by_id(ctx, chunk_id=None): def make_chunk_file_be_empty(ctx, chunk_id=None): chunk_id = ctx["vars"][chunk_id] chunks = ctx["config"]["chunks"] - for (dirname, _, _) in os.walk(chunks): + logging.debug(f"trying to empty chunk {chunk_id}") + for (dirname, _, filenames) in 
os.walk(chunks): + logging.debug(f"found directory {dirname}, with {filenames}") filename = os.path.join(dirname, chunk_id + ".data") if os.path.exists(filename): logging.debug(f"emptying chunk file {filename}") @@ -136,7 +138,7 @@ def server_has_n_chunks(ctx, n=None): assert_eq = globals()["assert_eq"] n = int(n) files = find_files(ctx["config"]["chunks"]) - files = [json.load(open(x)) for x in files if x.endswith(".meta")] + files = [x for x in files if x.endswith(".data")] logging.debug(f"server_has_n_file_chunks: n={n}") logging.debug(f"server_has_n_file_chunks: len(files)={len(files)}") logging.debug(f"server_has_n_file_chunks: files={files}") -- cgit v1.2.1