I have multiple producer threads and a single consumer thread. In C#, I'm using ConcurrentQueue for that.

How can I properly put the consumer thread to sleep while the queue is empty?

ManualResetEventSlim signal;
void WorkerThread(CancellationToken token)
{
    while(!token.IsCancellationRequested)
    {
        object work;
        if (!_eventQueue.TryDequeue(out work))
        {
            signal.Reset();
            signal.Wait(token);
            continue;
        }
        ...
    }
}
...
void Produce(object o)
{
    _eventQueue.Enqueue(o);
    signal.Set();
}

I tried this, but there is a chance that the following happens:

  1. thread B fails to read from _eventQueue
  2. thread A writes into _eventQueue
  3. thread A sets the signal
  4. thread B resets the signal
  5. thread B waits indefinitely

How can I overcome this? Previously, I used lock() and Monitor.Wait(). AutoResetEvent might help (it resets automatically on a successful Wait), but it does not support CancellationToken.

nothrow
  • Why don't you use [`BlockingCollection`](https://msdn.microsoft.com/en-us/library/dd267312(v=vs.110).aspx)? – Yacoub Massad Dec 13 '15 at 20:21
  • @YacoubMassad, frankly, I hadn't heard of it before. It looks like it does exactly what I need. If you post this as an answer, I will accept it. Thanks. – nothrow Dec 13 '15 at 20:24
  • You can also check out http://stackoverflow.com/questions/7863573/awaitable-task-based-queue if you are building server-side code (like ASP.NET sites). – Alexei Levenkov Dec 13 '15 at 20:37

1 Answer

You can use the BlockingCollection class to handle the case of multiple producers and a single consumer.

Create an object of type BlockingCollection like this:

BlockingCollection<object> collection = new BlockingCollection<object>(); //You can have a specific type instead of object if you want

Producers can simply call the Add method to add an item to the collection like this:

collection.Add("value");

The consumer can use the GetConsumingEnumerable method to get an IEnumerable<T> that takes items from the collection. This enumerable blocks (waits for more items) while the collection is empty. The method also has an overload that accepts a CancellationToken, so the wait can be cancelled.

foreach (var item in collection.GetConsumingEnumerable())
{
    //Consume item
}
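The cancellation support can be sketched roughly as follows. This is only an illustration, not the only way to structure it: the variable names (`cts`, `consumed`, `cancelled`) are made up for the example, and it is written as top-level statements (C# 9 or later), so on an older compiler the body would need to go inside a Main method. It assumes you want to treat cancellation as a normal way of stopping the consumer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var collection = new BlockingCollection<string>();
var cts = new CancellationTokenSource();
int consumed = 0;        // hypothetical counter, only for the demonstration
bool cancelled = false;  // set when the consumer is stopped by cancellation

var consumer = Task.Run(() =>
{
    try
    {
        // Blocks while the collection is empty; wakes up when an item
        // is added or when the token is cancelled.
        foreach (var item in collection.GetConsumingEnumerable(cts.Token))
        {
            Interlocked.Increment(ref consumed);
        }
    }
    catch (OperationCanceledException)
    {
        // Thrown by the enumerable once the token is cancelled.
        cancelled = true;
    }
});

collection.Add("value");

// Wait until the single item has been consumed, then cancel the consumer.
SpinWait.SpinUntil(() => Volatile.Read(ref consumed) == 1);
cts.Cancel();
consumer.Wait();

Console.WriteLine($"consumed={consumed}, cancelled={cancelled}");
```

Because the blocking and the wake-up both happen inside the collection, there is no separate signal to reset, so the race described in the question does not come up.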

If you call the CompleteAdding method, then the consuming enumerable will finish once there are no more items.
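Putting the pieces together, a minimal sketch with several producers and one consumer might look like this. The producer count, the item values, and the names `producers`, `consumer` and `total` are assumptions chosen for the example; it is again written as top-level statements (C# 9 or later).

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

var collection = new BlockingCollection<int>();

// Single consumer: the loop ends on its own once CompleteAdding has been
// called and the collection has been drained.
var consumer = Task.Run(() =>
{
    int sum = 0;
    foreach (var item in collection.GetConsumingEnumerable())
    {
        sum += item;
    }
    return sum;
});

// Three producers, each adding the numbers 1..100 concurrently.
var producers = Enumerable.Range(0, 3)
    .Select(_ => Task.Run(() =>
    {
        for (int i = 1; i <= 100; i++)
        {
            collection.Add(i);
        }
    }))
    .ToArray();

// Only signal completion after every producer has finished adding.
Task.WaitAll(producers);
collection.CompleteAdding();

int total = consumer.Result;
Console.WriteLine(total); // 3 * 5050 = 15150
```

Note that CompleteAdding is called only after all producers are done; calling Add after CompleteAdding throws an InvalidOperationException.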

This class is thread-safe: apart from Dispose, all of its public members can be used concurrently from multiple threads.

Yacoub Massad