diff options
author | Lars Wirzenius <liw@liw.fi> | 2021-07-14 15:59:56 +0300 |
---|---|---|
committer | Lars Wirzenius <liw@liw.fi> | 2021-07-18 10:02:33 +0300 |
commit | 5d016c96c8b2b3290d25baa838c8816edeecab8a (patch) | |
tree | 9779e63d2a26423386fbe64b518d3c9b01f972ae /src/cmd | |
parent | b2a3a0999d6085d20419d9794b89ab9b0ada0124 (diff) | |
download | obnam2-5d016c96c8b2b3290d25baa838c8816edeecab8a.tar.gz |
feat: add "chunkify" subcommand
Sponsored-by: author
Diffstat (limited to 'src/cmd')
-rw-r--r-- | src/cmd/chunkify.rs | 104 | ||||
-rw-r--r-- | src/cmd/mod.rs | 1 |
2 files changed, 105 insertions, 0 deletions
diff --git a/src/cmd/chunkify.rs b/src/cmd/chunkify.rs new file mode 100644 index 0000000..10121a9 --- /dev/null +++ b/src/cmd/chunkify.rs @@ -0,0 +1,104 @@ +use crate::config::ClientConfig; +use crate::engine::Engine; +use crate::error::ObnamError; +use crate::workqueue::WorkQueue; +use futures::executor::block_on; +use serde::Serialize; +use sha2::{Digest, Sha256}; +use std::path::PathBuf; +use structopt::StructOpt; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, BufReader}; +use tokio::sync::mpsc; + +// Size of queue with unprocessed chunks, and also queue of computed +// checksums. +const Q: usize = 8; + +#[derive(Debug, StructOpt)] +pub struct Chunkify { + filenames: Vec<PathBuf>, +} + +impl Chunkify { + pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> { + block_on(self.run_async(config)) + } + + pub async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> { + let mut q = WorkQueue::new(Q); + for filename in self.filenames.iter() { + tokio::spawn(split_file( + filename.to_path_buf(), + config.chunk_size, + q.push(), + )); + } + q.close(); + + let mut summer = Engine::new(q, just_hash); + + let mut checksums = vec![]; + while let Some(sum) = summer.next().await { + checksums.push(sum); + } + + println!("{}", serde_json::to_string_pretty(&checksums)?); + + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct Chunk { + filename: PathBuf, + offset: u64, + data: Vec<u8>, +} + +#[derive(Debug, Clone, Serialize)] +pub struct Checksum { + filename: PathBuf, + offset: u64, + pub len: u64, + checksum: String, +} + +pub async fn split_file(filename: PathBuf, chunk_size: usize, tx: mpsc::Sender<Chunk>) { + // println!("split_file {}", filename.display()); + let mut file = BufReader::new(File::open(&*filename).await.unwrap()); + + let mut offset = 0; + loop { + let mut data = vec![0; chunk_size]; + let n = file.read(&mut data).await.unwrap(); + if n == 0 { + break; + } + let data: Vec<u8> = data[..n].to_vec(); + + let chunk = Chunk { + filename: filename.clone(), + offset, + data, + }; + tx.send(chunk).await.unwrap(); + // println!("split_file sent chunk at offset {}", offset); + + offset += n as u64; + } + // println!("split_file EOF at {}", offset); +} + +fn just_hash(chunk: Chunk) -> Checksum { + let mut hasher = Sha256::new(); + hasher.update(&chunk.data); + let hash = hasher.finalize(); + let hash = format!("{:x}", hash); + Checksum { + filename: chunk.filename, + offset: chunk.offset, + len: chunk.data.len() as u64, + checksum: hash, + } +} diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs index bd101da..7313a05 100644 --- a/src/cmd/mod.rs +++ b/src/cmd/mod.rs @@ -1,5 +1,6 @@ pub mod backup; pub mod chunk; +pub mod chunkify; pub mod get_chunk; pub mod init; pub mod list; |