summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2020-09-19 08:12:54 +0300
committerLars Wirzenius <liw@liw.fi>2020-09-19 10:13:49 +0300
commit70b8d232c2b7e2892a117f61e3a9892bf4994ea4 (patch)
tree691804e9e8a63d1f5bb6750a20a0d3c0cd6fa6f7 /src
parente1c4683b73ec2a207321377636f4ed722d0674dc (diff)
downloadobnam2-70b8d232c2b7e2892a117f61e3a9892bf4994ea4.tar.gz
feat: search, delete chunks on chunk server
Also heavily refactor the now-long scenario by splitting out a happy path and some unhappy paths.
Diffstat (limited to 'src')
-rw-r--r--src/bin/obnam-server.rs141
-rw-r--r--src/index.rs33
-rw-r--r--src/store.rs7
3 files changed, 171 insertions, 10 deletions
diff --git a/src/bin/obnam-server.rs b/src/bin/obnam-server.rs
index 3adf2eb..c81e67a 100644
--- a/src/bin/obnam-server.rs
+++ b/src/bin/obnam-server.rs
@@ -1,6 +1,7 @@
use bytes::Bytes;
use obnam::{chunk::Chunk, chunkid::ChunkId, chunkmeta::ChunkMeta, index::Index, store::Store};
use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
use std::default::Default;
use std::path::{Path, PathBuf};
use std::sync::Arc;
@@ -41,15 +42,21 @@ async fn main() -> anyhow::Result<()> {
.and(config.clone())
.and_then(fetch_chunk);
- // let search = warp::get()
- // .and(warp::path("chunks"))
- // .and(warp::query::<HashMap<String, String>>())
- // .and(config.clone())
- // .and(index.clone())
- // .and_then(obnam::routes::search::search_chunks);
+ let search = warp::get()
+ .and(warp::path("chunks"))
+ .and(warp::query::<HashMap<String, String>>())
+ .and(config.clone())
+ .and(index.clone())
+ .and_then(search_chunks);
- // let webroot = create.or(fetch).or(search);
- let webroot = create.or(fetch);
+ let delete = warp::delete()
+ .and(warp::path("chunks"))
+ .and(warp::path::param())
+ .and(config.clone())
+ .and(index.clone())
+ .and_then(delete_chunk);
+
+ let webroot = create.or(fetch).or(search).or(delete);
warp::serve(webroot)
.tls()
.key_path(config_bare.tls_key)
@@ -151,15 +158,110 @@ pub async fn fetch_chunk(
let config = config.lock().await;
let store = Store::new(&config.chunks);
let id: ChunkId = id.parse().unwrap();
+ eprintln!("fetch: {:?}", id);
match store.load(&id) {
- Ok(chunk) => Ok(ChunkResult::Fetched(chunk)),
- Err(_) => Err(warp::reject::not_found()),
+ Ok(chunk) => {
+ eprintln!("loaded: {:?}", chunk.meta());
+ Ok(ChunkResult::Fetched(chunk))
+ }
+ Err(e) => {
+ eprintln!("error loading: {:?}", e);
+ Ok(ChunkResult::NotFound)
+ }
+ }
+}
+
+pub async fn search_chunks(
+ query: HashMap<String, String>,
+ config: Arc<Mutex<Config>>,
+ index: Arc<Mutex<Index>>,
+) -> Result<impl warp::Reply, warp::Rejection> {
+ let index = index.lock().await;
+ let config = config.lock().await;
+
+ let store = Store::new(&config.chunks);
+
+ let mut query = query.iter();
+ let found = if let Some((key, value)) = query.next() {
+ if query.next() != None {
+ return Ok(ChunkResult::BadRequest);
+ }
+ index.find(&key, &value)
+ } else {
+ return Ok(ChunkResult::BadRequest);
+ };
+
+ let mut hits = SearchHits::default();
+ for chunk_id in found {
+ let meta = match store.load_meta(&chunk_id) {
+ Ok(meta) => meta,
+ Err(_) => return Ok(ChunkResult::InternalServerError),
+ };
+ hits.insert(&chunk_id, meta);
+ }
+
+ Ok(ChunkResult::Found(hits))
+}
+
+#[derive(Default, Clone, Serialize)]
+struct SearchHits {
+ map: HashMap<String, ChunkMeta>,
+}
+
+impl SearchHits {
+ fn insert(&mut self, chunk_id: &ChunkId, meta: ChunkMeta) {
+ self.map.insert(chunk_id.to_string(), meta);
+ }
+
+ fn to_json(&self) -> String {
+ serde_json::to_string(&self.map).unwrap()
+ }
+}
+
+pub async fn delete_chunk(
+ id: String,
+ config: Arc<Mutex<Config>>,
+ index: Arc<Mutex<Index>>,
+) -> Result<impl warp::Reply, warp::Rejection> {
+ let config = config.lock().await;
+ let mut index = index.lock().await;
+ let store = Store::new(&config.chunks);
+ let id: ChunkId = id.parse().unwrap();
+
+ eprintln!("delete: {:?}", id);
+ let chunk = match store.load(&id) {
+ Ok(chunk) => {
+ eprintln!("loaded: {:?}", chunk.meta());
+ chunk
+ }
+ Err(e) => {
+ eprintln!("error loading: {:?}", e);
+ return Ok(ChunkResult::NotFound);
+ }
+ };
+
+ let meta = chunk.meta();
+ index.remove("sha256", meta.sha256());
+ index.remove_generation(&id);
+
+ match store.delete(&id) {
+ Ok(_) => {
+ eprintln!("deleted: {:?}", id);
+ Ok(ChunkResult::Deleted)
+ }
+ Err(e) => {
+ eprintln!("error deleting: {:?}", e);
+ Ok(ChunkResult::NotFound)
+ }
}
}
enum ChunkResult {
Created(ChunkId),
Fetched(Chunk),
+ Found(SearchHits),
+ Deleted,
+ NotFound,
BadRequest,
InternalServerError,
}
@@ -201,6 +303,20 @@ impl warp::Reply for ChunkResult {
*r.status_mut() = StatusCode::OK;
r
}
+ ChunkResult::Found(hits) => {
+ let mut r = warp::reply::Response::new(hits.to_json().into());
+ r.headers_mut().insert(
+ warp::http::header::CONTENT_TYPE,
+ warp::http::header::HeaderValue::from_static("application/json"),
+ );
+ *r.status_mut() = StatusCode::OK;
+ r
+ }
+ ChunkResult::Deleted => {
+ let mut r = warp::reply::Response::new("".into());
+ *r.status_mut() = StatusCode::OK;
+ r
+ }
ChunkResult::BadRequest => {
let mut r = warp::reply::Response::new("".into());
r.headers_mut().insert(
@@ -210,6 +326,11 @@ impl warp::Reply for ChunkResult {
*r.status_mut() = StatusCode::BAD_REQUEST;
r
}
+ ChunkResult::NotFound => {
+ let mut r = warp::reply::Response::new("".into());
+ *r.status_mut() = StatusCode::NOT_FOUND;
+ r
+ }
ChunkResult::InternalServerError => {
let mut r = warp::reply::Response::new("".into());
r.headers_mut().insert(
diff --git a/src/index.rs b/src/index.rs
index ed0183e..0166b0f 100644
--- a/src/index.rs
+++ b/src/index.rs
@@ -30,6 +30,11 @@ impl Index {
}
}
+ pub fn remove(&mut self, key: &str, value: &str) {
+ let kv = kv(key, value);
+ self.map.remove(&kv);
+ }
+
pub fn find(&self, key: &str, value: &str) -> Vec<ChunkId> {
let kv = kv(key, value);
if let Some(v) = self.map.get(&kv) {
@@ -43,6 +48,15 @@ impl Index {
self.generations.push(id)
}
+ pub fn remove_generation(&mut self, id: &ChunkId) {
+ self.generations = self
+ .generations
+ .iter()
+ .cloned()
+ .filter(|x| x != id)
+ .collect();
+ }
+
pub fn find_generations(&self) -> Vec<ChunkId> {
self.generations.clone()
}
@@ -82,6 +96,16 @@ mod test {
}
#[test]
+ fn removes_inserted() {
+ let id: ChunkId = "id001".parse().unwrap();
+ let mut idx = Index::default();
+ idx.insert(id.clone(), "sha256", "abc");
+ idx.remove("sha256", "abc");
+ let ids: Vec<ChunkId> = idx.find("sha256", "abc");
+ assert_eq!(ids, vec![]);
+ }
+
+ #[test]
fn has_no_generations_initially() {
let idx = Index::default();
assert_eq!(idx.find_generations(), vec![]);
@@ -94,4 +118,13 @@ mod test {
idx.insert_generation(id.clone());
assert_eq!(idx.find_generations(), vec![id]);
}
+
+ #[test]
+ fn removes_generaion() {
+ let id: ChunkId = "id001".parse().unwrap();
+ let mut idx = Index::default();
+ idx.insert_generation(id.clone());
+ idx.remove_generation(&id);
+ assert_eq!(idx.find_generations(), vec![]);
+ }
}
diff --git a/src/store.rs b/src/store.rs
index 0c15f02..873b8f2 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -44,4 +44,11 @@ impl Store {
let data = std::fs::read(&self.filename(id, "data"))?;
Ok(Chunk::new(meta, data))
}
+
+ /// Delete a chunk from a store.
+ pub fn delete(&self, id: &ChunkId) -> anyhow::Result<()> {
+ std::fs::remove_file(&self.filename(id, "meta"))?;
+ std::fs::remove_file(&self.filename(id, "data"))?;
+ Ok(())
+ }
}