12

What would be the correct usage of either, BlockingCollection or ConcurrentQueue so you can freely dequeue items without burning out half or more of your CPU using a thread ?

I was running some tests using 2 threads and unless I had a Thread.Sleep of at least 50~100ms it would always hit at least 50% of my CPU.

Here is a fictional example:

private void _DequeueItem()
{
    object o = null;
    while(socket.Connected)
    {
        while (!listOfQueueItems.IsEmpty)
        {
            if (listOfQueueItems.TryDequeue(out o))
            {
                // use the data
            }
        }
    }
}

With the above example I would have to set a thread.sleep so the cpu doesnt blow up.

Note: I have also tried it without the while for IsEmpty check, result was the same.

Prix
  • 19,417
  • 15
  • 73
  • 132
  • 1
    If you do nothing besides dequeueing then 50% CPU usage is pretty normal, in general with Threads it is more important to look at the individual cores to assess performance. – weismat Jul 01 '11 at 09:22
  • 1
    @weismat - that doesn't sound right, dequeuing is a tiny operation. The problem here is the thread should wait for a signal to wake up, the signal telling it there is something to dequeue, rather than use an infinitely tight loop to attempt a dequeue. That would take a TINY amount of CPU, nowhere near 50% – Kieren Johnstone Jul 01 '11 at 09:46
  • You are right, I missed this - was just looking at the bad idea to not look at invididual cores! – weismat Jul 01 '11 at 13:29

3 Answers3

23

It is not because of the BlockingCollection or ConcurrentQueue, but the while loop:

while(socket.Connected)
{
    while (!listOfQueueItems.IsEmpty)
    { /*code*/ }
}

Of course it will take the cpu down; because of if the queue is empty, then the while loop is just like:

while (true) ;

which in turn will eat the cpu resources.

This is not a good way of using ConcurrentQueue you should use AutoResetEvent with it so whenever item is added you will be notified. Example:

private ConcurrentQueue<Data> _queue = new ConcurrentQueue<Data>();
private AutoResetEvent _queueNotifier = new AutoResetEvent(false);

//at the producer:
_queue.Enqueue(new Data());
_queueNotifier.Set();

//at the consumer:
while (true)//or some condition
{
    _queueNotifier.WaitOne();//here we will block until receive signal notification.
    Data data;
    if (_queue.TryDequeue(out data))
    {
        //handle the data
    }
}

For a good usage of the BlockingCollection you should use the GetConsumingEnumerable() to wait for the items to be added, Like:

//declare the buffer
private BlockingCollection<Data> _buffer = new BlockingCollection<Data>(new ConcurrentQueue<Data>());

//at the producer method:
_messageBuffer.Add(new Data());

//at the consumer
foreach (Data data in _buffer.GetConsumingEnumerable())//it will block here automatically waiting from new items to be added and it will not take cpu down 
{
    //handle the data here.
}
Jalal Said
  • 15,906
  • 7
  • 45
  • 68
  • Aldeen Saa'd the event works really well thanks, what waitone does wait 1 ms ? – Prix Jul 01 '11 at 09:43
  • @Prix: I just notice your question now: "what waitone does wait ms ?", No; The `WaitOne` is a blocking until it _receives_ a signal. In the code: `_queueNotifier.Set();` is the signal "here indicates that new item is added" Read more on `AutoResetEvent` [here](http://msdn.microsoft.com/en-us/library/system.threading.autoresetevent.aspx). – Jalal Said Jul 01 '11 at 13:02
  • @Prix: It waits until the event is set. – ghimireniraj Jul 01 '11 at 13:03
  • I'm quite sure this is bloated and un-necessary (hear: too manual). There is a `Take` function in the BlockingCollection that serves this exact purpose without any need for manual signal/wait. – v.oddou Sep 29 '14 at 06:47
  • @v.oddou that is way I have give the second example using 'BlockingCollection', because by using 'GetConsumingEnumerable()' which is same like 'Take()' function, then no needs to any calls to signal/wait, however the first part of my answer is to administrate the correct usage of 'ConcurrentQueue' alone, without 'BlockingCollection' to serve the same purpose, then you need to have some sort of signalling. – Jalal Said Sep 30 '14 at 16:01
  • @JalalSaid Thanks for presenting your good solution But I want to be always running and listening .When _buffer.count <= 0 listening will stop and adding a new item will no longer call this event – Hadi Salehy Oct 08 '19 at 13:02
7

You really want to be using the BlockingCollection class in this case. It is designed to block until an item appears in the queue. A collection of this nature is often referred to as a blocking queue. This particular implementation is safe for multiple producers and multiple consumers. That is something that is surprisingly difficult to get right if you tried implementing it yourself. Here is what your code would look like if you used BlockingCollection.

private void _DequeueItem()
{
    while(socket.Connected)
    {
        object o = listOfQueueItems.Take();
        // use the data
    }
}

The Take method blocks automatically if the queue is empty. It blocks in a manner that puts the thread in the SleepWaitJoin state so that it will not consume CPU resources. The neat thing about BlockingCollection is that it also uses low-lock strategies to increase performance. What this means is that Take will check to see if there is an item in the queue and if not then it will briefly perform a spin wait to prevent a context switch of the thread. If the queue is still empty then it will put the thread to sleep. This means that BlockingCollection will have some of the performance benefits that ConcurrentQueue provides in regards to concurrent execution.

Brian Gideon
  • 47,849
  • 13
  • 107
  • 150
  • BlockingCollection uses ConcurrentQueue by default unless a IProducerConsumerCollection is passed to the constructor of BlockingCollection. so, by definition it should have all of the benefits of ConcurrentQueue (unless another collection is used). – Dave Black May 15 '15 at 19:50
0

You can call Thread.Sleep() only when queue is empty:

private void DequeueItem()
{
    object o = null;

    while(socket.Connected)
    {
        if (listOfQueueItems.IsEmpty)
        {
            Thread.Sleep(50);
        }
        else if (listOfQueueItems.TryDequeue(out o))
        {
            // use the data
        }
    }
}

Otherwise you should consider to use events.

treetey
  • 829
  • 1
  • 6
  • 16