summaryrefslogtreecommitdiff
path: root/src/engine.rs
blob: d35281bcba2d6c38199835ab2d4f5c129cad188c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
//! Engine for doing CPU heavy work in the background.

use crate::workqueue::WorkQueue;
use futures::stream::{FuturesOrdered, StreamExt};
use tokio::select;
use tokio::sync::mpsc;

/// Do heavy work in the background.
///
/// An engine takes items of work from a work queue, and does the work
/// in the background, using `tokio` blocking tasks. The background
/// work can be CPU intensive or block on I/O. The number of active
/// concurrent tasks is limited to the size of the queue.
///
/// The actual work is done in a function or closure passed in as a
/// parameter to the engine. The worker function is called with a work
/// item as an argument, in a thread dedicated for that worker
/// function.
///
/// The need to move work items between threads puts some restrictions
/// on the types used as work items.
pub struct Engine<T> {
    rx: mpsc::Receiver<T>,
}

impl<T: Send + 'static> Engine<T> {
    /// Create a new engine.
    ///
    /// Each engine gets work from a queue, and calls the same worker
    /// function for each item of work. The results are put into
    /// another, internal queue.
    pub fn new<S, F>(queue: WorkQueue<S>, func: F) -> Self
    where
        F: Send + Copy + 'static + Fn(S) -> T,
        S: Send + 'static,
    {
        let size = queue.size();
        let (tx, rx) = mpsc::channel(size);
        tokio::spawn(manage_workers(queue, size, tx, func));
        Self { rx }
    }

    /// Get the oldest result of the worker function, if any.
    ///
    /// This will block until there is a result, or it's known that no
    /// more results will be forthcoming.
    pub async fn next(&mut self) -> Option<T> {
        self.rx.recv().await
    }
}

// This is a normal (non-blocking) background task that retrieves work
// items, launches blocking background tasks for work to be done, and
// waits on those tasks. Care is taken to not launch too many worker
// tasks.
async fn manage_workers<S, T, F>(
    mut queue: WorkQueue<S>,
    queue_size: usize,
    tx: mpsc::Sender<T>,
    func: F,
) where
    F: Send + 'static + Copy + Fn(S) -> T,
    S: Send + 'static,
    T: Send + 'static,
{
    let mut workers = FuturesOrdered::new();

    'processing: loop {
        // Wait for first of various concurrent things to finish.
        select! {
            biased;

            // Get work to be done.
            maybe_work = queue.next() => {
                if let Some(work) = maybe_work {
                    // We got a work item. Launch background task to
                    // work on it.
                    let tx = tx.clone();
                    workers.push_back(do_work(work, tx, func));

                    // If queue is full, wait for at least one
                    // background task to finish.
                    while workers.len() >= queue_size {
                        workers.next().await;
                    }
                } else {
                    // Finished with the input queue. Nothing more to do.
                    break 'processing;
                }
            }

            // Wait for background task to finish, if there are any
            // background tasks currently running.
            _ = workers.next(), if !workers.is_empty() => {
                // nothing to do here
            }
        }
    }

    while workers.next().await.is_some() {
        // Finish the remaining work items.
    }
}

// Work on a work item.
//
// This launches a `tokio` blocking background task, and waits for it
// to finish. The caller spawns a normal (non-blocking) async task for
// this function, so it's OK for this function to wait on the task it
// launches.
async fn do_work<S, T, F>(item: S, tx: mpsc::Sender<T>, func: F)
where
    F: Send + 'static + Fn(S) -> T,
    S: Send + 'static,
    T: Send + 'static,
{
    let result = tokio::task::spawn_blocking(move || func(item))
        .await
        .unwrap();
    if let Err(err) = tx.send(result).await {
        panic!("failed to send result to channel: {}", err);
    }
}