diff options
author | Lars Wirzenius <liw@liw.fi> | 2021-07-05 19:36:51 +0300 |
---|---|---|
committer | Lars Wirzenius <liw@liw.fi> | 2021-07-18 10:02:33 +0300 |
commit | b2a3a0999d6085d20419d9794b89ab9b0ada0124 (patch) | |
tree | 8bcea6bed936105b14da7ee8aca0f19949f73992 /src/workqueue.rs | |
parent | b7fd8b09a1cef3b2044b8d459d9c58a79fc12830 (diff) | |
download | obnam2-b2a3a0999d6085d20419d9794b89ab9b0ada0124.tar.gz |
feat: add Engine and WorkQueue abstractions for async processing
Many thanks to Daniel Silverstone for helping me get through this.
Sponsored-by: author
Diffstat (limited to 'src/workqueue.rs')
-rw-r--r-- | src/workqueue.rs | 60 |
1 files changed, 60 insertions, 0 deletions
diff --git a/src/workqueue.rs b/src/workqueue.rs new file mode 100644 index 0000000..44ba5e4 --- /dev/null +++ b/src/workqueue.rs @@ -0,0 +1,60 @@ +use tokio::sync::mpsc; + +/// A queue of work items. +/// +/// An abstraction for producing items of work. For example, chunks of +/// data in a file. The work items are put into an ordered queue to be +/// worked on by another task. The queue is limited in size so that it +/// doesn't grow impossibly large. This acts as a load-limiting +/// synchronizing mechanism. +/// +/// One async task produces work items and puts them into the queue, +/// another consumes them from the queue. If the producer is too fast, +/// the queue fills up, and the producer blocks when putting an item +/// into the queue. If the queue is empty, the consumer blocks until +/// there is something added to the queue. +/// +/// The work items need to be abstracted as a type, and that type is +/// given as a type parameter. +pub struct WorkQueue<T> { + rx: mpsc::Receiver<T>, + tx: Option<mpsc::Sender<T>>, + size: usize, +} + +impl<T> WorkQueue<T> { + /// Create a new work queue of a given maximum size. + pub fn new(queue_size: usize) -> Self { + let (tx, rx) = mpsc::channel(queue_size); + Self { + rx, + tx: Some(tx), + size: queue_size, + } + } + + /// Get maximum size of queue. + pub fn size(&self) -> usize { + self.size + } + + /// Add an item of work to the queue. + pub fn push(&self) -> mpsc::Sender<T> { + self.tx.as_ref().unwrap().clone() + } + + /// Signal that no more work items will be added to the queue. + /// + /// You **must** call this, as otherwise the `next` function will + /// wait indefinitely. + pub fn close(&mut self) { + // println!("Chunkify::close closing sender"); + self.tx = None; + } + + /// Get the oldest work item from the queue, if any. + pub async fn next(&mut self) -> Option<T> { + // println!("next called"); + self.rx.recv().await + } +} |