summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2021-07-05 19:36:51 +0300
committerLars Wirzenius <liw@liw.fi>2021-07-18 10:02:33 +0300
commitb2a3a0999d6085d20419d9794b89ab9b0ada0124 (patch)
tree8bcea6bed936105b14da7ee8aca0f19949f73992
parentb7fd8b09a1cef3b2044b8d459d9c58a79fc12830 (diff)
downloadobnam2-b2a3a0999d6085d20419d9794b89ab9b0ada0124.tar.gz
feat: add Engine and WorkQueue abstractions for async processing
Many thanks to Daniel Silverstone for helping me get through this. Sponsored-by: author
-rw-r--r--Cargo.lock61
-rw-r--r--Cargo.toml4
-rw-r--r--src/engine.rs121
-rw-r--r--src/lib.rs2
-rw-r--r--src/workqueue.rs60
5 files changed, 247 insertions, 1 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 5f010da..4657bb7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -388,6 +388,7 @@ checksum = "0e7e43a803dae2fa37c1f6a8fe121e1f7bf9548b4dfc0522a42f34145dadfc27"
dependencies = [
"futures-channel",
"futures-core",
+ "futures-executor",
"futures-io",
"futures-sink",
"futures-task",
@@ -411,12 +412,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0402f765d8a89a26043b889b26ce3c4679d268fa6bb22cd7c6aad98340e179d1"
[[package]]
+name = "futures-executor"
+version = "0.3.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "badaa6a909fac9e7236d0620a2f57f7664640c56575b71a7552fbd68deafab79"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
name = "futures-io"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acc499defb3b348f8d8f3f66415835a9131856ff7714bf10dadfc4ec4bdb29a1"
[[package]]
+name = "futures-macro"
+version = "0.3.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a4c40298486cdf52cc00cd6d6987892ba502c7656a16a4192a9992b1ccedd121"
+dependencies = [
+ "autocfg",
+ "proc-macro-hack",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "futures-sink"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -435,13 +460,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "feb5c238d27e2bf94ffdfd27b2c29e3df4a68c4193bb6427384259e2bf191967"
dependencies = [
"autocfg",
+ "futures-channel",
"futures-core",
"futures-io",
+ "futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
+ "proc-macro-hack",
+ "proc-macro-nested",
"slab",
]
@@ -960,6 +989,7 @@ dependencies = [
"bytesize",
"chrono",
"directories-next",
+ "futures",
"indicatif",
"libc",
"log",
@@ -974,6 +1004,7 @@ dependencies = [
"serde_json",
"serde_yaml",
"sha2",
+ "spmc",
"structopt",
"tempfile",
"thiserror",
@@ -1184,6 +1215,18 @@ dependencies = [
]
[[package]]
+name = "proc-macro-hack"
+version = "0.5.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
+
+[[package]]
+name = "proc-macro-nested"
+version = "0.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
+
+[[package]]
name = "proc-macro2"
version = "1.0.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1595,6 +1638,15 @@ dependencies = [
]
[[package]]
+name = "signal-hook-registry"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
+dependencies = [
+ "libc",
+]
+
+[[package]]
name = "slab"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1623,6 +1675,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
+name = "spmc"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "02a8428da277a8e3a15271d79943e80ccc2ef254e78813a166a08d65e4c3ece5"
+
+[[package]]
name = "strsim"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1779,7 +1837,10 @@ dependencies = [
"memchr",
"mio",
"num_cpus",
+ "once_cell",
+ "parking_lot",
"pin-project-lite",
+ "signal-hook-registry",
"tokio-macros",
"winapi",
]
diff --git a/Cargo.toml b/Cargo.toml
index ceba834..31e3757 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,6 +16,7 @@ anyhow = "1"
bytesize = "1"
chrono = "0.4"
directories-next = "2"
+futures = "0.3.15"
indicatif = "0.16"
libc = "0.2"
log = "0.4"
@@ -30,10 +31,11 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_yaml = "0.8"
sha2 = "0.9"
+spmc = "0.3.0"
structopt = "0.3"
tempfile = "3"
thiserror = "1"
-tokio = { version = "1", features = ["macros"] }
+tokio = { version = "1", features = ["full"] }
users = "0.11"
uuid = { version = "0.8", features = ["v4"] }
walkdir = "2"
diff --git a/src/engine.rs b/src/engine.rs
new file mode 100644
index 0000000..6b8307d
--- /dev/null
+++ b/src/engine.rs
@@ -0,0 +1,121 @@
+use crate::workqueue::WorkQueue;
+use futures::stream::{FuturesOrdered, StreamExt};
+use tokio::select;
+use tokio::sync::mpsc;
+
+/// Do heavy work in the background.
+///
+/// An engine takes items of work from a work queue, and does the work
+/// in the background, using `tokio` blocking tasks. The background
+/// work can be CPU intensive or block on I/O. The number of active
+/// concurrent tasks is limited to the size of the queue.
+///
+/// The actual work is done in a function or closure passed in as a
+/// parameter to the engine. The worker function is called with a work
+/// item as an argument, in a thread dedicated for that worker
+/// function.
+///
+/// The need to move work items between threads puts some restrictions
+/// on the types used as work items.
+pub struct Engine<T> {
+ rx: mpsc::Receiver<T>,
+}
+
+impl<T: Send + 'static> Engine<T> {
+ /// Create a new engine.
+ ///
+ /// Each engine gets work from a queue, and calls the same worker
+ /// function for each item of work. The results are put into
+ /// another, internal queue.
+ pub fn new<S, F>(queue: WorkQueue<S>, func: F) -> Self
+ where
+ F: Send + Copy + 'static + Fn(S) -> T,
+ S: Send + 'static,
+ {
+ let size = queue.size();
+ let (tx, rx) = mpsc::channel(size);
+ tokio::spawn(manage_workers(queue, size, tx, func));
+ Self { rx }
+ }
+
+ /// Get the oldest result worker function, if any.
+ ///
+ /// This will block until there is a result, or it's known that no
+ /// more results will be forthcoming.
+ pub async fn next(&mut self) -> Option<T> {
+ self.rx.recv().await
+ }
+}
+
+// This is a normal (non-blocking) background task that retrieves work
+// items, launches blocking background tasks for work to be done, and
+// waits on those tasks. Care is taken to not launch too many worker
+// tasks.
+async fn manage_workers<S, T, F>(
+ mut queue: WorkQueue<S>,
+ queue_size: usize,
+ tx: mpsc::Sender<T>,
+ func: F,
+) where
+ F: Send + 'static + Copy + Fn(S) -> T,
+ S: Send + 'static,
+ T: Send + 'static,
+{
+ let mut workers = FuturesOrdered::new();
+
+ 'processing: loop {
+ // Wait for first of various concurrent things to finish.
+ select! {
+ biased;
+
+ // Get work to be done.
+ maybe_work = queue.next() => {
+ if let Some(work) = maybe_work {
+ // We got a work item. Launch background task to
+ // work on it.
+ let tx = tx.clone();
+ workers.push(do_work(work, tx, func));
+
+ // If queue is full, wait for at least one
+ // background task to finish.
+ while workers.len() >= queue_size {
+ workers.next().await;
+ }
+ } else {
+ // Finished with the input queue. Nothing more to do.
+ break 'processing;
+ }
+ }
+
+ // Wait for background task to finish, if there are any
+ // background tasks currently running.
+ _ = workers.next(), if !workers.is_empty() => {
+ // nothing to do here
+ }
+ }
+ }
+
+ while workers.next().await.is_some() {
+ // Finish the chunks
+ }
+}
+
+// Work on a work item.
+//
+// This launches a `tokio` blocking background task, and waits for it
+// to finish. The caller spawns a normal (non-blocking) async task for
+// this function, so it's OK for this function to wait on the task it
+// launches.
+async fn do_work<S, T, F>(item: S, tx: mpsc::Sender<T>, func: F)
+where
+ F: Send + 'static + Fn(S) -> T,
+ S: Send + 'static,
+ T: Send + 'static,
+{
+ let result = tokio::task::spawn_blocking(move || func(item))
+ .await
+ .unwrap();
+ if let Err(err) = tx.send(result).await {
+ panic!("failed to send result to channel: {}", err);
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 7d7afdc..3e378f6 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -11,6 +11,7 @@ pub mod cipher;
pub mod client;
pub mod cmd;
pub mod config;
+pub mod engine;
pub mod error;
pub mod fsentry;
pub mod fsiter;
@@ -22,3 +23,4 @@ pub mod passwords;
pub mod policy;
pub mod server;
pub mod store;
+pub mod workqueue;
diff --git a/src/workqueue.rs b/src/workqueue.rs
new file mode 100644
index 0000000..44ba5e4
--- /dev/null
+++ b/src/workqueue.rs
@@ -0,0 +1,60 @@
+use tokio::sync::mpsc;
+
+/// A queue of work items.
+///
+/// An abstraction for producing items of work. For example, chunks of
+/// data in a file. The work items are put into an ordered queue to be
+/// worked on by another task. The queue is limited in size so that it
+/// doesn't grow impossibly large. This acts as a load-limiting
+/// synchronizing mechanism.
+///
+/// One async task produces work items and puts them into the queue,
+/// another consumes them from the queue. If the producer is too fast,
+/// the queue fills up, and the producer blocks when putting an item
+/// into the queue. If the queue is empty, the consumer blocks until
+/// there is something added to the queue.
+///
+/// The work items need to be abstracted as a type, and that type is
+/// given as a type parameter.
+pub struct WorkQueue<T> {
+ rx: mpsc::Receiver<T>,
+ tx: Option<mpsc::Sender<T>>,
+ size: usize,
+}
+
+impl<T> WorkQueue<T> {
+ /// Create a new work queue of a given maximum size.
+ pub fn new(queue_size: usize) -> Self {
+ let (tx, rx) = mpsc::channel(queue_size);
+ Self {
+ rx,
+ tx: Some(tx),
+ size: queue_size,
+ }
+ }
+
+ /// Get maximum size of queue.
+ pub fn size(&self) -> usize {
+ self.size
+ }
+
+ /// Add an item of work to the queue.
+ pub fn push(&self) -> mpsc::Sender<T> {
+ self.tx.as_ref().unwrap().clone()
+ }
+
+ /// Signal that no more work items will be added to the queue.
+ ///
+ /// You **must** call this, as otherwise the `next` function will
+ /// wait indefinitely.
+ pub fn close(&mut self) {
+ // println!("Chunkify::close closing sender");
+ self.tx = None;
+ }
+
+ /// Get the oldest work item from the queue, if any.
+ pub async fn next(&mut self) -> Option<T> {
+ // println!("next called");
+ self.rx.recv().await
+ }
+}