diff options
author | Lars Wirzenius <liw@liw.fi> | 2021-07-05 19:36:51 +0300 |
---|---|---|
committer | Lars Wirzenius <liw@liw.fi> | 2021-07-18 10:02:33 +0300 |
commit | b2a3a0999d6085d20419d9794b89ab9b0ada0124 (patch) | |
tree | 8bcea6bed936105b14da7ee8aca0f19949f73992 | |
parent | b7fd8b09a1cef3b2044b8d459d9c58a79fc12830 (diff) | |
download | obnam2-b2a3a0999d6085d20419d9794b89ab9b0ada0124.tar.gz |
feat: add Engine and WorkQueue abstractions for async processing
Many thanks to Daniel Silverstone for helping me get through this.
Sponsored-by: author
-rw-r--r-- | Cargo.lock | 61 | ||||
-rw-r--r-- | Cargo.toml | 4 | ||||
-rw-r--r-- | src/engine.rs | 121 | ||||
-rw-r--r-- | src/lib.rs | 2 | ||||
-rw-r--r-- | src/workqueue.rs | 60 |
5 files changed, 247 insertions, 1 deletion
@@ -388,6 +388,7 @@ checksum = "0e7e43a803dae2fa37c1f6a8fe121e1f7bf9548b4dfc0522a42f34145dadfc27" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -411,12 +412,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0402f765d8a89a26043b889b26ce3c4679d268fa6bb22cd7c6aad98340e179d1" [[package]] +name = "futures-executor" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "badaa6a909fac9e7236d0620a2f57f7664640c56575b71a7552fbd68deafab79" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] name = "futures-io" version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acc499defb3b348f8d8f3f66415835a9131856ff7714bf10dadfc4ec4bdb29a1" [[package]] +name = "futures-macro" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4c40298486cdf52cc00cd6d6987892ba502c7656a16a4192a9992b1ccedd121" +dependencies = [ + "autocfg", + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] name = "futures-sink" version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -435,13 +460,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "feb5c238d27e2bf94ffdfd27b2c29e3df4a68c4193bb6427384259e2bf191967" dependencies = [ "autocfg", + "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", "pin-project-lite", "pin-utils", + "proc-macro-hack", + "proc-macro-nested", "slab", ] @@ -960,6 +989,7 @@ dependencies = [ "bytesize", "chrono", "directories-next", + "futures", "indicatif", "libc", "log", @@ -974,6 +1004,7 @@ dependencies = [ "serde_json", "serde_yaml", "sha2", + "spmc", "structopt", "tempfile", "thiserror", @@ -1184,6 +1215,18 @@ dependencies = [ ] [[package]] +name = "proc-macro-hack" +version = 
"0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + +[[package]] +name = "proc-macro-nested" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" + +[[package]] name = "proc-macro2" version = "1.0.27" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1595,6 +1638,15 @@ dependencies = [ ] [[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + +[[package]] name = "slab" version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1623,6 +1675,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" [[package]] +name = "spmc" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02a8428da277a8e3a15271d79943e80ccc2ef254e78813a166a08d65e4c3ece5" + +[[package]] name = "strsim" version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1779,7 +1837,10 @@ dependencies = [ "memchr", "mio", "num_cpus", + "once_cell", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "tokio-macros", "winapi", ] @@ -16,6 +16,7 @@ anyhow = "1" bytesize = "1" chrono = "0.4" directories-next = "2" +futures = "0.3.15" indicatif = "0.16" libc = "0.2" log = "0.4" @@ -30,10 +31,11 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" serde_yaml = "0.8" sha2 = "0.9" +spmc = "0.3.0" structopt = "0.3" tempfile = "3" thiserror = "1" -tokio = { version = "1", features = ["macros"] } +tokio = { version = "1", features = ["full"] } users = "0.11" uuid = { version = "0.8", features = 
["v4"] } walkdir = "2" diff --git a/src/engine.rs b/src/engine.rs new file mode 100644 index 0000000..6b8307d --- /dev/null +++ b/src/engine.rs @@ -0,0 +1,121 @@ +use crate::workqueue::WorkQueue; +use futures::stream::{FuturesOrdered, StreamExt}; +use tokio::select; +use tokio::sync::mpsc; + +/// Do heavy work in the background. +/// +/// An engine takes items of work from a work queue, and does the work +/// in the background, using `tokio` blocking tasks. The background +/// work can be CPU intensive or block on I/O. The number of active +/// concurrent tasks is limited to the size of the queue. +/// +/// The actual work is done in a function or closure passed in as a +/// parameter to the engine. The worker function is called with a work +/// item as an argument, in a thread dedicated for that worker +/// function. +/// +/// The need to move work items between threads puts some restrictions +/// on the types used as work items. +pub struct Engine<T> { + rx: mpsc::Receiver<T>, +} + +impl<T: Send + 'static> Engine<T> { + /// Create a new engine. + /// + /// Each engine gets work from a queue, and calls the same worker + /// function for each item of work. The results are put into + /// another, internal queue. + pub fn new<S, F>(queue: WorkQueue<S>, func: F) -> Self + where + F: Send + Copy + 'static + Fn(S) -> T, + S: Send + 'static, + { + let size = queue.size(); + let (tx, rx) = mpsc::channel(size); + tokio::spawn(manage_workers(queue, size, tx, func)); + Self { rx } + } + + /// Get the oldest result worker function, if any. + /// + /// This will block until there is a result, or it's known that no + /// more results will be forthcoming. + pub async fn next(&mut self) -> Option<T> { + self.rx.recv().await + } +} + +// This is a normal (non-blocking) background task that retrieves work +// items, launches blocking background tasks for work to be done, and +// waits on those tasks. Care is taken to not launch too many worker +// tasks. 
+async fn manage_workers<S, T, F>( + mut queue: WorkQueue<S>, + queue_size: usize, + tx: mpsc::Sender<T>, + func: F, +) where + F: Send + 'static + Copy + Fn(S) -> T, + S: Send + 'static, + T: Send + 'static, +{ + let mut workers = FuturesOrdered::new(); + + 'processing: loop { + // Wait for first of various concurrent things to finish. + select! { + biased; + + // Get work to be done. + maybe_work = queue.next() => { + if let Some(work) = maybe_work { + // We got a work item. Launch background task to + // work on it. + let tx = tx.clone(); + workers.push(do_work(work, tx, func)); + + // If queue is full, wait for at least one + // background task to finish. + while workers.len() >= queue_size { + workers.next().await; + } + } else { + // Finished with the input queue. Nothing more to do. + break 'processing; + } + } + + // Wait for background task to finish, if there are any + // background tasks currently running. + _ = workers.next(), if !workers.is_empty() => { + // nothing to do here + } + } + } + + while workers.next().await.is_some() { + // Finish the chunks + } +} + +// Work on a work item. +// +// This launches a `tokio` blocking background task, and waits for it +// to finish. The caller spawns a normal (non-blocking) async task for +// this function, so it's OK for this function to wait on the task it +// launches. 
+async fn do_work<S, T, F>(item: S, tx: mpsc::Sender<T>, func: F) +where + F: Send + 'static + Fn(S) -> T, + S: Send + 'static, + T: Send + 'static, +{ + let result = tokio::task::spawn_blocking(move || func(item)) + .await + .unwrap(); + if let Err(err) = tx.send(result).await { + panic!("failed to send result to channel: {}", err); + } +} @@ -11,6 +11,7 @@ pub mod cipher; pub mod client; pub mod cmd; pub mod config; +pub mod engine; pub mod error; pub mod fsentry; pub mod fsiter; @@ -22,3 +23,4 @@ pub mod passwords; pub mod policy; pub mod server; pub mod store; +pub mod workqueue; diff --git a/src/workqueue.rs b/src/workqueue.rs new file mode 100644 index 0000000..44ba5e4 --- /dev/null +++ b/src/workqueue.rs @@ -0,0 +1,60 @@ +use tokio::sync::mpsc; + +/// A queue of work items. +/// +/// An abstraction for producing items of work. For example, chunks of +/// data in a file. The work items are put into an ordered queue to be +/// worked on by another task. The queue is limited in size so that it +/// doesn't grow impossibly large. This acts as a load-limiting +/// synchronizing mechanism. +/// +/// One async task produces work items and puts them into the queue, +/// another consumes them from the queue. If the producer is too fast, +/// the queue fills up, and the producer blocks when putting an item +/// into the queue. If the queue is empty, the consumer blocks until +/// there is something added to the queue. +/// +/// The work items need to be abstracted as a type, and that type is +/// given as a type parameter. +pub struct WorkQueue<T> { + rx: mpsc::Receiver<T>, + tx: Option<mpsc::Sender<T>>, + size: usize, +} + +impl<T> WorkQueue<T> { + /// Create a new work queue of a given maximum size. + pub fn new(queue_size: usize) -> Self { + let (tx, rx) = mpsc::channel(queue_size); + Self { + rx, + tx: Some(tx), + size: queue_size, + } + } + + /// Get maximum size of queue. + pub fn size(&self) -> usize { + self.size + } + + /// Add an item of work to the queue. 
+ pub fn push(&self) -> mpsc::Sender<T> { + self.tx.as_ref().unwrap().clone() + } + + /// Signal that no more work items will be added to the queue. + /// + /// You **must** call this, as otherwise the `next` function will + /// wait indefinitely. + pub fn close(&mut self) { + // println!("Chunkify::close closing sender"); + self.tx = None; + } + + /// Get the oldest work item from the queue, if any. + pub async fn next(&mut self) -> Option<T> { + // println!("next called"); + self.rx.recv().await + } +} |