summaryrefslogtreecommitdiff
path: root/src/cmd/chunkify.rs
blob: e2ce05ffe0eded5cfbaabf406c441e4fa0e5fc31 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
//! The `chunkify` subcommand.

use crate::config::ClientConfig;
use crate::engine::Engine;
use crate::error::ObnamError;
use crate::workqueue::WorkQueue;
use serde::Serialize;
use sha2::{Digest, Sha256};
use std::path::PathBuf;
use structopt::StructOpt;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, BufReader};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;

// Maximum number of in-flight items in the queue of unprocessed
// chunks, and likewise in the queue of computed checksums.
const Q: usize = 8;

/// Split files into chunks and show their metadata.
// NOTE: the `///` doc comments on this struct and its field double as
// the generated --help text via structopt's derive, so they are
// user-facing; don't reword them casually.
#[derive(Debug, StructOpt)]
pub struct Chunkify {
    /// Names of files to split into chunks.
    filenames: Vec<PathBuf>,
}

impl Chunkify {
    /// Run the command.
    ///
    /// Synchronous entry point: builds a Tokio runtime and drives the
    /// async implementation to completion.
    pub fn run(&self, config: &ClientConfig) -> Result<(), ObnamError> {
        let runtime = Runtime::new()?;
        runtime.block_on(self.run_async(config))
    }

    async fn run_async(&self, config: &ClientConfig) -> Result<(), ObnamError> {
        // Spawn one background task per requested file; each splits its
        // file into chunks and pushes them onto the shared work queue.
        let mut queue = WorkQueue::new(Q);
        for name in &self.filenames {
            tokio::spawn(split_file(name.to_path_buf(), config.chunk_size, queue.push()));
        }
        queue.close();

        // Drain the queue through the checksumming engine, collecting
        // one Checksum per chunk.
        let mut engine = Engine::new(queue, just_hash);
        let mut sums = vec![];
        while let Some(sum) = engine.next().await {
            sums.push(sum);
        }

        // Report everything as pretty-printed JSON on stdout.
        println!("{}", serde_json::to_string_pretty(&sums)?);

        Ok(())
    }
}

/// One chunk of a file's contents, produced by `split_file` and
/// consumed by `just_hash`.
#[derive(Debug, Clone)]
struct Chunk {
    // File the data was read from.
    filename: PathBuf,
    // Byte offset of this chunk within the file.
    offset: u64,
    // The chunk's bytes; at most the `chunk_size` given to `split_file`.
    data: Vec<u8>,
}

/// Metadata about one chunk: where it came from and its SHA256
/// checksum. Serialized to JSON as the command's output.
#[derive(Debug, Clone, Serialize)]
struct Checksum {
    /// File the chunk was read from.
    filename: PathBuf,
    /// Byte offset of the chunk within the file.
    offset: u64,
    /// Length of the chunk in bytes.
    // The stray `pub` that was here was removed: the struct itself is
    // module-private, so `pub` on a single field granted no extra
    // visibility and was inconsistent with the other fields.
    len: u64,
    /// Hex-encoded SHA256 digest of the chunk's data.
    checksum: String,
}

/// Read `filename` and send its contents as `Chunk`s of at most
/// `chunk_size` bytes over `tx`.
///
/// This runs as a detached spawned task with no caller to return an
/// error to, so I/O failures panic the task — but now with a message
/// that names the offending file instead of a bare `unwrap`.
async fn split_file(filename: PathBuf, chunk_size: usize, tx: mpsc::Sender<Chunk>) {
    let file = File::open(&filename)
        .await
        .unwrap_or_else(|err| panic!("failed to open {}: {}", filename.display(), err));
    let mut file = BufReader::new(file);

    let mut offset = 0;
    loop {
        let mut data = vec![0; chunk_size];
        // NOTE(review): `read` may return fewer than `chunk_size` bytes
        // even before EOF, so chunks are not guaranteed to be exactly
        // `chunk_size` long — confirm downstream consumers are OK with
        // variable-sized chunks.
        let n = file
            .read(&mut data)
            .await
            .unwrap_or_else(|err| panic!("failed to read {}: {}", filename.display(), err));
        if n == 0 {
            // EOF.
            break;
        }
        // Shrink the buffer in place to the bytes actually read,
        // instead of copying them into a fresh vector.
        data.truncate(n);

        let chunk = Chunk {
            filename: filename.clone(),
            offset,
            data,
        };
        // A send failure means the receiver is gone (consumer died);
        // there is nobody left to report that to, so panic the task.
        tx.send(chunk).await.unwrap();

        offset += n as u64;
    }
}

/// Compute the SHA256 checksum of a chunk's data, producing the
/// metadata record that gets serialized as output.
fn just_hash(chunk: Chunk) -> Checksum {
    // One-shot digest of the chunk's bytes, rendered as lowercase hex.
    let digest = Sha256::digest(&chunk.data);
    Checksum {
        len: chunk.data.len() as u64,
        checksum: format!("{:x}", digest),
        filename: chunk.filename,
        offset: chunk.offset,
    }
}