summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2020-11-24 07:15:46 +0000
committerLars Wirzenius <liw@liw.fi>2020-11-24 07:15:46 +0000
commit02684a9c68ffc87d20ba89852fbcba43ed521039 (patch)
treec7c52a9f734cb6d582e4ec4d2633b93cd510ff1c
parent1172320e0da7687c62c1f020761903029b6bf3a2 (diff)
parentcf18e86c351ddbaf655d02e66c1666cca4806947 (diff)
downloadobnam2-02684a9c68ffc87d20ba89852fbcba43ed521039.tar.gz
Merge branch 'refactor' into 'main'
refactor: add an abstraction for an indexed store See merge request larswirzenius/obnam!22
-rw-r--r--src/bin/obnam-server.rs92
-rw-r--r--src/indexedstore.rs57
-rw-r--r--src/lib.rs1
3 files changed, 89 insertions, 61 deletions
diff --git a/src/bin/obnam-server.rs b/src/bin/obnam-server.rs
index 520d6a9..cf53d2f 100644
--- a/src/bin/obnam-server.rs
+++ b/src/bin/obnam-server.rs
@@ -1,6 +1,9 @@
use bytes::Bytes;
use log::{debug, error, info};
-use obnam::{chunk::DataChunk, chunkid::ChunkId, chunkmeta::ChunkMeta, index::Index, store::Store};
+use obnam::chunk::DataChunk;
+use obnam::chunkid::ChunkId;
+use obnam::chunkmeta::ChunkMeta;
+use obnam::indexedstore::IndexedStore;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::default::Default;
@@ -24,21 +27,18 @@ async fn main() -> anyhow::Result<()> {
let opt = Opt::from_args();
let config = Config::read_config(&opt.config).unwrap();
- let config_bare = config.clone();
- let config = Arc::new(Mutex::new(config));
- let config = warp::any().map(move || Arc::clone(&config));
- let index = Arc::new(Mutex::new(Index::default()));
- let index = warp::any().map(move || Arc::clone(&index));
+ let store = IndexedStore::new(&config.chunks);
+ let store = Arc::new(Mutex::new(store));
+ let store = warp::any().map(move || Arc::clone(&store));
info!("Obnam server starting up");
debug!("opt: {:#?}", opt);
- debug!("Configuration: {:#?}", config_bare);
+ debug!("Configuration: {:#?}", config);
let create = warp::post()
.and(warp::path("chunks"))
- .and(config.clone())
- .and(index.clone())
+ .and(store.clone())
.and(warp::header("chunk-meta"))
.and(warp::filters::body::bytes())
.and_then(create_chunk);
@@ -46,21 +46,19 @@ async fn main() -> anyhow::Result<()> {
let fetch = warp::get()
.and(warp::path("chunks"))
.and(warp::path::param())
- .and(config.clone())
+ .and(store.clone())
.and_then(fetch_chunk);
let search = warp::get()
.and(warp::path("chunks"))
.and(warp::query::<HashMap<String, String>>())
- .and(config.clone())
- .and(index.clone())
+ .and(store.clone())
.and_then(search_chunks);
let delete = warp::delete()
.and(warp::path("chunks"))
.and(warp::path::param())
- .and(config.clone())
- .and(index.clone())
+ .and(store.clone())
.and_then(delete_chunk);
let log = warp::log("obnam");
@@ -71,7 +69,7 @@ async fn main() -> anyhow::Result<()> {
// .tls()
// .key_path(config_bare.tls_key)
// .cert_path(config_bare.tls_cert)
- .run(([127, 0, 0, 1], config_bare.port))
+ .run(([127, 0, 0, 1], config.port))
.await;
Ok(())
}
@@ -125,14 +123,11 @@ impl Config {
}
pub async fn create_chunk(
- config: Arc<Mutex<Config>>,
- index: Arc<Mutex<Index>>,
+ store: Arc<Mutex<IndexedStore>>,
meta: String,
data: Bytes,
) -> Result<impl warp::Reply, warp::Rejection> {
- let id = ChunkId::new();
- let config = config.lock().await;
- let store = Store::new(&config.chunks);
+ let mut store = store.lock().await;
let meta: ChunkMeta = match meta.parse() {
Ok(s) => s,
@@ -144,19 +139,13 @@ pub async fn create_chunk(
let chunk = DataChunk::new(data.to_vec());
- match store.save(&id, &meta, &chunk) {
- Ok(_) => (),
+ let id = match store.save(&meta, &chunk) {
+ Ok(id) => id,
Err(e) => {
- error!("could not write chunk to disk: {}", e);
+ error!("couldn't save: {}", e);
return Ok(ChunkResult::InternalServerError);
}
- }
-
- let mut index = index.lock().await;
- index.insert(id.clone(), "sha256", meta.sha256());
- if meta.is_generation() {
- index.insert_generation(id.clone());
- }
+ };
info!("created chunk {}: {:?}", id, meta);
Ok(ChunkResult::Created(id))
@@ -164,10 +153,9 @@ pub async fn create_chunk(
pub async fn fetch_chunk(
id: String,
- config: Arc<Mutex<Config>>,
+ store: Arc<Mutex<IndexedStore>>,
) -> Result<impl warp::Reply, warp::Rejection> {
- let config = config.lock().await;
- let store = Store::new(&config.chunks);
+ let store = store.lock().await;
let id: ChunkId = id.parse().unwrap();
match store.load(&id) {
Ok((meta, chunk)) => {
@@ -183,13 +171,9 @@ pub async fn fetch_chunk(
pub async fn search_chunks(
query: HashMap<String, String>,
- config: Arc<Mutex<Config>>,
- index: Arc<Mutex<Index>>,
+ store: Arc<Mutex<IndexedStore>>,
) -> Result<impl warp::Reply, warp::Rejection> {
- let index = index.lock().await;
- let config = config.lock().await;
-
- let store = Store::new(&config.chunks);
+ let store = store.lock().await;
let mut query = query.iter();
let found = if let Some((key, value)) = query.next() {
@@ -198,9 +182,12 @@ pub async fn search_chunks(
return Ok(ChunkResult::BadRequest);
}
if key == "generation" && value == "true" {
- index.find_generations()
+ store.find_generations()
+ } else if key == "sha256" {
+ store.find_by_sha256(value)
} else {
- index.find(&key, &value)
+ error!("unknown search key {:?}", key);
+ return Ok(ChunkResult::BadRequest);
}
} else {
error!("search has no key to search for");
@@ -250,29 +237,12 @@ impl SearchHits {
pub async fn delete_chunk(
id: String,
- config: Arc<Mutex<Config>>,
- index: Arc<Mutex<Index>>,
+ store: Arc<Mutex<IndexedStore>>,
) -> Result<impl warp::Reply, warp::Rejection> {
- let config = config.lock().await;
- let mut index = index.lock().await;
- let store = Store::new(&config.chunks);
+ let mut store = store.lock().await;
let id: ChunkId = id.parse().unwrap();
- let (meta, _) = match store.load(&id) {
- Ok((meta, chunk)) => {
- debug!("found chunk to delete: {}", id);
- (meta, chunk)
- }
- Err(e) => {
- error!("could not find chunk to delete: {}: {:?}", id, e);
- return Ok(ChunkResult::NotFound);
- }
- };
-
- index.remove("sha256", meta.sha256());
- index.remove_generation(&id);
-
- match store.delete(&id) {
+ match store.remove(&id) {
Ok(_) => {
info!("chunk deleted: {}", id);
Ok(ChunkResult::Deleted)
diff --git a/src/indexedstore.rs b/src/indexedstore.rs
new file mode 100644
index 0000000..4cc90cc
--- /dev/null
+++ b/src/indexedstore.rs
@@ -0,0 +1,57 @@
+use crate::chunk::DataChunk;
+use crate::chunkid::ChunkId;
+use crate::chunkmeta::ChunkMeta;
+use crate::index::Index;
+use crate::store::Store;
+use std::path::Path;
+
+/// A store for chunks and their metadata.
+///
+/// This combines Store and Index into one interface to make it easier
+/// to handle the server side storage of chunks.
+pub struct IndexedStore {
+ store: Store,
+ index: Index,
+}
+
+impl IndexedStore {
+ pub fn new(dirname: &Path) -> Self {
+ let store = Store::new(dirname);
+ let index = Index::default();
+ Self { store, index }
+ }
+
+ pub fn save(&mut self, meta: &ChunkMeta, chunk: &DataChunk) -> anyhow::Result<ChunkId> {
+ let id = ChunkId::new();
+ self.store.save(&id, meta, chunk)?;
+ self.index.insert(id.clone(), "sha256", meta.sha256());
+ if meta.is_generation() {
+ self.index.insert_generation(id.clone());
+ }
+ Ok(id)
+ }
+
+ pub fn load(&self, id: &ChunkId) -> anyhow::Result<(ChunkMeta, DataChunk)> {
+ self.store.load(id)
+ }
+
+ pub fn load_meta(&self, id: &ChunkId) -> anyhow::Result<ChunkMeta> {
+ self.store.load_meta(id)
+ }
+
+ pub fn find_by_sha256(&self, sha256: &str) -> Vec<ChunkId> {
+ self.index.find("sha256", sha256)
+ }
+
+ pub fn find_generations(&self) -> Vec<ChunkId> {
+ self.index.find_generations()
+ }
+
+ pub fn remove(&mut self, id: &ChunkId) -> anyhow::Result<()> {
+ let (meta, _) = self.store.load(id)?;
+ self.index.remove("sha256", meta.sha256());
+ self.index.remove_generation(id);
+ self.store.delete(id)?;
+ Ok(())
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 3d20797..bdc790f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -9,5 +9,6 @@ pub mod fsentry;
pub mod fsiter;
pub mod generation;
pub mod index;
+pub mod indexedstore;
pub mod server;
pub mod store;