summaryrefslogtreecommitdiff
path: root/src/bin/obnam-server.rs
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2020-09-19 08:12:54 +0300
committerLars Wirzenius <liw@liw.fi>2020-09-19 10:13:49 +0300
commit70b8d232c2b7e2892a117f61e3a9892bf4994ea4 (patch)
tree691804e9e8a63d1f5bb6750a20a0d3c0cd6fa6f7 /src/bin/obnam-server.rs
parente1c4683b73ec2a207321377636f4ed722d0674dc (diff)
downloadobnam2-70b8d232c2b7e2892a117f61e3a9892bf4994ea4.tar.gz
feat: search, delete chunks on chunk server
Also heavily refactor the now-long scenario by splitting out a happy path and some unhappy paths.
Diffstat (limited to 'src/bin/obnam-server.rs')
-rw-r--r--src/bin/obnam-server.rs141
1 files changed, 131 insertions, 10 deletions
diff --git a/src/bin/obnam-server.rs b/src/bin/obnam-server.rs
index 3adf2eb..c81e67a 100644
--- a/src/bin/obnam-server.rs
+++ b/src/bin/obnam-server.rs
@@ -1,6 +1,7 @@
use bytes::Bytes;
use obnam::{chunk::Chunk, chunkid::ChunkId, chunkmeta::ChunkMeta, index::Index, store::Store};
use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
use std::default::Default;
use std::path::{Path, PathBuf};
use std::sync::Arc;
@@ -41,15 +42,21 @@ async fn main() -> anyhow::Result<()> {
.and(config.clone())
.and_then(fetch_chunk);
- // let search = warp::get()
- // .and(warp::path("chunks"))
- // .and(warp::query::<HashMap<String, String>>())
- // .and(config.clone())
- // .and(index.clone())
- // .and_then(obnam::routes::search::search_chunks);
+ let search = warp::get()
+ .and(warp::path("chunks"))
+ .and(warp::query::<HashMap<String, String>>())
+ .and(config.clone())
+ .and(index.clone())
+ .and_then(search_chunks);
- // let webroot = create.or(fetch).or(search);
- let webroot = create.or(fetch);
+ let delete = warp::delete()
+ .and(warp::path("chunks"))
+ .and(warp::path::param())
+ .and(config.clone())
+ .and(index.clone())
+ .and_then(delete_chunk);
+
+ let webroot = create.or(fetch).or(search).or(delete);
warp::serve(webroot)
.tls()
.key_path(config_bare.tls_key)
@@ -151,15 +158,110 @@ pub async fn fetch_chunk(
let config = config.lock().await;
let store = Store::new(&config.chunks);
let id: ChunkId = id.parse().unwrap();
+ eprintln!("fetch: {:?}", id);
match store.load(&id) {
- Ok(chunk) => Ok(ChunkResult::Fetched(chunk)),
- Err(_) => Err(warp::reject::not_found()),
+ Ok(chunk) => {
+ eprintln!("loaded: {:?}", chunk.meta());
+ Ok(ChunkResult::Fetched(chunk))
+ }
+ Err(e) => {
+ eprintln!("error loading: {:?}", e);
+ Ok(ChunkResult::NotFound)
+ }
+ }
+}
+
+pub async fn search_chunks(
+ query: HashMap<String, String>,
+ config: Arc<Mutex<Config>>,
+ index: Arc<Mutex<Index>>,
+) -> Result<impl warp::Reply, warp::Rejection> {
+ let index = index.lock().await;
+ let config = config.lock().await;
+
+ let store = Store::new(&config.chunks);
+
+ let mut query = query.iter();
+ let found = if let Some((key, value)) = query.next() {
+ if query.next() != None {
+ return Ok(ChunkResult::BadRequest);
+ }
+ index.find(&key, &value)
+ } else {
+ return Ok(ChunkResult::BadRequest);
+ };
+
+ let mut hits = SearchHits::default();
+ for chunk_id in found {
+ let meta = match store.load_meta(&chunk_id) {
+ Ok(meta) => meta,
+ Err(_) => return Ok(ChunkResult::InternalServerError),
+ };
+ hits.insert(&chunk_id, meta);
+ }
+
+ Ok(ChunkResult::Found(hits))
+}
+
+#[derive(Default, Clone, Serialize)]
+struct SearchHits {
+ map: HashMap<String, ChunkMeta>,
+}
+
+impl SearchHits {
+ fn insert(&mut self, chunk_id: &ChunkId, meta: ChunkMeta) {
+ self.map.insert(chunk_id.to_string(), meta);
+ }
+
+ fn to_json(&self) -> String {
+ serde_json::to_string(&self.map).unwrap()
+ }
+}
+
+pub async fn delete_chunk(
+ id: String,
+ config: Arc<Mutex<Config>>,
+ index: Arc<Mutex<Index>>,
+) -> Result<impl warp::Reply, warp::Rejection> {
+ let config = config.lock().await;
+ let mut index = index.lock().await;
+ let store = Store::new(&config.chunks);
+ let id: ChunkId = id.parse().unwrap();
+
+ eprintln!("delete: {:?}", id);
+ let chunk = match store.load(&id) {
+ Ok(chunk) => {
+ eprintln!("loaded: {:?}", chunk.meta());
+ chunk
+ }
+ Err(e) => {
+ eprintln!("error loading: {:?}", e);
+ return Ok(ChunkResult::NotFound);
+ }
+ };
+
+ let meta = chunk.meta();
+ index.remove("sha256", meta.sha256());
+ index.remove_generation(&id);
+
+ match store.delete(&id) {
+ Ok(_) => {
+ eprintln!("deleted: {:?}", id);
+ Ok(ChunkResult::Deleted)
+ }
+ Err(e) => {
+ eprintln!("error deleting: {:?}", e);
+ Ok(ChunkResult::NotFound)
+ }
}
}
enum ChunkResult {
Created(ChunkId),
Fetched(Chunk),
+ Found(SearchHits),
+ Deleted,
+ NotFound,
BadRequest,
InternalServerError,
}
@@ -201,6 +303,20 @@ impl warp::Reply for ChunkResult {
*r.status_mut() = StatusCode::OK;
r
}
+ ChunkResult::Found(hits) => {
+ let mut r = warp::reply::Response::new(hits.to_json().into());
+ r.headers_mut().insert(
+ warp::http::header::CONTENT_TYPE,
+ warp::http::header::HeaderValue::from_static("application/json"),
+ );
+ *r.status_mut() = StatusCode::OK;
+ r
+ }
+ ChunkResult::Deleted => {
+ let mut r = warp::reply::Response::new("".into());
+ *r.status_mut() = StatusCode::OK;
+ r
+ }
ChunkResult::BadRequest => {
let mut r = warp::reply::Response::new("".into());
r.headers_mut().insert(
@@ -210,6 +326,11 @@ impl warp::Reply for ChunkResult {
*r.status_mut() = StatusCode::BAD_REQUEST;
r
}
+ ChunkResult::NotFound => {
+ let mut r = warp::reply::Response::new("".into());
+ *r.status_mut() = StatusCode::NOT_FOUND;
+ r
+ }
ChunkResult::InternalServerError => {
let mut r = warp::reply::Response::new("".into());
r.headers_mut().insert(