From fe5b650897f6715d40a29e72d821caf812f9edaa Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Fri, 23 Apr 2021 08:01:02 +0300 Subject: refactor: use async for concurrency Normal tokio async tasks for collecting metadata, blocking tasks for computing checksums, so that all available CPU can be used for that. --- Cargo.lock | 186 +++++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 + src/bin/summain.rs | 52 ++++++++++++--- src/lib.rs | 28 +++++--- 4 files changed, 252 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1424ac1..a8b2a30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "autocfg" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" + [[package]] name = "bitflags" version = "1.2.1" @@ -41,6 +47,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bytes" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" + [[package]] name = "cfg-if" version = "1.0.0" @@ -111,6 +123,15 @@ dependencies = [ "libc", ] +[[package]] +name = "instant" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec" +dependencies = [ + "cfg-if", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -129,12 +150,114 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3" +[[package]] +name = "lock_api" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a3c91c24eae6777794bb1997ad98bbb87daf92890acab859f7eaa4320333176" +dependencies = [ + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "memchr" +version = "2.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" + +[[package]] +name = "mio" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf80d3e903b34e0bd7282b218398aec54e082c840d9baf8339e0080a0c542956" +dependencies = [ + "libc", + "log", + "miow", + "ntapi", + "winapi", +] + +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi", +] + +[[package]] +name = "ntapi" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" +dependencies = [ + "winapi", +] + +[[package]] +name = "num_cpus" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "once_cell" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3" + [[package]] name = "opaque-debug" version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "parking_lot" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc0e1f259c92177c30a4c9d177246edd0a3568b25756a977d0632cf8fa37e905" + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -177,6 +300,21 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "redox_syscall" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8270314b5ccceb518e7e578952f0b72b88222d02e8f77f5ecf7abbb673539041" +dependencies = [ + "bitflags", +] + +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + [[package]] name = "serde" version = "1.0.123" @@ -222,6 +360,21 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "signal-hook-registry" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16f1d0fef1604ba8f7a073c7e701f213e056707210e9020af4528e0101ce11a6" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" + [[package]] name = "strsim" version = "0.8.0" @@ -258,10 +411,12 @@ version = "0.26.0" dependencies = [ "anyhow", "digest", + "num_cpus", "serde", "serde_yaml", "sha2", "structopt", + "tokio", "unix_mode", ] @@ -285,6 +440,37 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "tokio" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5" +dependencies = [ + "autocfg", + "bytes", + "libc", + "memchr", + "mio", + "num_cpus", + "once_cell", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "tokio-macros", + "winapi", +] + +[[package]] +name = "tokio-macros" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "typenum" version = "1.12.0" diff --git a/Cargo.toml b/Cargo.toml index e0403c4..444263f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,8 +13,10 @@ edition = "2018" [dependencies] anyhow = "1" digest = "0.9" +num_cpus = "1" serde = { version = "1", features = ["derive"] } serde_yaml = "0.8" sha2 = "0.9" structopt = "0.3" +tokio = { version = "1", features = ["full"] } unix_mode = "0.1" diff --git a/src/bin/summain.rs b/src/bin/summain.rs index 4336ebd..5031006 100644 --- a/src/bin/summain.rs +++ b/src/bin/summain.rs @@ -1,17 +1,51 @@ use anyhow::Context; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use structopt::StructOpt; use summain::ManifestEntry; -fn main() -> anyhow::Result<()> { +#[tokio::main] +async fn main() -> anyhow::Result<()> { let mut opt = Opt::from_args(); opt.pathnames[..].sort(); - let v: Vec> = - opt.pathnames.iter().map(|p| manifest(&p)).collect(); - for m in v { - let m = m?; + + // Get metadata about all files, but don't compute checksum yet. + // + // This all runs in async worker threads, since it's theoretically + // I/O heavy and benefits from async task context switching. We + // create a task for each named file, since tasks are very cheap. + // We keep the JoinHandle for each task in a vector: there's not + // going to be so many handles that the vector becomes impossibly + // large: Linux only allows 128 KiB of command line arguments. + + let mut handles = vec![]; + for filename in opt.pathnames.iter().cloned() { + handles.push(tokio::spawn(async move { manifest(filename) })); + } + + // Compute checksums for regular files. + // + // This runs in blocking threads, since it's CPU heavy. We create + // another vector of JoinHandles. Again, it won't become too large. + + let mut sumhandles = vec![]; + for h in handles { + let mut m: ManifestEntry = h.await?.await?; + let h = tokio::task::spawn_blocking(move || match m.compute_checksum() { + Err(e) => Err(e), + Ok(_) => Ok(m), + }); + sumhandles.push(h) + } + + // Wait for checksums to be available and print manifest. + // + // Note how this iterates over the results in the right order. + + for h in sumhandles { + let m: ManifestEntry = h.await??; print!("{}", serde_yaml::to_string(&m)?); } + Ok(()) } @@ -21,6 +55,8 @@ struct Opt { pathnames: Vec, } -fn manifest(path: &Path) -> anyhow::Result { - ManifestEntry::new(path).with_context(|| format!("{}", path.display())) +async fn manifest(path: PathBuf) -> anyhow::Result { + ManifestEntry::new(&path) + .await + .with_context(|| format!("{}", path.display())) } diff --git a/src/lib.rs b/src/lib.rs index 3c90156..7075477 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,6 +42,14 @@ const BUF_SIZE: usize = 1024 * 1024; /// An entry in a file manifest. #[derive(Serialize, Debug)] pub struct ManifestEntry { + #[serde(skip)] + is_regular: bool, + + // Store the original name in a hidden field, for compute_checksum. + #[serde(skip)] + filename: PathBuf, + + // We store pathname as a string so that we can handle non-UTF8 names. path: String, #[serde(with = "mode")] mode: u32, @@ -60,29 +68,33 @@ impl ManifestEntry { /// caller. This function doesn't query the system for it. /// /// The structure can be serialized using serde. - pub fn new(path: &Path) -> std::io::Result { + pub async fn new(path: &Path) -> std::io::Result { let m = symlink_metadata(path)?; - let hash = if m.is_file() { - Some(file_checksum(path)?) - } else { - None - }; let target = if m.file_type().is_symlink() { Some(read_link(path)?) } else { None }; Ok(Self { + is_regular: m.is_file(), + filename: path.to_path_buf(), path: path.to_string_lossy().into_owned(), mode: m.st_mode(), mtime: m.st_mtime(), mtime_nsec: m.st_mtime_nsec(), nlink: m.st_nlink(), size: if m.is_dir() { None } else { Some(m.st_size()) }, - sha256: hash, + sha256: None, target, }) } + + pub fn compute_checksum(&mut self) -> std::io::Result<()> { + if self.is_regular { + self.sha256 = Some(file_checksum(&self.filename)?); + } + Ok(()) + } } fn file_checksum(path: &Path) -> std::io::Result { @@ -90,8 +102,8 @@ fn file_checksum(path: &Path) -> std::io::Result { let file = File::open(path)?; let mut reader = BufReader::new(file); - let mut buf = vec![0; BUF_SIZE]; loop { + let mut buf = vec![0; BUF_SIZE]; let n = reader.read(&mut buf)?; if n == 0 { break; -- cgit v1.2.1