I'm looking to implement the following:
- A work queue which I will pre-populate with some initial work items
- Once done, I need to launch several worker tasks (or threads), which will dequeue work from this queue
- While processing a work item, each worker is allowed to re-enqueue items into the queue
- All worker threads must remain running and attempt to dequeue new work items, until A) there is no work currently left to do (the queue is empty), and B) it is certain that no other worker will enqueue any further work (aka, is currently processing an item)
What is the simplest way to implement this, preferably using largely standard data structures in C#?
My initial approach was something like this:
var queue = new ConcurrentQueue<WorkItem>();
// ... populate queue with some initial items ...
await Task.WhenAll(Enumerable.Range(0, nWorkers).Select(_ => Task.Run(async () => {
while(queue.TryDequeue(out var item))
{
// Process stuff asynchronously, potentially re-queue work, etc.
}
}).ToArray());
But of course, this does not work - it's possible a worker may terminate because TryDequeue fails(), but another still active worker will re-queue work.
A channel seems quite suitable as well, workers could dequeue until the channel is closed. But the fundamental problem remains: How exactly can I determine the closing condition? Which worker closes the channel, and based on which condition?