summaryrefslogtreecommitdiff
path: root/src/bin/obnam-server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/obnam-server.rs')
-rw-r--r--src/bin/obnam-server.rs75
1 files changed, 24 insertions, 51 deletions
diff --git a/src/bin/obnam-server.rs b/src/bin/obnam-server.rs
index 6cf4122..9b5a557 100644
--- a/src/bin/obnam-server.rs
+++ b/src/bin/obnam-server.rs
@@ -1,9 +1,10 @@
use anyhow::Context;
+use clap::Parser;
use log::{debug, error, info};
-use obnam::chunk::DataChunk;
use obnam::chunkid::ChunkId;
use obnam::chunkmeta::ChunkMeta;
-use obnam::indexedstore::IndexedStore;
+use obnam::chunkstore::ChunkStore;
+use obnam::label::Label;
use obnam::server::{ServerConfig, ServerConfigError};
use serde::Serialize;
use std::collections::HashMap;
@@ -11,16 +12,14 @@ use std::default::Default;
use std::net::{SocketAddr, ToSocketAddrs};
use std::path::{Path, PathBuf};
use std::sync::Arc;
-use structopt::StructOpt;
use tokio::sync::Mutex;
use warp::http::StatusCode;
use warp::hyper::body::Bytes;
use warp::Filter;
-#[derive(Debug, StructOpt)]
-#[structopt(name = "obnam2-server", about = "Backup server")]
+#[derive(Debug, Parser)]
+#[clap(name = "obnam2-server", about = "Backup server")]
struct Opt {
- #[structopt(parse(from_os_str))]
config: PathBuf,
}
@@ -28,7 +27,7 @@ struct Opt {
async fn main() -> anyhow::Result<()> {
pretty_env_logger::init_custom_env("OBNAM_SERVER_LOG");
- let opt = Opt::from_args();
+ let opt = Opt::parse();
let config = load_config(&opt.config)?;
let addresses: Vec<SocketAddr> = config.address.to_socket_addrs()?.collect();
@@ -38,7 +37,7 @@ async fn main() -> anyhow::Result<()> {
return Err(ServerConfigError::BadServerAddress.into());
}
- let store = IndexedStore::new(&config.chunks)?;
+ let store = ChunkStore::local(&config.chunks)?;
let store = Arc::new(Mutex::new(store));
let store = warp::any().map(move || Arc::clone(&store));
@@ -71,16 +70,8 @@ async fn main() -> anyhow::Result<()> {
.and(store.clone())
.and_then(search_chunks);
- let delete = warp::delete()
- .and(warp::path("v1"))
- .and(warp::path("chunks"))
- .and(warp::path::param())
- .and(warp::path::end())
- .and(store.clone())
- .and_then(delete_chunk);
-
let log = warp::log("obnam");
- let webroot = create.or(fetch).or(search).or(delete).with(log);
+ let webroot = create.or(fetch).or(search).with(log);
debug!("starting warp");
warp::serve(webroot)
@@ -103,11 +94,11 @@ fn load_config(filename: &Path) -> Result<ServerConfig, anyhow::Error> {
}
pub async fn create_chunk(
- store: Arc<Mutex<IndexedStore>>,
+ store: Arc<Mutex<ChunkStore>>,
meta: String,
data: Bytes,
) -> Result<impl warp::Reply, warp::Rejection> {
- let mut store = store.lock().await;
+ let store = store.lock().await;
let meta: ChunkMeta = match meta.parse() {
Ok(s) => s,
@@ -117,9 +108,7 @@ pub async fn create_chunk(
}
};
- let chunk = DataChunk::new(data.to_vec(), meta);
-
- let id = match store.save(&chunk) {
+ let id = match store.put(data.to_vec(), &meta).await {
Ok(id) => id,
Err(e) => {
error!("couldn't save: {}", e);
@@ -133,11 +122,11 @@ pub async fn create_chunk(
pub async fn fetch_chunk(
id: String,
- store: Arc<Mutex<IndexedStore>>,
+ store: Arc<Mutex<ChunkStore>>,
) -> Result<impl warp::Reply, warp::Rejection> {
let store = store.lock().await;
let id: ChunkId = id.parse().unwrap();
- match store.load(&id) {
+ match store.get(&id).await {
Ok((data, meta)) => {
info!("found chunk {}: {:?}", id, meta);
Ok(ChunkResult::Fetched(meta, data))
@@ -151,18 +140,23 @@ pub async fn fetch_chunk(
pub async fn search_chunks(
query: HashMap<String, String>,
- store: Arc<Mutex<IndexedStore>>,
+ store: Arc<Mutex<ChunkStore>>,
) -> Result<impl warp::Reply, warp::Rejection> {
let store = store.lock().await;
let mut query = query.iter();
let found = if let Some((key, value)) = query.next() {
- if query.next() != None {
+ if query.next().is_some() {
error!("search has more than one key to search for");
return Ok(ChunkResult::BadRequest);
}
if key == "label" {
- store.find_by_label(value).expect("SQL lookup failed")
+ let label = Label::deserialize(value).unwrap();
+ let label = ChunkMeta::new(&label);
+ store
+ .find_by_label(&label)
+ .await
+ .expect("SQL lookup failed")
} else {
error!("unknown search key {:?}", key);
return Ok(ChunkResult::BadRequest);
@@ -174,7 +168,7 @@ pub async fn search_chunks(
let mut hits = SearchHits::default();
for chunk_id in found {
- let meta = match store.load_meta(&chunk_id) {
+ let (_, meta) = match store.get(&chunk_id).await {
Ok(meta) => {
info!("search found chunk {}", chunk_id);
meta
@@ -213,30 +207,10 @@ impl SearchHits {
}
}
-pub async fn delete_chunk(
- id: String,
- store: Arc<Mutex<IndexedStore>>,
-) -> Result<impl warp::Reply, warp::Rejection> {
- let mut store = store.lock().await;
- let id: ChunkId = id.parse().unwrap();
-
- match store.remove(&id) {
- Ok(_) => {
- info!("chunk deleted: {}", id);
- Ok(ChunkResult::Deleted)
- }
- Err(e) => {
- error!("could not delete chunk {}: {:?}", id, e);
- Ok(ChunkResult::NotFound)
- }
- }
-}
-
enum ChunkResult {
Created(ChunkId),
- Fetched(ChunkMeta, DataChunk),
+ Fetched(ChunkMeta, Vec<u8>),
Found(SearchHits),
- Deleted,
NotFound,
BadRequest,
InternalServerError,
@@ -265,13 +239,12 @@ impl warp::Reply for ChunkResult {
);
into_response(
StatusCode::OK,
- chunk.data(),
+ &chunk,
"application/octet-stream",
Some(headers),
)
}
ChunkResult::Found(hits) => json_response(StatusCode::OK, hits.to_json(), None),
- ChunkResult::Deleted => status_response(StatusCode::OK),
ChunkResult::BadRequest => status_response(StatusCode::BAD_REQUEST),
ChunkResult::NotFound => status_response(StatusCode::NOT_FOUND),
ChunkResult::InternalServerError => status_response(StatusCode::INTERNAL_SERVER_ERROR),