Diffstat (limited to 'src/engine.rs')
-rw-r--r--  src/engine.rs  123
1 file changed, 123 insertions, 0 deletions
diff --git a/src/engine.rs b/src/engine.rs
new file mode 100644
index 0000000..d35281b
--- /dev/null
+++ b/src/engine.rs
@@ -0,0 +1,123 @@
+//! Engine for doing CPU-heavy work in the background.
+
+use crate::workqueue::WorkQueue;
+use futures::stream::{FuturesOrdered, StreamExt};
+use tokio::select;
+use tokio::sync::mpsc;
+
+/// Do heavy work in the background.
+///
+/// An engine takes items of work from a work queue, and does the work
+/// in the background, using `tokio` blocking tasks. The background
+/// work can be CPU intensive or block on I/O. The number of active
+/// concurrent tasks is limited to the size of the queue.
+///
+/// The actual work is done in a function or closure passed in as a
+/// parameter to the engine. The worker function is called with one
+/// work item at a time, on a thread from `tokio`'s blocking thread
+/// pool.
+///
+/// Because work items and results have to move between threads, the
+/// types used for them must be `Send` and `'static`.
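+///
+/// A minimal usage sketch, assuming `queue` is an already populated
+/// `WorkQueue<u64>` (how the queue is created and filled is not shown
+/// in this file):
+///
+/// ```ignore
+/// // Double each work item on a blocking thread.
+/// let mut engine = Engine::new(queue, |n: u64| n * 2);
+/// ```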
+pub struct Engine<T> {
+    rx: mpsc::Receiver<T>,
+}
+
+impl<T: Send + 'static> Engine<T> {
+    /// Create a new engine.
+    ///
+    /// Each engine gets work from a queue, and calls the same worker
+    /// function for each item of work. The results are put into an
+    /// internal channel, from which they can be retrieved with
+    /// [`Self::next`].
+    pub fn new<S, F>(queue: WorkQueue<S>, func: F) -> Self
+    where
+        F: Send + Copy + 'static + Fn(S) -> T,
+        S: Send + 'static,
+    {
+        let size = queue.size();
+        let (tx, rx) = mpsc::channel(size);
+        tokio::spawn(manage_workers(queue, size, tx, func));
+        Self { rx }
+    }
+
+    /// Get the oldest result of the worker function, if any.
+    ///
+    /// This waits until there is a result, or until it is known that
+    /// no more results will be forthcoming, in which case it returns
+    /// `None`.
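+    ///
+    /// A typical way to drain every result (sketch):
+    ///
+    /// ```ignore
+    /// // The loop ends once the work queue is exhausted and all
+    /// // workers have finished.
+    /// while let Some(result) = engine.next().await {
+    ///     // Use `result` here.
+    /// }
+    /// ```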
+    pub async fn next(&mut self) -> Option<T> {
+        self.rx.recv().await
+    }
+}
+
+// This is a normal (non-blocking) background task that retrieves work
+// items, launches blocking background tasks for work to be done, and
+// waits on those tasks. Care is taken to not launch too many worker
+// tasks.
+async fn manage_workers<S, T, F>(
+    mut queue: WorkQueue<S>,
+    queue_size: usize,
+    tx: mpsc::Sender<T>,
+    func: F,
+) where
+    F: Send + 'static + Copy + Fn(S) -> T,
+    S: Send + 'static,
+    T: Send + 'static,
+{
+    let mut workers = FuturesOrdered::new();
+
+    'processing: loop {
+        // Wait for the first of various concurrent things to finish.
+        select! {
+            biased;
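+            // With `biased`, the arms below are polled in the order
+            // they are written, so new work is picked up before
+            // finished workers are reaped.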
+
+            // Get work to be done.
+            maybe_work = queue.next() => {
+                if let Some(work) = maybe_work {
+                    // We got a work item. Launch a background task to
+                    // work on it.
+                    let tx = tx.clone();
+                    workers.push_back(do_work(work, tx, func));
+
+                    // If as many workers are already running as the
+                    // queue can hold, wait for at least one of them
+                    // to finish.
+                    while workers.len() >= queue_size {
+                        workers.next().await;
+                    }
+                } else {
+                    // Finished with the input queue. Nothing more to do.
+                    break 'processing;
+                }
+            }
+
+            // Wait for a background task to finish, if there are any
+            // background tasks currently running.
+            _ = workers.next(), if !workers.is_empty() => {
+                // Nothing to do here.
+            }
+        }
+    }
+
+    while workers.next().await.is_some() {
+        // Finish the remaining work items.
+    }
+}
+
+// Work on a work item.
+//
+// This launches a `tokio` blocking background task, and waits for it
+// to finish. The caller spawns a normal (non-blocking) async task for
+// this function, so it's OK for this function to wait on the task it
+// launches.
+async fn do_work<S, T, F>(item: S, tx: mpsc::Sender<T>, func: F)
+where
+    F: Send + 'static + Fn(S) -> T,
+    S: Send + 'static,
+    T: Send + 'static,
+{
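+    // Run the worker function on a thread from the blocking pool. The
+    // `unwrap` propagates a panic from the worker function into this
+    // task.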
+    let result = tokio::task::spawn_blocking(move || func(item))
+        .await
+        .unwrap();
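+    // Sending can only fail if the receiving `Engine` has been dropped.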
+    if let Err(err) = tx.send(result).await {
+        panic!("failed to send result to channel: {}", err);
+    }
+}