diff options
author | Lars Wirzenius <liw@liw.fi> | 2020-09-19 08:12:54 +0300 |
---|---|---|
committer | Lars Wirzenius <liw@liw.fi> | 2020-09-19 10:13:49 +0300 |
commit | 70b8d232c2b7e2892a117f61e3a9892bf4994ea4 (patch) | |
tree | 691804e9e8a63d1f5bb6750a20a0d3c0cd6fa6f7 | |
parent | e1c4683b73ec2a207321377636f4ed722d0674dc (diff) | |
download | obnam2-70b8d232c2b7e2892a117f61e3a9892bf4994ea4.tar.gz |
feat: search, delete chunks on chunk server
Also heavily refactor the now-long scenario by splitting out a happy
path and some unhappy paths.
-rw-r--r-- | obnam.md | 69 | ||||
-rw-r--r-- | src/bin/obnam-server.rs | 141 | ||||
-rw-r--r-- | src/index.rs | 33 | ||||
-rw-r--r-- | src/store.rs | 7 | ||||
-rw-r--r-- | subplot/obnam.py | 53 | ||||
-rw-r--r-- | subplot/obnam.yaml | 19 |
6 files changed, 299 insertions, 23 deletions
@@ -241,15 +241,9 @@ These scenarios verify that the chunk server works on its own. The scenarios start a fresh, empty chunk server, and do some operations on it, and verify the results, and finally terminate the server. -### Chunk management +### Chunk management happy path -This scenario verifies that a chunk can be uploaded and then -retrieved, with its metadata, and then deleted. The chunk server has -an API with just one endpoint, `/chunks`, and accepts the the POST, -GET, and DELETE operations on it. - -To create a chunk, we use POST. We remember the identifier so we can -retrieve the chunk later. +We must be able to create a new chunk. ~~~scenario given a chunk server @@ -260,7 +254,7 @@ and content-type is application/json and the JSON body has a field chunk_id, henceforth ID ~~~ -To retrieve a chunk, we use GET. +We must be able to retrieve it. ~~~scenario when I GET /chunks/<ID> @@ -270,10 +264,63 @@ and chunk-meta is {"sha256":"abc","generation":null,"ended":null} and the body matches file data.dat ~~~ -TODO: fetch non-existent chunk +We must also be able to find it based on metadata. + +~~~scenario +when I GET /chunks?sha256=abc +then HTTP status code is 200 +and content-type is application/json +and the JSON body matches {"<ID>":{"sha256":"abc","generation":null,"ended":null}} +~~~ + +Finally, we must be able to delete it. After that, we must not be able +to retrieve it, or find it using metadata. + +~~~scenario +when I DELETE /chunks/<ID> +then HTTP status code is 200 + +when I GET /chunks/<ID> +then HTTP status code is 404 + +when I GET /chunks?sha256=abc +then HTTP status code is 200 +and content-type is application/json +and the JSON body matches {} +~~~ + +### Retrieve a chunk that does not exist -TODO: delete chunk +We must get the right error if we try to retrieve a chunk that does +not exist. +~~~scenario +given a chunk server +when I try to GET /chunks/any.random.string +then HTTP status code is 404 +~~~ + +### Search without matches + +We must get an empty result if searching for chunks that don't exist. + +~~~scenario +given a chunk server +when I GET /chunks?sha256=abc +then HTTP status code is 200 +and content-type is application/json +and the JSON body matches {} +~~~ + +### Delete chunk that does not exist + +We must get the right error when deleting a chunk that doesn't exist. + +~~~scenario +given a chunk server +when I try to DELETE /chunks/any.random.string +then HTTP status code is 404 +~~~ ## Smoke test diff --git a/src/bin/obnam-server.rs b/src/bin/obnam-server.rs index 3adf2eb..c81e67a 100644 --- a/src/bin/obnam-server.rs +++ b/src/bin/obnam-server.rs @@ -1,6 +1,7 @@ use bytes::Bytes; use obnam::{chunk::Chunk, chunkid::ChunkId, chunkmeta::ChunkMeta, index::Index, store::Store}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::default::Default; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -41,15 +42,21 @@ async fn main() -> anyhow::Result<()> { .and(config.clone()) .and_then(fetch_chunk); - // let search = warp::get() - // .and(warp::path("chunks")) - // .and(warp::query::<HashMap<String, String>>()) - // .and(config.clone()) - // .and(index.clone()) - // .and_then(obnam::routes::search::search_chunks); + let search = warp::get() + .and(warp::path("chunks")) + .and(warp::query::<HashMap<String, String>>()) + .and(config.clone()) + .and(index.clone()) + .and_then(search_chunks); - // let webroot = create.or(fetch).or(search); - let webroot = create.or(fetch); + let delete = warp::delete() + .and(warp::path("chunks")) + .and(warp::path::param()) + .and(config.clone()) + .and(index.clone()) + .and_then(delete_chunk); + + let webroot = create.or(fetch).or(search).or(delete); warp::serve(webroot) .tls() .key_path(config_bare.tls_key) @@ -151,15 +158,110 @@ pub async fn fetch_chunk( let config = config.lock().await; let store = Store::new(&config.chunks); let id: ChunkId = id.parse().unwrap(); + eprintln!("fetch: {:?}", id); match store.load(&id) { - Ok(chunk) => Ok(ChunkResult::Fetched(chunk)), - Err(_) => Err(warp::reject::not_found()), + Ok(chunk) => { + eprintln!("loaded: {:?}", chunk.meta()); + Ok(ChunkResult::Fetched(chunk)) + } + Err(e) => { + eprintln!("error loading: {:?}", e); + Ok(ChunkResult::NotFound) + } + } +} + +pub async fn search_chunks( + query: HashMap<String, String>, + config: Arc<Mutex<Config>>, + index: Arc<Mutex<Index>>, +) -> Result<impl warp::Reply, warp::Rejection> { + let index = index.lock().await; + let config = config.lock().await; + + let store = Store::new(&config.chunks); + + let mut query = query.iter(); + let found = if let Some((key, value)) = query.next() { + if query.next() != None { + return Ok(ChunkResult::BadRequest); + } + index.find(&key, &value) + } else { + return Ok(ChunkResult::BadRequest); + }; + + let mut hits = SearchHits::default(); + for chunk_id in found { + let meta = match store.load_meta(&chunk_id) { + Ok(meta) => meta, + Err(_) => return Ok(ChunkResult::InternalServerError), + }; + hits.insert(&chunk_id, meta); + } + + Ok(ChunkResult::Found(hits)) +} + +#[derive(Default, Clone, Serialize)] +struct SearchHits { + map: HashMap<String, ChunkMeta>, +} + +impl SearchHits { + fn insert(&mut self, chunk_id: &ChunkId, meta: ChunkMeta) { + self.map.insert(chunk_id.to_string(), meta); + } + + fn to_json(&self) -> String { + serde_json::to_string(&self.map).unwrap() + } +} + +pub async fn delete_chunk( + id: String, + config: Arc<Mutex<Config>>, + index: Arc<Mutex<Index>>, +) -> Result<impl warp::Reply, warp::Rejection> { + let config = config.lock().await; + let mut index = index.lock().await; + let store = Store::new(&config.chunks); + let id: ChunkId = id.parse().unwrap(); + + eprintln!("delete: {:?}", id); + let chunk = match store.load(&id) { + Ok(chunk) => { + eprintln!("loaded: {:?}", chunk.meta()); + chunk + } + Err(e) => { + eprintln!("error loading: {:?}", e); + return Ok(ChunkResult::NotFound); + } + }; + + let meta = chunk.meta(); + index.remove("sha256", meta.sha256()); + index.remove_generation(&id); + + match store.delete(&id) { + Ok(_) => { + eprintln!("deleted: {:?}", id); + Ok(ChunkResult::Deleted) + } + Err(e) => { + eprintln!("error deleting: {:?}", e); + Ok(ChunkResult::NotFound) + } } } enum ChunkResult { Created(ChunkId), Fetched(Chunk), + Found(SearchHits), + Deleted, + NotFound, BadRequest, InternalServerError, } @@ -201,6 +303,20 @@ impl warp::Reply for ChunkResult { *r.status_mut() = StatusCode::OK; r } + ChunkResult::Found(hits) => { + let mut r = warp::reply::Response::new(hits.to_json().into()); + r.headers_mut().insert( + warp::http::header::CONTENT_TYPE, + warp::http::header::HeaderValue::from_static("application/json"), + ); + *r.status_mut() = StatusCode::OK; + r + } + ChunkResult::Deleted => { + let mut r = warp::reply::Response::new("".into()); + *r.status_mut() = StatusCode::OK; + r + } ChunkResult::BadRequest => { let mut r = warp::reply::Response::new("".into()); r.headers_mut().insert( @@ -210,6 +326,11 @@ impl warp::Reply for ChunkResult { *r.status_mut() = StatusCode::BAD_REQUEST; r } + ChunkResult::NotFound => { + let mut r = warp::reply::Response::new("".into()); + *r.status_mut() = StatusCode::NOT_FOUND; + r + } ChunkResult::InternalServerError => { let mut r = warp::reply::Response::new("".into()); r.headers_mut().insert( diff --git a/src/index.rs b/src/index.rs index ed0183e..0166b0f 100644 --- a/src/index.rs +++ b/src/index.rs @@ -30,6 +30,11 @@ impl Index { } } + pub fn remove(&mut self, key: &str, value: &str) { + let kv = kv(key, value); + self.map.remove(&kv); + } + pub fn find(&self, key: &str, value: &str) -> Vec<ChunkId> { let kv = kv(key, value); if let Some(v) = self.map.get(&kv) { @@ -43,6 +48,15 @@ impl Index { self.generations.push(id) } + pub fn remove_generation(&mut self, id: &ChunkId) { + self.generations = self + .generations + .iter() + .cloned() + .filter(|x| x != id) + .collect(); + } + pub fn find_generations(&self) -> Vec<ChunkId> { self.generations.clone() } @@ -82,6 +96,16 @@ mod test { } #[test] + fn removes_inserted() { + let id: ChunkId = "id001".parse().unwrap(); + let mut idx = Index::default(); + idx.insert(id.clone(), "sha256", "abc"); + idx.remove("sha256", "abc"); + let ids: Vec<ChunkId> = idx.find("sha256", "abc"); + assert_eq!(ids, vec![]); + } + + #[test] fn has_no_generations_initially() { let idx = Index::default(); assert_eq!(idx.find_generations(), vec![]); @@ -94,4 +118,13 @@ mod test { idx.insert_generation(id.clone()); assert_eq!(idx.find_generations(), vec![id]); } + + #[test] + fn removes_generaion() { + let id: ChunkId = "id001".parse().unwrap(); + let mut idx = Index::default(); + idx.insert_generation(id.clone()); + idx.remove_generation(&id); + assert_eq!(idx.find_generations(), vec![]); + } } diff --git a/src/store.rs b/src/store.rs index 0c15f02..873b8f2 100644 --- a/src/store.rs +++ b/src/store.rs @@ -44,4 +44,11 @@ impl Store { let data = std::fs::read(&self.filename(id, "data"))?; Ok(Chunk::new(meta, data)) } + + /// Delete a chunk from a store. + pub fn delete(&self, id: &ChunkId) -> anyhow::Result<()> { + std::fs::remove_file(&self.filename(id, "meta"))?; + std::fs::remove_file(&self.filename(id, "data"))?; + Ok(()) + } } diff --git a/subplot/obnam.py b/subplot/obnam.py index c827180..ccfdc67 100644 --- a/subplot/obnam.py +++ b/subplot/obnam.py @@ -1,6 +1,8 @@ +import json import logging import os import random +import re import requests import shutil import socket @@ -60,12 +62,31 @@ def post_file(ctx, filename=None, path=None, header=None, json=None): _request(ctx, requests.post, url, headers=headers, data=data) -def get_chunk(ctx, var=None): +def get_chunk_via_var(ctx, var=None): chunk_id = ctx["vars"][var] + get_chunk_by_id(ctx, chunk_id=chunk_id) + + +def get_chunk_by_id(ctx, chunk_id=None): url = f"{ctx['url']}/chunks/{chunk_id}" _request(ctx, requests.get, url) +def find_chunks_with_sha(ctx, sha=None): + url = f"{ctx['url']}/chunks?sha256={sha}" + _request(ctx, requests.get, url) + + +def delete_chunk_via_var(ctx, var=None): + chunk_id = ctx["vars"][var] + delete_chunk_by_id(ctx, chunk_id=chunk_id) + + +def delete_chunk_by_id(ctx, chunk_id=None): + url = f"{ctx['url']}/chunks/{chunk_id}" + _request(ctx, requests.delete, url) + + def status_code_is(ctx, status=None): assert_eq = globals()["assert_eq"] assert_eq(ctx["http.status"], int(status)) @@ -92,6 +113,18 @@ def body_matches_file(ctx, filename=None): assert_eq(ctx["http.raw"], content) +def json_body_matches(ctx, wanted=None): + assert_eq = globals()["assert_eq"] + wanted = _expand_vars(ctx, wanted) + wanted = json.loads(wanted) + body = ctx["http.json"] + logging.debug(f"json_body_matches:") + logging.debug(f" wanted: {wanted!r} ({type(wanted)}") + logging.debug(f" body : {body!r} ({type(body)}") + for key in wanted: + assert_eq(body.get(key, "not.there"), wanted[key]) + + # Name of Rust binary, debug-build. def _binary(name): srcdir = globals()["srcdir"] @@ -132,3 +165,21 @@ def _request(ctx, method, url, headers=None, data=None): if not r.ok: stderr = open(ctx["daemon"]["obnam-server"]["stderr"], "rb").read() logging.debug(f" server stderr: {stderr!r}") + + +# Expand variables ("<foo>") in a string with values from ctx. +def _expand_vars(ctx, s): + v = ctx.get("vars") + if v is None: + return s + result = [] + while True: + m = re.search(f"<(\\S+)>", s) + if not m: + result.append(s) + break + result.append(s[: m.start()]) + value = v[m.group(1)] + result.append(value) + s = s[m.end() :] + return "".join(result) diff --git a/subplot/obnam.yaml b/subplot/obnam.yaml index 7acf581..065cb01 100644 --- a/subplot/obnam.yaml +++ b/subplot/obnam.yaml @@ -15,7 +15,20 @@ function: post_file - when: "I GET /chunks/<{var}>" - function: get_chunk + function: get_chunk_via_var + +- when: "I try to GET /chunks/{chunk_id}" + function: get_chunk_by_id + +- when: "I GET /chunks?sha256={sha}" + regex: false + function: find_chunks_with_sha + +- when: "I DELETE /chunks/<{var}>" + function: delete_chunk_via_var + +- when: "I try to DELETE /chunks/{chunk_id}" + function: delete_chunk_by_id - then: "HTTP status code is {status}" function: status_code_is @@ -26,5 +39,9 @@ - then: "the JSON body has a field {field}, henceforth {var}" function: remember_json_field +- then: "the JSON body matches (?P<wanted>.*)" + regex: true + function: json_body_matches + - then: "the body matches file {filename}" function: body_matches_file |