From cf18e86c351ddbaf655d02e66c1666cca4806947 Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Tue, 24 Nov 2020 08:35:32 +0200 Subject: refactor: add an abstraction for an indexed store This makes it easier to write a server without the HTTP layer. --- src/bin/obnam-server.rs | 92 +++++++++++++++++-------------------------------- src/indexedstore.rs | 57 ++++++++++++++++++++++++++++++ src/lib.rs | 1 + 3 files changed, 89 insertions(+), 61 deletions(-) create mode 100644 src/indexedstore.rs diff --git a/src/bin/obnam-server.rs b/src/bin/obnam-server.rs index 520d6a9..cf53d2f 100644 --- a/src/bin/obnam-server.rs +++ b/src/bin/obnam-server.rs @@ -1,6 +1,9 @@ use bytes::Bytes; use log::{debug, error, info}; -use obnam::{chunk::DataChunk, chunkid::ChunkId, chunkmeta::ChunkMeta, index::Index, store::Store}; +use obnam::chunk::DataChunk; +use obnam::chunkid::ChunkId; +use obnam::chunkmeta::ChunkMeta; +use obnam::indexedstore::IndexedStore; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::default::Default; @@ -24,21 +27,18 @@ async fn main() -> anyhow::Result<()> { let opt = Opt::from_args(); let config = Config::read_config(&opt.config).unwrap(); - let config_bare = config.clone(); - let config = Arc::new(Mutex::new(config)); - let config = warp::any().map(move || Arc::clone(&config)); - let index = Arc::new(Mutex::new(Index::default())); - let index = warp::any().map(move || Arc::clone(&index)); + let store = IndexedStore::new(&config.chunks); + let store = Arc::new(Mutex::new(store)); + let store = warp::any().map(move || Arc::clone(&store)); info!("Obnam server starting up"); debug!("opt: {:#?}", opt); - debug!("Configuration: {:#?}", config_bare); + debug!("Configuration: {:#?}", config); let create = warp::post() .and(warp::path("chunks")) - .and(config.clone()) - .and(index.clone()) + .and(store.clone()) .and(warp::header("chunk-meta")) .and(warp::filters::body::bytes()) .and_then(create_chunk); @@ -46,21 +46,19 @@ async fn main() -> anyhow::Result<()> { let fetch = warp::get() .and(warp::path("chunks")) .and(warp::path::param()) - .and(config.clone()) + .and(store.clone()) .and_then(fetch_chunk); let search = warp::get() .and(warp::path("chunks")) .and(warp::query::>()) - .and(config.clone()) - .and(index.clone()) + .and(store.clone()) .and_then(search_chunks); let delete = warp::delete() .and(warp::path("chunks")) .and(warp::path::param()) - .and(config.clone()) - .and(index.clone()) + .and(store.clone()) .and_then(delete_chunk); let log = warp::log("obnam"); @@ -71,7 +69,7 @@ async fn main() -> anyhow::Result<()> { // .tls() // .key_path(config_bare.tls_key) // .cert_path(config_bare.tls_cert) - .run(([127, 0, 0, 1], config_bare.port)) + .run(([127, 0, 0, 1], config.port)) .await; Ok(()) } @@ -125,14 +123,11 @@ impl Config { } pub async fn create_chunk( - config: Arc>, - index: Arc>, + store: Arc>, meta: String, data: Bytes, ) -> Result { - let id = ChunkId::new(); - let config = config.lock().await; - let store = Store::new(&config.chunks); + let mut store = store.lock().await; let meta: ChunkMeta = match meta.parse() { Ok(s) => s, @@ -144,19 +139,13 @@ pub async fn create_chunk( let chunk = DataChunk::new(data.to_vec()); - match store.save(&id, &meta, &chunk) { - Ok(_) => (), + let id = match store.save(&meta, &chunk) { + Ok(id) => id, Err(e) => { - error!("could not write chunk to disk: {}", e); + error!("couldn't save: {}", e); return Ok(ChunkResult::InternalServerError); } - } - - let mut index = index.lock().await; - index.insert(id.clone(), "sha256", meta.sha256()); - if meta.is_generation() { - index.insert_generation(id.clone()); - } + }; info!("created chunk {}: {:?}", id, meta); Ok(ChunkResult::Created(id)) @@ -164,10 +153,9 @@ pub async fn create_chunk( pub async fn fetch_chunk( id: String, - config: Arc>, + store: Arc>, ) -> Result { - let config = config.lock().await; - let store = Store::new(&config.chunks); + let store = store.lock().await; let id: ChunkId = id.parse().unwrap(); match store.load(&id) { Ok((meta, chunk)) => { @@ -183,13 +171,9 @@ pub async fn fetch_chunk( pub async fn search_chunks( query: HashMap, - config: Arc>, - index: Arc>, + store: Arc>, ) -> Result { - let index = index.lock().await; - let config = config.lock().await; - - let store = Store::new(&config.chunks); + let store = store.lock().await; let mut query = query.iter(); let found = if let Some((key, value)) = query.next() { @@ -198,9 +182,12 @@ pub async fn search_chunks( return Ok(ChunkResult::BadRequest); } if key == "generation" && value == "true" { - index.find_generations() + store.find_generations() + } else if key == "sha256" { + store.find_by_sha256(value) } else { - index.find(&key, &value) + error!("unknown search key {:?}", key); + return Ok(ChunkResult::BadRequest); } } else { error!("search has no key to search for"); @@ -250,29 +237,12 @@ impl SearchHits { pub async fn delete_chunk( id: String, - config: Arc>, - index: Arc>, + store: Arc>, ) -> Result { - let config = config.lock().await; - let mut index = index.lock().await; - let store = Store::new(&config.chunks); + let mut store = store.lock().await; let id: ChunkId = id.parse().unwrap(); - let (meta, _) = match store.load(&id) { - Ok((meta, chunk)) => { - debug!("found chunk to delete: {}", id); - (meta, chunk) - } - Err(e) => { - error!("could not find chunk to delete: {}: {:?}", id, e); - return Ok(ChunkResult::NotFound); - } - }; - - index.remove("sha256", meta.sha256()); - index.remove_generation(&id); - - match store.delete(&id) { + match store.remove(&id) { Ok(_) => { info!("chunk deleted: {}", id); Ok(ChunkResult::Deleted) diff --git a/src/indexedstore.rs b/src/indexedstore.rs new file mode 100644 index 0000000..4cc90cc --- /dev/null +++ b/src/indexedstore.rs @@ -0,0 +1,57 @@ +use crate::chunk::DataChunk; +use crate::chunkid::ChunkId; +use crate::chunkmeta::ChunkMeta; +use crate::index::Index; +use crate::store::Store; +use std::path::Path; + +/// A store for chunks and their metadata. +/// +/// This combines Store and Index into one interface to make it easier +/// to handle the server side storage of chunks. +pub struct IndexedStore { + store: Store, + index: Index, +} + +impl IndexedStore { + pub fn new(dirname: &Path) -> Self { + let store = Store::new(dirname); + let index = Index::default(); + Self { store, index } + } + + pub fn save(&mut self, meta: &ChunkMeta, chunk: &DataChunk) -> anyhow::Result { + let id = ChunkId::new(); + self.store.save(&id, meta, chunk)?; + self.index.insert(id.clone(), "sha256", meta.sha256()); + if meta.is_generation() { + self.index.insert_generation(id.clone()); + } + Ok(id) + } + + pub fn load(&self, id: &ChunkId) -> anyhow::Result<(ChunkMeta, DataChunk)> { + self.store.load(id) + } + + pub fn load_meta(&self, id: &ChunkId) -> anyhow::Result { + self.store.load_meta(id) + } + + pub fn find_by_sha256(&self, sha256: &str) -> Vec { + self.index.find("sha256", sha256) + } + + pub fn find_generations(&self) -> Vec { + self.index.find_generations() + } + + pub fn remove(&mut self, id: &ChunkId) -> anyhow::Result<()> { + let (meta, _) = self.store.load(id)?; + self.index.remove("sha256", meta.sha256()); + self.index.remove_generation(id); + self.store.delete(id)?; + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 3d20797..bdc790f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,5 +9,6 @@ pub mod fsentry; pub mod fsiter; pub mod generation; pub mod index; +pub mod indexedstore; pub mod server; pub mod store; -- cgit v1.2.1