diff options
Diffstat (limited to 'src/bin/obnam-server.rs')
-rw-r--r-- | src/bin/obnam-server.rs | 75 |
1 files changed, 24 insertions, 51 deletions
diff --git a/src/bin/obnam-server.rs b/src/bin/obnam-server.rs index 6cf4122..9b5a557 100644 --- a/src/bin/obnam-server.rs +++ b/src/bin/obnam-server.rs @@ -1,9 +1,10 @@ use anyhow::Context; +use clap::Parser; use log::{debug, error, info}; -use obnam::chunk::DataChunk; use obnam::chunkid::ChunkId; use obnam::chunkmeta::ChunkMeta; -use obnam::indexedstore::IndexedStore; +use obnam::chunkstore::ChunkStore; +use obnam::label::Label; use obnam::server::{ServerConfig, ServerConfigError}; use serde::Serialize; use std::collections::HashMap; @@ -11,16 +12,14 @@ use std::default::Default; use std::net::{SocketAddr, ToSocketAddrs}; use std::path::{Path, PathBuf}; use std::sync::Arc; -use structopt::StructOpt; use tokio::sync::Mutex; use warp::http::StatusCode; use warp::hyper::body::Bytes; use warp::Filter; -#[derive(Debug, StructOpt)] -#[structopt(name = "obnam2-server", about = "Backup server")] +#[derive(Debug, Parser)] +#[clap(name = "obnam2-server", about = "Backup server")] struct Opt { - #[structopt(parse(from_os_str))] config: PathBuf, } @@ -28,7 +27,7 @@ struct Opt { async fn main() -> anyhow::Result<()> { pretty_env_logger::init_custom_env("OBNAM_SERVER_LOG"); - let opt = Opt::from_args(); + let opt = Opt::parse(); let config = load_config(&opt.config)?; let addresses: Vec<SocketAddr> = config.address.to_socket_addrs()?.collect(); @@ -38,7 +37,7 @@ async fn main() -> anyhow::Result<()> { return Err(ServerConfigError::BadServerAddress.into()); } - let store = IndexedStore::new(&config.chunks)?; + let store = ChunkStore::local(&config.chunks)?; let store = Arc::new(Mutex::new(store)); let store = warp::any().map(move || Arc::clone(&store)); @@ -71,16 +70,8 @@ async fn main() -> anyhow::Result<()> { .and(store.clone()) .and_then(search_chunks); - let delete = warp::delete() - .and(warp::path("v1")) - .and(warp::path("chunks")) - .and(warp::path::param()) - .and(warp::path::end()) - .and(store.clone()) - .and_then(delete_chunk); - let log = warp::log("obnam"); - let webroot = create.or(fetch).or(search).or(delete).with(log); + let webroot = create.or(fetch).or(search).with(log); debug!("starting warp"); warp::serve(webroot) @@ -103,11 +94,11 @@ fn load_config(filename: &Path) -> Result<ServerConfig, anyhow::Error> { } pub async fn create_chunk( - store: Arc<Mutex<IndexedStore>>, + store: Arc<Mutex<ChunkStore>>, meta: String, data: Bytes, ) -> Result<impl warp::Reply, warp::Rejection> { - let mut store = store.lock().await; + let store = store.lock().await; let meta: ChunkMeta = match meta.parse() { Ok(s) => s, @@ -117,9 +108,7 @@ pub async fn create_chunk( } }; - let chunk = DataChunk::new(data.to_vec(), meta); - - let id = match store.save(&chunk) { + let id = match store.put(data.to_vec(), &meta).await { Ok(id) => id, Err(e) => { error!("couldn't save: {}", e); @@ -133,11 +122,11 @@ pub async fn create_chunk( pub async fn fetch_chunk( id: String, - store: Arc<Mutex<IndexedStore>>, + store: Arc<Mutex<ChunkStore>>, ) -> Result<impl warp::Reply, warp::Rejection> { let store = store.lock().await; let id: ChunkId = id.parse().unwrap(); - match store.load(&id) { + match store.get(&id).await { Ok((data, meta)) => { info!("found chunk {}: {:?}", id, meta); Ok(ChunkResult::Fetched(meta, data)) @@ -151,18 +140,23 @@ pub async fn fetch_chunk( pub async fn search_chunks( query: HashMap<String, String>, - store: Arc<Mutex<IndexedStore>>, + store: Arc<Mutex<ChunkStore>>, ) -> Result<impl warp::Reply, warp::Rejection> { let store = store.lock().await; let mut query = query.iter(); let found = if let Some((key, value)) = query.next() { - if query.next() != None { + if query.next().is_some() { error!("search has more than one key to search for"); return Ok(ChunkResult::BadRequest); } if key == "label" { - store.find_by_label(value).expect("SQL lookup failed") + let label = Label::deserialize(value).unwrap(); + let label = ChunkMeta::new(&label); + store + .find_by_label(&label) + .await + .expect("SQL lookup failed") } else { error!("unknown search key {:?}", key); return Ok(ChunkResult::BadRequest); @@ -174,7 +168,7 @@ pub async fn search_chunks( let mut hits = SearchHits::default(); for chunk_id in found { - let meta = match store.load_meta(&chunk_id) { + let (_, meta) = match store.get(&chunk_id).await { Ok(meta) => { info!("search found chunk {}", chunk_id); meta @@ -213,30 +207,10 @@ impl SearchHits { } } -pub async fn delete_chunk( - id: String, - store: Arc<Mutex<IndexedStore>>, -) -> Result<impl warp::Reply, warp::Rejection> { - let mut store = store.lock().await; - let id: ChunkId = id.parse().unwrap(); - - match store.remove(&id) { - Ok(_) => { - info!("chunk deleted: {}", id); - Ok(ChunkResult::Deleted) - } - Err(e) => { - error!("could not delete chunk {}: {:?}", id, e); - Ok(ChunkResult::NotFound) - } - } -} - enum ChunkResult { Created(ChunkId), - Fetched(ChunkMeta, DataChunk), + Fetched(ChunkMeta, Vec<u8>), Found(SearchHits), - Deleted, NotFound, BadRequest, InternalServerError, @@ -265,13 +239,12 @@ impl warp::Reply for ChunkResult { ); into_response( StatusCode::OK, - chunk.data(), + &chunk, "application/octet-stream", Some(headers), ) } ChunkResult::Found(hits) => json_response(StatusCode::OK, hits.to_json(), None), - ChunkResult::Deleted => status_response(StatusCode::OK), ChunkResult::BadRequest => status_response(StatusCode::BAD_REQUEST), ChunkResult::NotFound => status_response(StatusCode::NOT_FOUND), ChunkResult::InternalServerError => status_response(StatusCode::INTERNAL_SERVER_ERROR), |