From 70b8d232c2b7e2892a117f61e3a9892bf4994ea4 Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Sat, 19 Sep 2020 08:12:54 +0300 Subject: feat: search, delete chunks on chunk server Also heavily refactor the now-long scenario by splitting out a happy path and some unhappy paths. --- src/bin/obnam-server.rs | 141 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 131 insertions(+), 10 deletions(-) (limited to 'src/bin/obnam-server.rs') diff --git a/src/bin/obnam-server.rs b/src/bin/obnam-server.rs index 3adf2eb..c81e67a 100644 --- a/src/bin/obnam-server.rs +++ b/src/bin/obnam-server.rs @@ -1,6 +1,7 @@ use bytes::Bytes; use obnam::{chunk::Chunk, chunkid::ChunkId, chunkmeta::ChunkMeta, index::Index, store::Store}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::default::Default; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -41,15 +42,21 @@ async fn main() -> anyhow::Result<()> { .and(config.clone()) .and_then(fetch_chunk); - // let search = warp::get() - // .and(warp::path("chunks")) - // .and(warp::query::>()) - // .and(config.clone()) - // .and(index.clone()) - // .and_then(obnam::routes::search::search_chunks); + let search = warp::get() + .and(warp::path("chunks")) + .and(warp::query::>()) + .and(config.clone()) + .and(index.clone()) + .and_then(search_chunks); - // let webroot = create.or(fetch).or(search); - let webroot = create.or(fetch); + let delete = warp::delete() + .and(warp::path("chunks")) + .and(warp::path::param()) + .and(config.clone()) + .and(index.clone()) + .and_then(delete_chunk); + + let webroot = create.or(fetch).or(search).or(delete); warp::serve(webroot) .tls() .key_path(config_bare.tls_key) @@ -151,15 +158,110 @@ pub async fn fetch_chunk( let config = config.lock().await; let store = Store::new(&config.chunks); let id: ChunkId = id.parse().unwrap(); + eprintln!("fetch: {:?}", id); match store.load(&id) { - Ok(chunk) => Ok(ChunkResult::Fetched(chunk)), - Err(_) => Err(warp::reject::not_found()), + Ok(chunk) => { + eprintln!("loaded: {:?}", chunk.meta()); + Ok(ChunkResult::Fetched(chunk)) + } + Err(e) => { + eprintln!("error loading: {:?}", e); + Ok(ChunkResult::NotFound) + } + } +} + +pub async fn search_chunks( + query: HashMap, + config: Arc>, + index: Arc>, +) -> Result { + let index = index.lock().await; + let config = config.lock().await; + + let store = Store::new(&config.chunks); + + let mut query = query.iter(); + let found = if let Some((key, value)) = query.next() { + if query.next() != None { + return Ok(ChunkResult::BadRequest); + } + index.find(&key, &value) + } else { + return Ok(ChunkResult::BadRequest); + }; + + let mut hits = SearchHits::default(); + for chunk_id in found { + let meta = match store.load_meta(&chunk_id) { + Ok(meta) => meta, + Err(_) => return Ok(ChunkResult::InternalServerError), + }; + hits.insert(&chunk_id, meta); + } + + Ok(ChunkResult::Found(hits)) +} + +#[derive(Default, Clone, Serialize)] +struct SearchHits { + map: HashMap, +} + +impl SearchHits { + fn insert(&mut self, chunk_id: &ChunkId, meta: ChunkMeta) { + self.map.insert(chunk_id.to_string(), meta); + } + + fn to_json(&self) -> String { + serde_json::to_string(&self.map).unwrap() + } +} + +pub async fn delete_chunk( + id: String, + config: Arc>, + index: Arc>, +) -> Result { + let config = config.lock().await; + let mut index = index.lock().await; + let store = Store::new(&config.chunks); + let id: ChunkId = id.parse().unwrap(); + + eprintln!("delete: {:?}", id); + let chunk = match store.load(&id) { + Ok(chunk) => { + eprintln!("loaded: {:?}", chunk.meta()); + chunk + } + Err(e) => { + eprintln!("error loading: {:?}", e); + return Ok(ChunkResult::NotFound); + } + }; + + let meta = chunk.meta(); + index.remove("sha256", meta.sha256()); + index.remove_generation(&id); + + match store.delete(&id) { + Ok(_) => { + eprintln!("deleted: {:?}", id); + Ok(ChunkResult::Deleted) + } + Err(e) => { + eprintln!("error deleting: {:?}", e); + Ok(ChunkResult::NotFound) + } } } enum ChunkResult { Created(ChunkId), Fetched(Chunk), + Found(SearchHits), + Deleted, + NotFound, BadRequest, InternalServerError, } @@ -201,6 +303,20 @@ impl warp::Reply for ChunkResult { *r.status_mut() = StatusCode::OK; r } + ChunkResult::Found(hits) => { + let mut r = warp::reply::Response::new(hits.to_json().into()); + r.headers_mut().insert( + warp::http::header::CONTENT_TYPE, + warp::http::header::HeaderValue::from_static("application/json"), + ); + *r.status_mut() = StatusCode::OK; + r + } + ChunkResult::Deleted => { + let mut r = warp::reply::Response::new("".into()); + *r.status_mut() = StatusCode::OK; + r + } ChunkResult::BadRequest => { let mut r = warp::reply::Response::new("".into()); r.headers_mut().insert( @@ -210,6 +326,11 @@ impl warp::Reply for ChunkResult { *r.status_mut() = StatusCode::BAD_REQUEST; r } + ChunkResult::NotFound => { + let mut r = warp::reply::Response::new("".into()); + *r.status_mut() = StatusCode::NOT_FOUND; + r + } ChunkResult::InternalServerError => { let mut r = warp::reply::Response::new("".into()); r.headers_mut().insert( -- cgit v1.2.1