1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
//! The `chunkify` subcommand.
use crate::config::ClientConfig;
use crate::engine::Engine;
use crate::error::ObnamError;
use crate::workqueue::WorkQueue;
use serde::Serialize;
use sha2::{Digest, Sha256};
use std::path::PathBuf;
use structopt::StructOpt;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, BufReader};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
// Capacity of the bounded queue of unprocessed chunks, and also of
// the queue of computed checksums, shared between the file-splitting
// tasks and the checksum engine.
const Q: usize = 8;
/// Split files into chunks and show their metadata.
///
/// Command-line arguments for the `chunkify` subcommand, parsed by
/// StructOpt.
#[derive(Debug, StructOpt)]
pub struct Chunkify {
    /// Names of files to split into chunks.
    filenames: Vec<PathBuf>,
}
impl Chunkify {
    /// Run the command.
    ///
    /// Builds a fresh Tokio runtime and blocks on the async
    /// implementation, so the caller does not need to be async.
    pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
        let rt = Runtime::new()?;
        rt.block_on(self.run_async(config))
    }

    /// Async implementation of the command.
    ///
    /// Spawns one task per input file to split it into chunks of at
    /// most `config.chunk_size` bytes, feeds the chunks through a
    /// bounded work queue into an `Engine` that maps each chunk to a
    /// checksum via `just_hash`, collects all checksums, and prints
    /// them as pretty-printed JSON.
    async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
        // Bounded queue (capacity Q) of chunks awaiting checksumming.
        let mut q = WorkQueue::new(Q);
        for filename in self.filenames.iter() {
            // Each file is split concurrently; q.push() supplies the
            // sender half used by split_file to enqueue chunks.
            // NOTE(review): the JoinHandle is dropped, so a panic in
            // split_file is not propagated here — the command would
            // just see fewer chunks.
            tokio::spawn(split_file(
                filename.to_path_buf(),
                config.chunk_size,
                q.push(),
            ));
        }
        // Close this side's handle so the queue drains to completion
        // once all spawned tasks have finished sending.
        q.close();
        let mut summer = Engine::new(q, just_hash);
        let mut checksums = vec![];
        while let Some(sum) = summer.next().await {
            checksums.push(sum);
        }
        println!("{}", serde_json::to_string_pretty(&checksums)?);
        Ok(())
    }
}
/// One chunk of data read from a file, tagged with its origin.
#[derive(Debug, Clone)]
struct Chunk {
    // File the chunk was read from.
    filename: PathBuf,
    // Byte offset of the chunk within that file.
    offset: u64,
    // The chunk's bytes; at most `chunk_size` bytes (shorter at EOF
    // or on a short read).
    data: Vec<u8>,
}
/// The checksum of one chunk, in the shape serialized to JSON output.
#[derive(Debug, Clone, Serialize)]
struct Checksum {
    // File the chunk came from.
    filename: PathBuf,
    // Byte offset of the chunk within that file.
    offset: u64,
    // Length of the chunk in bytes. (Previously the only `pub` field;
    // the struct itself is private, so the modifier had no effect and
    // is dropped for consistency with the other fields.)
    len: u64,
    // Lowercase hexadecimal SHA-256 digest of the chunk's data.
    checksum: String,
}
/// Split a file into chunks of at most `chunk_size` bytes and send
/// each chunk, tagged with the filename and its byte offset, down
/// `tx`. Runs until EOF.
///
/// # Panics
///
/// Panics (via `expect`) if the file cannot be opened or read, or if
/// the receiving half of the channel has been dropped. This function
/// is spawned as a task, so such a panic terminates only that task.
async fn split_file(filename: PathBuf, chunk_size: usize, tx: mpsc::Sender<Chunk>) {
    let mut file = BufReader::new(
        File::open(&filename)
            .await
            .expect("failed to open file for chunking"),
    );
    let mut offset = 0;
    loop {
        let mut data = vec![0; chunk_size];
        let n = file
            .read(&mut data)
            .await
            .expect("failed to read file for chunking");
        if n == 0 {
            // EOF.
            break;
        }
        // Keep only the bytes actually read; a read may legitimately
        // return fewer than chunk_size bytes.
        data.truncate(n);
        let chunk = Chunk {
            filename: filename.clone(),
            offset,
            data,
        };
        tx.send(chunk).await.expect("chunk receiver dropped");
        offset += n as u64;
    }
}
/// Compute the [`Checksum`] of a single [`Chunk`] using SHA-256.
///
/// This is the mapping function handed to the checksum engine: it
/// consumes the chunk and returns its metadata plus the lowercase
/// hex digest of its data.
fn just_hash(chunk: Chunk) -> Checksum {
    let digest = Sha256::digest(&chunk.data);
    Checksum {
        filename: chunk.filename,
        offset: chunk.offset,
        len: chunk.data.len() as u64,
        checksum: format!("{:x}", digest),
    }
}
|