From 383cff243c8937e31c48145a2422a99ff2109f6e Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Thu, 12 May 2022 19:26:08 +0300 Subject: feat: add a new ChunkStore to store chunks locally or remotely Sponsored-by: author --- src/chunkstore.rs | 304 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 2 files changed, 305 insertions(+) create mode 100644 src/chunkstore.rs diff --git a/src/chunkstore.rs b/src/chunkstore.rs new file mode 100644 index 0000000..f421805 --- /dev/null +++ b/src/chunkstore.rs @@ -0,0 +1,304 @@ +//! Access local and remote chunk stores. +//! +//! A chunk store may be local and accessed via the file system, or +//! remote and accessed over HTTP. This module implements both. This +//! module only handles encrypted chunks. + +use crate::chunkid::ChunkId; +use crate::chunkmeta::ChunkMeta; +use crate::cipher::EncryptedChunk; +use crate::config::{ClientConfig, ClientConfigError}; +use crate::index::{Index, IndexError}; + +use log::{debug, error, info}; +use reqwest::header::HeaderMap; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; + +/// A chunk store. +/// +/// The store may be local or remote. +pub enum ChunkStore { + /// A local chunk store. + Local(LocalStore), + + /// A remote chunk store. + Remote(RemoteStore), +} + +impl ChunkStore { + /// Open a local chunk store. + pub fn local>(path: P) -> Result { + let store = LocalStore::new(path.as_ref())?; + Ok(Self::Local(store)) + } + + /// Open a remote chunk store. + pub fn remote(config: &ClientConfig) -> Result { + let store = RemoteStore::new(config)?; + Ok(Self::Remote(store)) + } + + /// Does the store have a chunk with a given label? + pub async fn find_by_label(&self, meta: &ChunkMeta) -> Result, StoreError> { + match self { + Self::Local(store) => store.find_by_label(meta), + Self::Remote(store) => store.find_by_label(meta).await, + } + } + + /// Store a chunk in the store. + /// + /// The store chooses an id for the chunk. + pub async fn put(&mut self, chunk: EncryptedChunk, meta: &ChunkMeta) -> Result { + match self { + Self::Local(store) => store.put(chunk, meta), + Self::Remote(store) => store.put(chunk, meta).await, + } + } + + /// Get a chunk given its id. + pub async fn get(&self, id: &ChunkId) -> Result<(Vec, ChunkMeta), StoreError> { + match self { + Self::Local(store) => store.get(id), + Self::Remote(store) => store.get(id).await, + } + } +} + +/// A local chunk store. +pub struct LocalStore { + path: PathBuf, + index: Index, +} + +impl LocalStore { + fn new(path: &Path) -> Result { + Ok(Self { + path: path.to_path_buf(), + index: Index::new(path)?, + }) + } + + fn find_by_label(&self, meta: &ChunkMeta) -> Result, StoreError> { + self.index + .find_by_label(meta.label()) + .map_err(StoreError::Index) + } + + fn put(&mut self, chunk: EncryptedChunk, meta: &ChunkMeta) -> Result { + let id = ChunkId::new(); + let (dir, filename) = self.filename(&id); + + if !dir.exists() { + std::fs::create_dir_all(&dir).map_err(|err| StoreError::ChunkMkdir(dir, err))?; + } + + let data = chunk.ciphertext().to_vec(); + std::fs::write(&filename, &data) + .map_err(|err| StoreError::WriteChunk(filename.clone(), err))?; + self.index + .insert_meta(id.clone(), meta.clone()) + .map_err(StoreError::Index)?; + Ok(id) + } + + fn get(&self, id: &ChunkId) -> Result<(Vec, ChunkMeta), StoreError> { + let meta = self.index.get_meta(id)?; + + let (_, filename) = &self.filename(id); + + let raw = + std::fs::read(&filename).map_err(|err| StoreError::ReadChunk(filename.clone(), err))?; + + Ok((raw, meta)) + } + + fn filename(&self, id: &ChunkId) -> (PathBuf, PathBuf) { + let bytes = id.as_bytes(); + assert!(bytes.len() > 3); + let a = bytes[0]; + let b = bytes[1]; + let c = bytes[2]; + let dir = self.path.join(format!("{}/{}/{}", a, b, c)); + let filename = dir.join(format!("{}", id)); + (dir, filename) + } +} + +/// A remote chunk store. +pub struct RemoteStore { + client: reqwest::Client, + base_url: String, +} + +impl RemoteStore { + fn new(config: &ClientConfig) -> Result { + info!("creating remote store with config: {:#?}", config); + + let client = reqwest::Client::builder() + .danger_accept_invalid_certs(!config.verify_tls_cert) + .build() + .map_err(StoreError::ReqwestError)?; + Ok(Self { + client, + base_url: config.server_url.to_string(), + }) + } + + async fn find_by_label(&self, meta: &ChunkMeta) -> Result, StoreError> { + let body = match self.get_helper("", &[("label", meta.label())]).await { + Ok((_, body)) => body, + Err(err) => return Err(err), + }; + + let hits: HashMap = + serde_json::from_slice(&body).map_err(StoreError::JsonParse)?; + let ids = hits.iter().map(|(id, _)| ChunkId::recreate(id)).collect(); + Ok(ids) + } + + async fn put(&self, chunk: EncryptedChunk, meta: &ChunkMeta) -> Result { + let res = self + .client + .post(&self.chunks_url()) + .header("chunk-meta", meta.to_json()) + .body(chunk.ciphertext().to_vec()) + .send() + .await + .map_err(StoreError::ReqwestError)?; + let res: HashMap = res.json().await.map_err(StoreError::ReqwestError)?; + debug!("upload_chunk: res={:?}", res); + let chunk_id = if let Some(chunk_id) = res.get("chunk_id") { + debug!("upload_chunk: id={}", chunk_id); + chunk_id.parse().unwrap() + } else { + return Err(StoreError::NoCreatedChunkId); + }; + info!("uploaded_chunk {}", chunk_id); + Ok(chunk_id) + } + + async fn get(&self, id: &ChunkId) -> Result<(Vec, ChunkMeta), StoreError> { + let (headers, body) = self.get_helper(&format!("/{}", id), &[]).await?; + let meta = self.get_chunk_meta_header(id, &headers)?; + Ok((body, meta)) + } + + fn base_url(&self) -> &str { + &self.base_url + } + + fn chunks_url(&self) -> String { + format!("{}/v1/chunks", self.base_url()) + } + + async fn get_helper( + &self, + path: &str, + query: &[(&str, &str)], + ) -> Result<(HeaderMap, Vec), StoreError> { + let url = format!("{}{}", &self.chunks_url(), path); + info!("GET {}", url); + + // Build HTTP request structure. + let req = self + .client + .get(&url) + .query(query) + .build() + .map_err(StoreError::ReqwestError)?; + + // Make HTTP request. + let res = self + .client + .execute(req) + .await + .map_err(StoreError::ReqwestError)?; + + // Did it work? + if res.status() != 200 { + return Err(StoreError::NotFound(path.to_string())); + } + + // Return headers and body. + let headers = res.headers().clone(); + let body = res.bytes().await.map_err(StoreError::ReqwestError)?; + let body = body.to_vec(); + Ok((headers, body)) + } + + fn get_chunk_meta_header( + &self, + chunk_id: &ChunkId, + headers: &HeaderMap, + ) -> Result { + let meta = headers.get("chunk-meta"); + + if meta.is_none() { + let err = StoreError::NoChunkMeta(chunk_id.clone()); + error!("fetching chunk {} failed: {}", chunk_id, err); + return Err(err); + } + + let meta = meta + .unwrap() + .to_str() + .map_err(StoreError::MetaHeaderToString)?; + let meta: ChunkMeta = serde_json::from_str(meta).map_err(StoreError::JsonParse)?; + + Ok(meta) + } +} + +/// Possible errors from using a ChunkStore. +#[derive(Debug, thiserror::Error)] +pub enum StoreError { + /// FIXME + #[error("FIXME")] + FIXME, + + /// Error from a chunk index. + #[error(transparent)] + Index(#[from] IndexError), + + /// An error from the HTTP library. + #[error("error from reqwest library: {0}")] + ReqwestError(reqwest::Error), + + /// Client configuration is wrong. + #[error(transparent)] + ClientConfigError(#[from] ClientConfigError), + + /// Server claims to not have an entity. + #[error("Server does not have {0}")] + NotFound(String), + + /// Server didn't give us a chunk's metadata. + #[error("Server response did not have a 'chunk-meta' header for chunk {0}")] + NoChunkMeta(ChunkId), + + /// An error with the `chunk-meta` header. + #[error("couldn't convert response chunk-meta header to string: {0}")] + MetaHeaderToString(reqwest::header::ToStrError), + + /// Error parsing JSON. + #[error("failed to parse JSON: {0}")] + JsonParse(serde_json::Error), + + /// An error creating chunk directory. + #[error("Failed to create chunk directory {0}")] + ChunkMkdir(PathBuf, #[source] std::io::Error), + + /// An error writing a chunk file. + #[error("Failed to write chunk {0}")] + WriteChunk(PathBuf, #[source] std::io::Error), + + /// An error reading a chunk file. + #[error("Failed to read chunk {0}")] + ReadChunk(PathBuf, #[source] std::io::Error), + + /// No chunk id for uploaded chunk. + #[error("Server response claimed it had created a chunk, but lacked chunk id")] + NoCreatedChunkId, +} diff --git a/src/lib.rs b/src/lib.rs index fbbea15..a54c5ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,7 @@ pub mod chunk; pub mod chunker; pub mod chunkid; pub mod chunkmeta; +pub mod chunkstore; pub mod cipher; pub mod client; pub mod cmd; -- cgit v1.2.1