diff options
Diffstat (limited to 'src/bin/obnam-server.rs')
-rw-r--r-- | src/bin/obnam-server.rs | 92 |
1 files changed, 31 insertions, 61 deletions
diff --git a/src/bin/obnam-server.rs b/src/bin/obnam-server.rs index 520d6a9..cf53d2f 100644 --- a/src/bin/obnam-server.rs +++ b/src/bin/obnam-server.rs @@ -1,6 +1,9 @@ use bytes::Bytes; use log::{debug, error, info}; -use obnam::{chunk::DataChunk, chunkid::ChunkId, chunkmeta::ChunkMeta, index::Index, store::Store}; +use obnam::chunk::DataChunk; +use obnam::chunkid::ChunkId; +use obnam::chunkmeta::ChunkMeta; +use obnam::indexedstore::IndexedStore; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::default::Default; @@ -24,21 +27,18 @@ async fn main() -> anyhow::Result<()> { let opt = Opt::from_args(); let config = Config::read_config(&opt.config).unwrap(); - let config_bare = config.clone(); - let config = Arc::new(Mutex::new(config)); - let config = warp::any().map(move || Arc::clone(&config)); - let index = Arc::new(Mutex::new(Index::default())); - let index = warp::any().map(move || Arc::clone(&index)); + let store = IndexedStore::new(&config.chunks); + let store = Arc::new(Mutex::new(store)); + let store = warp::any().map(move || Arc::clone(&store)); info!("Obnam server starting up"); debug!("opt: {:#?}", opt); - debug!("Configuration: {:#?}", config_bare); + debug!("Configuration: {:#?}", config); let create = warp::post() .and(warp::path("chunks")) - .and(config.clone()) - .and(index.clone()) + .and(store.clone()) .and(warp::header("chunk-meta")) .and(warp::filters::body::bytes()) .and_then(create_chunk); @@ -46,21 +46,19 @@ async fn main() -> anyhow::Result<()> { let fetch = warp::get() .and(warp::path("chunks")) .and(warp::path::param()) - .and(config.clone()) + .and(store.clone()) .and_then(fetch_chunk); let search = warp::get() .and(warp::path("chunks")) .and(warp::query::<HashMap<String, String>>()) - .and(config.clone()) - .and(index.clone()) + .and(store.clone()) .and_then(search_chunks); let delete = warp::delete() .and(warp::path("chunks")) .and(warp::path::param()) - .and(config.clone()) - .and(index.clone()) + .and(store.clone()) .and_then(delete_chunk); let log = warp::log("obnam"); @@ -71,7 +69,7 @@ async fn main() -> anyhow::Result<()> { // .tls() // .key_path(config_bare.tls_key) // .cert_path(config_bare.tls_cert) - .run(([127, 0, 0, 1], config_bare.port)) + .run(([127, 0, 0, 1], config.port)) .await; Ok(()) } @@ -125,14 +123,11 @@ impl Config { } pub async fn create_chunk( - config: Arc<Mutex<Config>>, - index: Arc<Mutex<Index>>, + store: Arc<Mutex<IndexedStore>>, meta: String, data: Bytes, ) -> Result<impl warp::Reply, warp::Rejection> { - let id = ChunkId::new(); - let config = config.lock().await; - let store = Store::new(&config.chunks); + let mut store = store.lock().await; let meta: ChunkMeta = match meta.parse() { Ok(s) => s, @@ -144,19 +139,13 @@ pub async fn create_chunk( let chunk = DataChunk::new(data.to_vec()); - match store.save(&id, &meta, &chunk) { - Ok(_) => (), + let id = match store.save(&meta, &chunk) { + Ok(id) => id, Err(e) => { - error!("could not write chunk to disk: {}", e); + error!("couldn't save: {}", e); return Ok(ChunkResult::InternalServerError); } - } - - let mut index = index.lock().await; - index.insert(id.clone(), "sha256", meta.sha256()); - if meta.is_generation() { - index.insert_generation(id.clone()); - } + }; info!("created chunk {}: {:?}", id, meta); Ok(ChunkResult::Created(id)) @@ -164,10 +153,9 @@ pub async fn create_chunk( pub async fn fetch_chunk( id: String, - config: Arc<Mutex<Config>>, + store: Arc<Mutex<IndexedStore>>, ) -> Result<impl warp::Reply, warp::Rejection> { - let config = config.lock().await; - let store = Store::new(&config.chunks); + let store = store.lock().await; let id: ChunkId = id.parse().unwrap(); match store.load(&id) { Ok((meta, chunk)) => { @@ -183,13 +171,9 @@ pub async fn fetch_chunk( pub async fn search_chunks( query: HashMap<String, String>, - config: Arc<Mutex<Config>>, - index: Arc<Mutex<Index>>, + store: Arc<Mutex<IndexedStore>>, ) -> Result<impl warp::Reply, warp::Rejection> { - let index = index.lock().await; - let config = config.lock().await; - - let store = Store::new(&config.chunks); + let store = store.lock().await; let mut query = query.iter(); let found = if let Some((key, value)) = query.next() { @@ -198,9 +182,12 @@ pub async fn search_chunks( return Ok(ChunkResult::BadRequest); } if key == "generation" && value == "true" { - index.find_generations() + store.find_generations() + } else if key == "sha256" { + store.find_by_sha256(value) } else { - index.find(&key, &value) + error!("unknown search key {:?}", key); + return Ok(ChunkResult::BadRequest); } } else { error!("search has no key to search for"); @@ -250,29 +237,12 @@ impl SearchHits { pub async fn delete_chunk( id: String, - config: Arc<Mutex<Config>>, - index: Arc<Mutex<Index>>, + store: Arc<Mutex<IndexedStore>>, ) -> Result<impl warp::Reply, warp::Rejection> { - let config = config.lock().await; - let mut index = index.lock().await; - let store = Store::new(&config.chunks); + let mut store = store.lock().await; let id: ChunkId = id.parse().unwrap(); - let (meta, _) = match store.load(&id) { - Ok((meta, chunk)) => { - debug!("found chunk to delete: {}", id); - (meta, chunk) - } - Err(e) => { - error!("could not find chunk to delete: {}: {:?}", id, e); - return Ok(ChunkResult::NotFound); - } - }; - - index.remove("sha256", meta.sha256()); - index.remove_generation(&id); - - match store.delete(&id) { + match store.remove(&id) { Ok(_) => { info!("chunk deleted: {}", id); Ok(ChunkResult::Deleted) |