summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2022-10-28 06:07:15 +0000
committerLars Wirzenius <liw@liw.fi>2022-10-28 06:07:15 +0000
commit5a342d99d2136d1cd1fa227e9125d3f3f5affe9c (patch)
tree27f5da006713f2f48a0f6836f6485133c9434278
parent8f648fb02fefc432f612a77bb4a1f61b56d004e0 (diff)
parent14ab788c930846aeb2c472b421a9030b0e9e95f3 (diff)
downloadobnam2-5a342d99d2136d1cd1fa227e9125d3f3f5affe9c.tar.gz
Merge branch 'liw/minoru-async-store' into 'main'
feat: add a new ChunkStore to store chunks locally or remotely See merge request obnam/obnam!240
-rw-r--r--obnam.md26
-rw-r--r--src/backup_run.rs25
-rw-r--r--src/bin/obnam-server.rs64
-rw-r--r--src/chunkstore.rs307
-rw-r--r--src/client.rs142
-rw-r--r--src/cmd/backup.rs6
-rw-r--r--src/lib.rs1
-rw-r--r--subplot/server.py6
8 files changed, 366 insertions, 211 deletions
diff --git a/obnam.md b/obnam.md
index 0c36a8b..ef6daea 100644
--- a/obnam.md
+++ b/obnam.md
@@ -1259,21 +1259,6 @@ and content-type is application/json
and the JSON body matches {"<ID>":{"label":"0abc"}}
~~~
-Finally, we must be able to delete it. After that, we must not be able
-to retrieve it, or find it using metadata.
-
-~~~scenario
-when I DELETE /v1/chunks/<ID>
-then HTTP status code is 200
-
-when I GET /v1/chunks/<ID>
-then HTTP status code is 404
-
-when I GET /v1/chunks?label=0abc
-then HTTP status code is 200
-and content-type is application/json
-and the JSON body matches {}
-~~~
## Retrieve a chunk that does not exist
@@ -1298,17 +1283,6 @@ and content-type is application/json
and the JSON body matches {}
~~~
-## Delete chunk that does not exist
-
-We must get the right error when deleting a chunk that doesn't exist.
-
-~~~scenario
-given a working Obnam system
-when I try to DELETE /v1/chunks/any.random.string
-then HTTP status code is 404
-~~~
-
-
## Persistent across restarts
Chunk storage, and the index of chunk metadata for searches, needs to
diff --git a/src/backup_run.rs b/src/backup_run.rs
index 516e172..372ef65 100644
--- a/src/backup_run.rs
+++ b/src/backup_run.rs
@@ -31,7 +31,7 @@ const SQLITE_CHUNK_SIZE: usize = MIB as usize;
/// A running backup.
pub struct BackupRun<'a> {
checksum_kind: Option<LabelChecksumKind>,
- client: &'a BackupClient,
+ client: &'a mut BackupClient,
policy: BackupPolicy,
buffer_size: usize,
progress: Option<BackupProgress>,
@@ -106,7 +106,10 @@ pub struct RootsBackupOutcome {
impl<'a> BackupRun<'a> {
/// Create a new run for an initial backup.
- pub fn initial(config: &ClientConfig, client: &'a BackupClient) -> Result<Self, BackupError> {
+ pub fn initial(
+ config: &ClientConfig,
+ client: &'a mut BackupClient,
+ ) -> Result<Self, BackupError> {
Ok(Self {
checksum_kind: Some(DEFAULT_CHECKSUM_KIND),
client,
@@ -119,7 +122,7 @@ impl<'a> BackupRun<'a> {
/// Create a new run for an incremental backup.
pub fn incremental(
config: &ClientConfig,
- client: &'a BackupClient,
+ client: &'a mut BackupClient,
) -> Result<Self, BackupError> {
Ok(Self {
checksum_kind: None,
@@ -189,7 +192,7 @@ impl<'a> BackupRun<'a> {
/// Back up all the roots for this run.
pub async fn backup_roots(
- &self,
+ &mut self,
config: &ClientConfig,
old: &LocalGeneration,
newpath: &Path,
@@ -236,7 +239,7 @@ impl<'a> BackupRun<'a> {
}
async fn backup_one_root(
- &self,
+ &mut self,
config: &ClientConfig,
old: &LocalGeneration,
new: &mut NascentGeneration,
@@ -287,7 +290,7 @@ impl<'a> BackupRun<'a> {
}
async fn backup_if_needed(
- &self,
+ &mut self,
entry: AnnotatedFsEntry,
old: &LocalGeneration,
) -> Result<Option<FsEntryBackupOutcome>, BackupError> {
@@ -322,7 +325,7 @@ impl<'a> BackupRun<'a> {
}
async fn backup_one_entry(
- &self,
+ &mut self,
entry: &AnnotatedFsEntry,
path: &Path,
reason: Reason,
@@ -351,7 +354,7 @@ impl<'a> BackupRun<'a> {
/// Upload any file content for a file system entry.
pub async fn upload_filesystem_entry(
- &self,
+ &mut self,
e: &FilesystemEntry,
size: usize,
) -> Result<Vec<ChunkId>, BackupError> {
@@ -370,7 +373,7 @@ impl<'a> BackupRun<'a> {
/// Upload the metadata for the backup of this run.
pub async fn upload_generation(
- &self,
+ &mut self,
filename: &Path,
size: usize,
) -> Result<ChunkId, BackupError> {
@@ -384,7 +387,7 @@ impl<'a> BackupRun<'a> {
}
async fn upload_regular_file(
- &self,
+ &mut self,
filename: &Path,
size: usize,
) -> Result<Vec<ChunkId>, BackupError> {
@@ -407,7 +410,7 @@ impl<'a> BackupRun<'a> {
Ok(chunk_ids)
}
- async fn upload_nascent_generation(&self, filename: &Path) -> Result<ChunkId, ObnamError> {
+ async fn upload_nascent_generation(&mut self, filename: &Path) -> Result<ChunkId, ObnamError> {
let progress = BackupProgress::upload_generation();
let gen_id = self.upload_generation(filename, SQLITE_CHUNK_SIZE).await?;
progress.finish();
diff --git a/src/bin/obnam-server.rs b/src/bin/obnam-server.rs
index cfa2cb5..102e8b6 100644
--- a/src/bin/obnam-server.rs
+++ b/src/bin/obnam-server.rs
@@ -1,10 +1,10 @@
use anyhow::Context;
use clap::Parser;
use log::{debug, error, info};
-use obnam::chunk::DataChunk;
use obnam::chunkid::ChunkId;
use obnam::chunkmeta::ChunkMeta;
-use obnam::indexedstore::IndexedStore;
+use obnam::chunkstore::ChunkStore;
+use obnam::label::Label;
use obnam::server::{ServerConfig, ServerConfigError};
use serde::Serialize;
use std::collections::HashMap;
@@ -37,7 +37,7 @@ async fn main() -> anyhow::Result<()> {
return Err(ServerConfigError::BadServerAddress.into());
}
- let store = IndexedStore::new(&config.chunks)?;
+ let store = ChunkStore::local(&config.chunks)?;
let store = Arc::new(Mutex::new(store));
let store = warp::any().map(move || Arc::clone(&store));
@@ -70,16 +70,8 @@ async fn main() -> anyhow::Result<()> {
.and(store.clone())
.and_then(search_chunks);
- let delete = warp::delete()
- .and(warp::path("v1"))
- .and(warp::path("chunks"))
- .and(warp::path::param())
- .and(warp::path::end())
- .and(store.clone())
- .and_then(delete_chunk);
-
let log = warp::log("obnam");
- let webroot = create.or(fetch).or(search).or(delete).with(log);
+ let webroot = create.or(fetch).or(search).with(log);
debug!("starting warp");
warp::serve(webroot)
@@ -102,11 +94,11 @@ fn load_config(filename: &Path) -> Result<ServerConfig, anyhow::Error> {
}
pub async fn create_chunk(
- store: Arc<Mutex<IndexedStore>>,
+ store: Arc<Mutex<ChunkStore>>,
meta: String,
data: Bytes,
) -> Result<impl warp::Reply, warp::Rejection> {
- let mut store = store.lock().await;
+ let store = store.lock().await;
let meta: ChunkMeta = match meta.parse() {
Ok(s) => s,
@@ -116,9 +108,7 @@ pub async fn create_chunk(
}
};
- let chunk = DataChunk::new(data.to_vec(), meta);
-
- let id = match store.save(&chunk) {
+ let id = match store.put(data.to_vec(), &meta).await {
Ok(id) => id,
Err(e) => {
error!("couldn't save: {}", e);
@@ -132,11 +122,11 @@ pub async fn create_chunk(
pub async fn fetch_chunk(
id: String,
- store: Arc<Mutex<IndexedStore>>,
+ store: Arc<Mutex<ChunkStore>>,
) -> Result<impl warp::Reply, warp::Rejection> {
let store = store.lock().await;
let id: ChunkId = id.parse().unwrap();
- match store.load(&id) {
+ match store.get(&id).await {
Ok((data, meta)) => {
info!("found chunk {}: {:?}", id, meta);
Ok(ChunkResult::Fetched(meta, data))
@@ -150,7 +140,7 @@ pub async fn fetch_chunk(
pub async fn search_chunks(
query: HashMap<String, String>,
- store: Arc<Mutex<IndexedStore>>,
+ store: Arc<Mutex<ChunkStore>>,
) -> Result<impl warp::Reply, warp::Rejection> {
let store = store.lock().await;
@@ -161,7 +151,12 @@ pub async fn search_chunks(
return Ok(ChunkResult::BadRequest);
}
if key == "label" {
- store.find_by_label(value).expect("SQL lookup failed")
+ let label = Label::deserialize(value).unwrap();
+ let label = ChunkMeta::new(&label);
+ store
+ .find_by_label(&label)
+ .await
+ .expect("SQL lookup failed")
} else {
error!("unknown search key {:?}", key);
return Ok(ChunkResult::BadRequest);
@@ -173,7 +168,7 @@ pub async fn search_chunks(
let mut hits = SearchHits::default();
for chunk_id in found {
- let meta = match store.load_meta(&chunk_id) {
+ let (_, meta) = match store.get(&chunk_id).await {
Ok(meta) => {
info!("search found chunk {}", chunk_id);
meta
@@ -212,30 +207,10 @@ impl SearchHits {
}
}
-pub async fn delete_chunk(
- id: String,
- store: Arc<Mutex<IndexedStore>>,
-) -> Result<impl warp::Reply, warp::Rejection> {
- let mut store = store.lock().await;
- let id: ChunkId = id.parse().unwrap();
-
- match store.remove(&id) {
- Ok(_) => {
- info!("chunk deleted: {}", id);
- Ok(ChunkResult::Deleted)
- }
- Err(e) => {
- error!("could not delete chunk {}: {:?}", id, e);
- Ok(ChunkResult::NotFound)
- }
- }
-}
-
enum ChunkResult {
Created(ChunkId),
- Fetched(ChunkMeta, DataChunk),
+ Fetched(ChunkMeta, Vec<u8>),
Found(SearchHits),
- Deleted,
NotFound,
BadRequest,
InternalServerError,
@@ -264,13 +239,12 @@ impl warp::Reply for ChunkResult {
);
into_response(
StatusCode::OK,
- chunk.data(),
+ &chunk,
"application/octet-stream",
Some(headers),
)
}
ChunkResult::Found(hits) => json_response(StatusCode::OK, hits.to_json(), None),
- ChunkResult::Deleted => status_response(StatusCode::OK),
ChunkResult::BadRequest => status_response(StatusCode::BAD_REQUEST),
ChunkResult::NotFound => status_response(StatusCode::NOT_FOUND),
ChunkResult::InternalServerError => status_response(StatusCode::INTERNAL_SERVER_ERROR),
diff --git a/src/chunkstore.rs b/src/chunkstore.rs
new file mode 100644
index 0000000..2b93720
--- /dev/null
+++ b/src/chunkstore.rs
@@ -0,0 +1,307 @@
+//! Access local and remote chunk stores.
+//!
+//! A chunk store may be local and accessed via the file system, or
+//! remote and accessed over HTTP. This module implements both. This
+//! module only handles encrypted chunks.
+
+use crate::chunkid::ChunkId;
+use crate::chunkmeta::ChunkMeta;
+use crate::config::{ClientConfig, ClientConfigError};
+use crate::index::{Index, IndexError};
+
+use log::{debug, error, info};
+use reqwest::header::HeaderMap;
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+use tokio::sync::Mutex;
+
+/// A chunk store.
+///
+/// The store may be local or remote.
+pub enum ChunkStore {
+ /// A local chunk store.
+ Local(LocalStore),
+
+ /// A remote chunk store.
+ Remote(RemoteStore),
+}
+
+impl ChunkStore {
+ /// Open a local chunk store.
+ pub fn local<P: AsRef<Path>>(path: P) -> Result<Self, StoreError> {
+ let store = LocalStore::new(path.as_ref())?;
+ Ok(Self::Local(store))
+ }
+
+ /// Open a remote chunk store.
+ pub fn remote(config: &ClientConfig) -> Result<Self, StoreError> {
+ let store = RemoteStore::new(config)?;
+ Ok(Self::Remote(store))
+ }
+
+ /// Does the store have a chunk with a given label?
+ pub async fn find_by_label(&self, meta: &ChunkMeta) -> Result<Vec<ChunkId>, StoreError> {
+ match self {
+ Self::Local(store) => store.find_by_label(meta).await,
+ Self::Remote(store) => store.find_by_label(meta).await,
+ }
+ }
+
+ /// Store a chunk in the store.
+ ///
+ /// The store chooses an id for the chunk.
+ pub async fn put(&self, chunk: Vec<u8>, meta: &ChunkMeta) -> Result<ChunkId, StoreError> {
+ match self {
+ Self::Local(store) => store.put(chunk, meta).await,
+ Self::Remote(store) => store.put(chunk, meta).await,
+ }
+ }
+
+ /// Get a chunk given its id.
+ pub async fn get(&self, id: &ChunkId) -> Result<(Vec<u8>, ChunkMeta), StoreError> {
+ match self {
+ Self::Local(store) => store.get(id).await,
+ Self::Remote(store) => store.get(id).await,
+ }
+ }
+}
+
+/// A local chunk store.
+pub struct LocalStore {
+ path: PathBuf,
+ index: Mutex<Index>,
+}
+
+impl LocalStore {
+ fn new(path: &Path) -> Result<Self, StoreError> {
+ Ok(Self {
+ path: path.to_path_buf(),
+ index: Mutex::new(Index::new(path)?),
+ })
+ }
+
+ async fn find_by_label(&self, meta: &ChunkMeta) -> Result<Vec<ChunkId>, StoreError> {
+ self.index
+ .lock()
+ .await
+ .find_by_label(meta.label())
+ .map_err(StoreError::Index)
+ }
+
+ async fn put(&self, chunk: Vec<u8>, meta: &ChunkMeta) -> Result<ChunkId, StoreError> {
+ let id = ChunkId::new();
+ let (dir, filename) = self.filename(&id);
+
+ if !dir.exists() {
+ std::fs::create_dir_all(&dir).map_err(|err| StoreError::ChunkMkdir(dir, err))?;
+ }
+
+ std::fs::write(&filename, &chunk)
+ .map_err(|err| StoreError::WriteChunk(filename.clone(), err))?;
+ self.index
+ .lock()
+ .await
+ .insert_meta(id.clone(), meta.clone())
+ .map_err(StoreError::Index)?;
+ Ok(id)
+ }
+
+ async fn get(&self, id: &ChunkId) -> Result<(Vec<u8>, ChunkMeta), StoreError> {
+ let meta = self.index.lock().await.get_meta(id)?;
+
+ let (_, filename) = &self.filename(id);
+
+ let raw =
+ std::fs::read(&filename).map_err(|err| StoreError::ReadChunk(filename.clone(), err))?;
+
+ Ok((raw, meta))
+ }
+
+ fn filename(&self, id: &ChunkId) -> (PathBuf, PathBuf) {
+ let bytes = id.as_bytes();
+ assert!(bytes.len() > 3);
+ let a = bytes[0];
+ let b = bytes[1];
+ let c = bytes[2];
+ let dir = self.path.join(format!("{}/{}/{}", a, b, c));
+ let filename = dir.join(format!("{}.data", id));
+ (dir, filename)
+ }
+}
+
+/// A remote chunk store.
+pub struct RemoteStore {
+ client: reqwest::Client,
+ base_url: String,
+}
+
+impl RemoteStore {
+ fn new(config: &ClientConfig) -> Result<Self, StoreError> {
+ info!("creating remote store with config: {:#?}", config);
+
+ let client = reqwest::Client::builder()
+ .danger_accept_invalid_certs(!config.verify_tls_cert)
+ .build()
+ .map_err(StoreError::ReqwestError)?;
+ Ok(Self {
+ client,
+ base_url: config.server_url.to_string(),
+ })
+ }
+
+ async fn find_by_label(&self, meta: &ChunkMeta) -> Result<Vec<ChunkId>, StoreError> {
+ let body = match self.get_helper("", &[("label", meta.label())]).await {
+ Ok((_, body)) => body,
+ Err(err) => return Err(err),
+ };
+
+ let hits: HashMap<String, ChunkMeta> =
+ serde_json::from_slice(&body).map_err(StoreError::JsonParse)?;
+ let ids = hits.iter().map(|(id, _)| ChunkId::recreate(id)).collect();
+ Ok(ids)
+ }
+
+ async fn put(&self, chunk: Vec<u8>, meta: &ChunkMeta) -> Result<ChunkId, StoreError> {
+ let res = self
+ .client
+ .post(&self.chunks_url())
+ .header("chunk-meta", meta.to_json())
+ .body(chunk)
+ .send()
+ .await
+ .map_err(StoreError::ReqwestError)?;
+ let res: HashMap<String, String> = res.json().await.map_err(StoreError::ReqwestError)?;
+ debug!("upload_chunk: res={:?}", res);
+ let chunk_id = if let Some(chunk_id) = res.get("chunk_id") {
+ debug!("upload_chunk: id={}", chunk_id);
+ chunk_id.parse().unwrap()
+ } else {
+ return Err(StoreError::NoCreatedChunkId);
+ };
+ info!("uploaded_chunk {}", chunk_id);
+ Ok(chunk_id)
+ }
+
+ async fn get(&self, id: &ChunkId) -> Result<(Vec<u8>, ChunkMeta), StoreError> {
+ let (headers, body) = self.get_helper(&format!("/{}", id), &[]).await?;
+ let meta = self.get_chunk_meta_header(id, &headers)?;
+ Ok((body, meta))
+ }
+
+ fn base_url(&self) -> &str {
+ &self.base_url
+ }
+
+ fn chunks_url(&self) -> String {
+ format!("{}/v1/chunks", self.base_url())
+ }
+
+ async fn get_helper(
+ &self,
+ path: &str,
+ query: &[(&str, &str)],
+ ) -> Result<(HeaderMap, Vec<u8>), StoreError> {
+ let url = format!("{}{}", &self.chunks_url(), path);
+ info!("GET {}", url);
+
+ // Build HTTP request structure.
+ let req = self
+ .client
+ .get(&url)
+ .query(query)
+ .build()
+ .map_err(StoreError::ReqwestError)?;
+
+ // Make HTTP request.
+ let res = self
+ .client
+ .execute(req)
+ .await
+ .map_err(StoreError::ReqwestError)?;
+
+ // Did it work?
+ if res.status() != 200 {
+ return Err(StoreError::NotFound(path.to_string()));
+ }
+
+ // Return headers and body.
+ let headers = res.headers().clone();
+ let body = res.bytes().await.map_err(StoreError::ReqwestError)?;
+ let body = body.to_vec();
+ Ok((headers, body))
+ }
+
+ fn get_chunk_meta_header(
+ &self,
+ chunk_id: &ChunkId,
+ headers: &HeaderMap,
+ ) -> Result<ChunkMeta, StoreError> {
+ let meta = headers.get("chunk-meta");
+
+ if meta.is_none() {
+ let err = StoreError::NoChunkMeta(chunk_id.clone());
+ error!("fetching chunk {} failed: {}", chunk_id, err);
+ return Err(err);
+ }
+
+ let meta = meta
+ .unwrap()
+ .to_str()
+ .map_err(StoreError::MetaHeaderToString)?;
+ let meta: ChunkMeta = serde_json::from_str(meta).map_err(StoreError::JsonParse)?;
+
+ Ok(meta)
+ }
+}
+
+/// Possible errors from using a ChunkStore.
+#[derive(Debug, thiserror::Error)]
+pub enum StoreError {
+ /// FIXME
+ #[error("FIXME")]
+ FIXME,
+
+ /// Error from a chunk index.
+ #[error(transparent)]
+ Index(#[from] IndexError),
+
+ /// An error from the HTTP library.
+ #[error("error from reqwest library: {0}")]
+ ReqwestError(reqwest::Error),
+
+ /// Client configuration is wrong.
+ #[error(transparent)]
+ ClientConfigError(#[from] ClientConfigError),
+
+ /// Server claims to not have an entity.
+ #[error("Server does not have {0}")]
+ NotFound(String),
+
+ /// Server didn't give us a chunk's metadata.
+ #[error("Server response did not have a 'chunk-meta' header for chunk {0}")]
+ NoChunkMeta(ChunkId),
+
+ /// An error with the `chunk-meta` header.
+ #[error("couldn't convert response chunk-meta header to string: {0}")]
+ MetaHeaderToString(reqwest::header::ToStrError),
+
+ /// Error parsing JSON.
+ #[error("failed to parse JSON: {0}")]
+ JsonParse(serde_json::Error),
+
+ /// An error creating chunk directory.
+ #[error("Failed to create chunk directory {0}")]
+ ChunkMkdir(PathBuf, #[source] std::io::Error),
+
+ /// An error writing a chunk file.
+ #[error("Failed to write chunk {0}")]
+ WriteChunk(PathBuf, #[source] std::io::Error),
+
+ /// An error reading a chunk file.
+ #[error("Failed to read chunk {0}")]
+ ReadChunk(PathBuf, #[source] std::io::Error),
+
+ /// No chunk id for uploaded chunk.
+ #[error("Server response claimed it had created a chunk, but lacked chunk id")]
+ NoCreatedChunkId,
+}
diff --git a/src/client.rs b/src/client.rs
index bed5f1e..7ae6581 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -5,15 +5,14 @@ use crate::chunk::{
};
use crate::chunkid::ChunkId;
use crate::chunkmeta::ChunkMeta;
+use crate::chunkstore::{ChunkStore, StoreError};
use crate::cipher::{CipherEngine, CipherError};
use crate::config::{ClientConfig, ClientConfigError};
use crate::generation::{FinishedGeneration, GenId, LocalGeneration, LocalGenerationError};
use crate::genlist::GenerationList;
use crate::label::Label;
-use log::{debug, error, info};
-use reqwest::header::HeaderMap;
-use std::collections::HashMap;
+use log::{error, info};
use std::fs::File;
use std::io::prelude::*;
use std::path::{Path, PathBuf};
@@ -100,12 +99,15 @@ pub enum ClientError {
/// Failed to write a file.
#[error("failed to write to file {0}: {1}")]
FileWrite(PathBuf, std::io::Error),
+
+ /// Error from a chunk store.
+ #[error(transparent)]
+ ChunkStore(#[from] StoreError),
}
/// Client for the Obnam server HTTP API.
pub struct BackupClient {
- client: reqwest::Client,
- base_url: String,
+ store: ChunkStore,
cipher: CipherEngine,
}
@@ -113,68 +115,25 @@ impl BackupClient {
/// Create a new backup client.
pub fn new(config: &ClientConfig) -> Result<Self, ClientError> {
info!("creating backup client with config: {:#?}", config);
-
let pass = config.passwords()?;
-
- let client = reqwest::Client::builder()
- .danger_accept_invalid_certs(!config.verify_tls_cert)
- .build()
- .map_err(ClientError::ReqwestError)?;
Ok(Self {
- client,
- base_url: config.server_url.to_string(),
+ store: ChunkStore::remote(config)?,
cipher: CipherEngine::new(&pass),
})
}
- fn base_url(&self) -> &str {
- &self.base_url
- }
-
- fn chunks_url(&self) -> String {
- format!("{}/v1/chunks", self.base_url())
- }
-
/// Does the server have a chunk?
pub async fn has_chunk(&self, meta: &ChunkMeta) -> Result<Option<ChunkId>, ClientError> {
- let body = match self.get("", &[("label", meta.label())]).await {
- Ok((_, body)) => body,
- Err(err) => return Err(err),
- };
-
- let hits: HashMap<String, ChunkMeta> =
- serde_json::from_slice(&body).map_err(ClientError::JsonParse)?;
- let mut iter = hits.iter();
- let has = if let Some((chunk_id, _)) = iter.next() {
- Some(chunk_id.into())
- } else {
- None
- };
-
- Ok(has)
+ let mut ids = self.store.find_by_label(meta).await?;
+ Ok(ids.pop())
}
/// Upload a data chunk to the server.
- pub async fn upload_chunk(&self, chunk: DataChunk) -> Result<ChunkId, ClientError> {
+ pub async fn upload_chunk(&mut self, chunk: DataChunk) -> Result<ChunkId, ClientError> {
let enc = self.cipher.encrypt_chunk(&chunk)?;
- let res = self
- .client
- .post(&self.chunks_url())
- .header("chunk-meta", chunk.meta().to_json())
- .body(enc.ciphertext().to_vec())
- .send()
- .await
- .map_err(ClientError::ReqwestError)?;
- debug!("upload_chunk: res={:?}", res);
- let res: HashMap<String, String> = res.json().await.map_err(ClientError::ReqwestError)?;
- let chunk_id = if let Some(chunk_id) = res.get("chunk_id") {
- debug!("upload_chunk: id={}", chunk_id);
- chunk_id.parse().unwrap()
- } else {
- return Err(ClientError::NoCreatedChunkId);
- };
- info!("uploaded_chunk {}", chunk_id);
- Ok(chunk_id)
+ let data = enc.ciphertext().to_vec();
+ let id = self.store.put(data, chunk.meta()).await?;
+ Ok(id)
}
/// Get current client trust chunk from repository, if there is one.
@@ -196,15 +155,9 @@ impl BackupClient {
}
async fn find_client_trusts(&self) -> Result<Vec<ChunkId>, ClientError> {
- let label = Label::literal("client-trust").serialize();
- let body = match self.get("", &[("label", &label)]).await {
- Ok((_, body)) => body,
- Err(err) => return Err(err),
- };
-
- let hits: HashMap<String, ChunkMeta> =
- serde_json::from_slice(&body).map_err(ClientError::JsonParse)?;
- let ids = hits.iter().map(|(id, _)| id.into()).collect();
+ let label = Label::literal("client-trust");
+ let meta = ChunkMeta::new(&label);
+ let ids = self.store.find_by_label(&meta).await?;
Ok(ids)
}
@@ -220,9 +173,7 @@ impl BackupClient {
/// Fetch a data chunk from the server, given the chunk identifier.
pub async fn fetch_chunk(&self, chunk_id: &ChunkId) -> Result<DataChunk, ClientError> {
- let (headers, body) = self.get(&format!("/{}", chunk_id), &[]).await?;
- let meta = self.get_chunk_meta_header(chunk_id, &headers)?;
-
+ let (body, meta) = self.store.get(chunk_id).await?;
let meta_bytes = meta.to_json_vec();
let chunk = self.cipher.decrypt_chunk(&body, &meta_bytes)?;
@@ -257,61 +208,4 @@ impl BackupClient {
let gen = LocalGeneration::open(dbname)?;
Ok(gen)
}
-
- async fn get(
- &self,
- path: &str,
- query: &[(&str, &str)],
- ) -> Result<(HeaderMap, Vec<u8>), ClientError> {
- let url = format!("{}{}", &self.chunks_url(), path);
- info!("GET {}", url);
-
- // Build HTTP request structure.
- let req = self
- .client
- .get(&url)
- .query(query)
- .build()
- .map_err(ClientError::ReqwestError)?;
-
- // Make HTTP request.
- let res = self
- .client
- .execute(req)
- .await
- .map_err(ClientError::ReqwestError)?;
-
- // Did it work?
- if res.status() != 200 {
- return Err(ClientError::NotFound(path.to_string()));
- }
-
- // Return headers and body.
- let headers = res.headers().clone();
- let body = res.bytes().await.map_err(ClientError::ReqwestError)?;
- let body = body.to_vec();
- Ok((headers, body))
- }
-
- fn get_chunk_meta_header(
- &self,
- chunk_id: &ChunkId,
- headers: &HeaderMap,
- ) -> Result<ChunkMeta, ClientError> {
- let meta = headers.get("chunk-meta");
-
- if meta.is_none() {
- let err = ClientError::NoChunkMeta(chunk_id.clone());
- error!("fetching chunk {} failed: {}", chunk_id, err);
- return Err(err);
- }
-
- let meta = meta
- .unwrap()
- .to_str()
- .map_err(ClientError::MetaHeaderToString)?;
- let meta: ChunkMeta = serde_json::from_str(meta).map_err(ClientError::JsonParse)?;
-
- Ok(meta)
- }
}
diff --git a/src/cmd/backup.rs b/src/cmd/backup.rs
index a18027b..70e9eac 100644
--- a/src/cmd/backup.rs
+++ b/src/cmd/backup.rs
@@ -45,7 +45,7 @@ impl Backup {
let major = self.backup_version.unwrap_or(DEFAULT_SCHEMA_MAJOR);
let schema = schema_version(major)?;
- let client = BackupClient::new(config)?;
+ let mut client = BackupClient::new(config)?;
let trust = client
.get_client_trust()
.await?
@@ -68,7 +68,7 @@ impl Backup {
let (is_incremental, outcome) = if let Some(old_id) = old_id {
info!("incremental backup based on {}", old_id);
- let mut run = BackupRun::incremental(config, &client)?;
+ let mut run = BackupRun::incremental(config, &mut client)?;
let old = run.start(Some(&old_id), &oldtemp, perf).await?;
(
true,
@@ -77,7 +77,7 @@ impl Backup {
)
} else {
info!("fresh backup without a previous generation");
- let mut run = BackupRun::initial(config, &client)?;
+ let mut run = BackupRun::initial(config, &mut client)?;
let old = run.start(None, &oldtemp, perf).await?;
(
false,
diff --git a/src/lib.rs b/src/lib.rs
index fbbea15..a54c5ff 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -13,6 +13,7 @@ pub mod chunk;
pub mod chunker;
pub mod chunkid;
pub mod chunkmeta;
+pub mod chunkstore;
pub mod cipher;
pub mod client;
pub mod cmd;
diff --git a/subplot/server.py b/subplot/server.py
index 1f4506f..a604733 100644
--- a/subplot/server.py
+++ b/subplot/server.py
@@ -87,7 +87,9 @@ def delete_chunk_by_id(ctx, chunk_id=None):
def make_chunk_file_be_empty(ctx, chunk_id=None):
chunk_id = ctx["vars"][chunk_id]
chunks = ctx["config"]["chunks"]
- for (dirname, _, _) in os.walk(chunks):
+ logging.debug(f"trying to empty chunk {chunk_id}")
+ for (dirname, _, filenames) in os.walk(chunks):
+ logging.debug(f"found directory {dirname}, with {filenames}")
filename = os.path.join(dirname, chunk_id + ".data")
if os.path.exists(filename):
logging.debug(f"emptying chunk file {filename}")
@@ -136,7 +138,7 @@ def server_has_n_chunks(ctx, n=None):
assert_eq = globals()["assert_eq"]
n = int(n)
files = find_files(ctx["config"]["chunks"])
- files = [json.load(open(x)) for x in files if x.endswith(".meta")]
+ files = [x for x in files if x.endswith(".data")]
logging.debug(f"server_has_n_file_chunks: n={n}")
logging.debug(f"server_has_n_file_chunks: len(files)={len(files)}")
logging.debug(f"server_has_n_file_chunks: files={files}")