summaryrefslogtreecommitdiff
path: root/src/bin/obnam-server.rs
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2020-11-24 08:35:32 +0200
committerLars Wirzenius <liw@liw.fi>2020-11-24 09:14:46 +0200
commitcf18e86c351ddbaf655d02e66c1666cca4806947 (patch)
treec7c52a9f734cb6d582e4ec4d2633b93cd510ff1c /src/bin/obnam-server.rs
parent1172320e0da7687c62c1f020761903029b6bf3a2 (diff)
downloadobnam2-cf18e86c351ddbaf655d02e66c1666cca4806947.tar.gz
refactor: add an abstraction for an indexed store
This makes it easier to write a server without the HTTP layer.
Diffstat (limited to 'src/bin/obnam-server.rs')
-rw-r--r--src/bin/obnam-server.rs92
1 files changed, 31 insertions, 61 deletions
diff --git a/src/bin/obnam-server.rs b/src/bin/obnam-server.rs
index 520d6a9..cf53d2f 100644
--- a/src/bin/obnam-server.rs
+++ b/src/bin/obnam-server.rs
@@ -1,6 +1,9 @@
use bytes::Bytes;
use log::{debug, error, info};
-use obnam::{chunk::DataChunk, chunkid::ChunkId, chunkmeta::ChunkMeta, index::Index, store::Store};
+use obnam::chunk::DataChunk;
+use obnam::chunkid::ChunkId;
+use obnam::chunkmeta::ChunkMeta;
+use obnam::indexedstore::IndexedStore;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::default::Default;
@@ -24,21 +27,18 @@ async fn main() -> anyhow::Result<()> {
let opt = Opt::from_args();
let config = Config::read_config(&opt.config).unwrap();
- let config_bare = config.clone();
- let config = Arc::new(Mutex::new(config));
- let config = warp::any().map(move || Arc::clone(&config));
- let index = Arc::new(Mutex::new(Index::default()));
- let index = warp::any().map(move || Arc::clone(&index));
+ let store = IndexedStore::new(&config.chunks);
+ let store = Arc::new(Mutex::new(store));
+ let store = warp::any().map(move || Arc::clone(&store));
info!("Obnam server starting up");
debug!("opt: {:#?}", opt);
- debug!("Configuration: {:#?}", config_bare);
+ debug!("Configuration: {:#?}", config);
let create = warp::post()
.and(warp::path("chunks"))
- .and(config.clone())
- .and(index.clone())
+ .and(store.clone())
.and(warp::header("chunk-meta"))
.and(warp::filters::body::bytes())
.and_then(create_chunk);
@@ -46,21 +46,19 @@ async fn main() -> anyhow::Result<()> {
let fetch = warp::get()
.and(warp::path("chunks"))
.and(warp::path::param())
- .and(config.clone())
+ .and(store.clone())
.and_then(fetch_chunk);
let search = warp::get()
.and(warp::path("chunks"))
.and(warp::query::<HashMap<String, String>>())
- .and(config.clone())
- .and(index.clone())
+ .and(store.clone())
.and_then(search_chunks);
let delete = warp::delete()
.and(warp::path("chunks"))
.and(warp::path::param())
- .and(config.clone())
- .and(index.clone())
+ .and(store.clone())
.and_then(delete_chunk);
let log = warp::log("obnam");
@@ -71,7 +69,7 @@ async fn main() -> anyhow::Result<()> {
// .tls()
// .key_path(config_bare.tls_key)
// .cert_path(config_bare.tls_cert)
- .run(([127, 0, 0, 1], config_bare.port))
+ .run(([127, 0, 0, 1], config.port))
.await;
Ok(())
}
@@ -125,14 +123,11 @@ impl Config {
}
pub async fn create_chunk(
- config: Arc<Mutex<Config>>,
- index: Arc<Mutex<Index>>,
+ store: Arc<Mutex<IndexedStore>>,
meta: String,
data: Bytes,
) -> Result<impl warp::Reply, warp::Rejection> {
- let id = ChunkId::new();
- let config = config.lock().await;
- let store = Store::new(&config.chunks);
+ let mut store = store.lock().await;
let meta: ChunkMeta = match meta.parse() {
Ok(s) => s,
@@ -144,19 +139,13 @@ pub async fn create_chunk(
let chunk = DataChunk::new(data.to_vec());
- match store.save(&id, &meta, &chunk) {
- Ok(_) => (),
+ let id = match store.save(&meta, &chunk) {
+ Ok(id) => id,
Err(e) => {
- error!("could not write chunk to disk: {}", e);
+ error!("couldn't save: {}", e);
return Ok(ChunkResult::InternalServerError);
}
- }
-
- let mut index = index.lock().await;
- index.insert(id.clone(), "sha256", meta.sha256());
- if meta.is_generation() {
- index.insert_generation(id.clone());
- }
+ };
info!("created chunk {}: {:?}", id, meta);
Ok(ChunkResult::Created(id))
@@ -164,10 +153,9 @@ pub async fn create_chunk(
pub async fn fetch_chunk(
id: String,
- config: Arc<Mutex<Config>>,
+ store: Arc<Mutex<IndexedStore>>,
) -> Result<impl warp::Reply, warp::Rejection> {
- let config = config.lock().await;
- let store = Store::new(&config.chunks);
+ let store = store.lock().await;
let id: ChunkId = id.parse().unwrap();
match store.load(&id) {
Ok((meta, chunk)) => {
@@ -183,13 +171,9 @@ pub async fn fetch_chunk(
pub async fn search_chunks(
query: HashMap<String, String>,
- config: Arc<Mutex<Config>>,
- index: Arc<Mutex<Index>>,
+ store: Arc<Mutex<IndexedStore>>,
) -> Result<impl warp::Reply, warp::Rejection> {
- let index = index.lock().await;
- let config = config.lock().await;
-
- let store = Store::new(&config.chunks);
+ let store = store.lock().await;
let mut query = query.iter();
let found = if let Some((key, value)) = query.next() {
@@ -198,9 +182,12 @@ pub async fn search_chunks(
return Ok(ChunkResult::BadRequest);
}
if key == "generation" && value == "true" {
- index.find_generations()
+ store.find_generations()
+ } else if key == "sha256" {
+ store.find_by_sha256(value)
} else {
- index.find(&key, &value)
+ error!("unknown search key {:?}", key);
+ return Ok(ChunkResult::BadRequest);
}
} else {
error!("search has no key to search for");
@@ -250,29 +237,12 @@ impl SearchHits {
pub async fn delete_chunk(
id: String,
- config: Arc<Mutex<Config>>,
- index: Arc<Mutex<Index>>,
+ store: Arc<Mutex<IndexedStore>>,
) -> Result<impl warp::Reply, warp::Rejection> {
- let config = config.lock().await;
- let mut index = index.lock().await;
- let store = Store::new(&config.chunks);
+ let mut store = store.lock().await;
let id: ChunkId = id.parse().unwrap();
- let (meta, _) = match store.load(&id) {
- Ok((meta, chunk)) => {
- debug!("found chunk to delete: {}", id);
- (meta, chunk)
- }
- Err(e) => {
- error!("could not find chunk to delete: {}: {:?}", id, e);
- return Ok(ChunkResult::NotFound);
- }
- };
-
- index.remove("sha256", meta.sha256());
- index.remove_generation(&id);
-
- match store.delete(&id) {
+ match store.remove(&id) {
Ok(_) => {
info!("chunk deleted: {}", id);
Ok(ChunkResult::Deleted)