4

I have the following requirements -

  1. A thread which receives the messages and en-queue those.
  2. A thread which processes the enqueued messages.

Now, the second thread always has to be alive - for which I have used infinite while loop as follows:

private AutoResetEvent messageReset;
private Queue<byte[]> messageQueue;

//thread 2 method
private void ProcessIncomingMessages()
{
 messageReset.WaitOne(); //wait for signal
 while(true)
 {
    if (messageQueue.Count > 0)
    {
        //processing messages
    }
  }
}
public void SubmitMessageForProcessing(byte[] message){
            messageQueue.Enqueue(message); //enqueue message

            // Release the thread
            messageReset.Set();
}

Now, this infinite while loop is shooting the CPU utilization very high. Is there any workaround to lower down the CPU utilization

NOTE: I can't add any thread.sleep statement as the incoming messages are to be displayed on UI with minimum delay.

Hershika Sharma
  • 105
  • 1
  • 14
  • You might want to look into [System.Collections.Concurrent.BlockingCollection](https://msdn.microsoft.com/en-us/library/dd267312(v=vs.110).aspx), it's thread safe and blocks on `Take` – Freggar Jun 27 '18 at 06:03

2 Answers2

5

Just use a BlockingCollection instead of Queue. It is threadsafe and will block onTake until some worker adds an item:

// Use default constructor to make BlockingCollection FIFO
private BlockingCollection<byte[]> messageQueue = new BlockingCollection<byte[]>();

//thread 2 method
private void ProcessIncomingMessages()
{
    while (true)
    {
        //will block until thread1 Adds a message
        byte[] message = messageQueue.Take();

        //processing messages
    }
}
public void SubmitMessageForProcessing(byte[] message)
{
    messageQueue.Add(message); //enqueue message
}

EDIT2: I forgot to mention that by using the default constructor BlockingCollection will be FIFO. It will actually use a ConcurrentQueue as item container.

If you wanted BlockingCollection to behave like a LIFO collection you would need to pass a IProducerConsumerCollection that is LIFO to the constructor. The usual class for that would be ConcurrentStack


EDIT: Some explanation on how your Queue is not thread-safe and this could lead to problems with your current code.

From the Microsoft documentation on Queue:

A Queue can support multiple readers concurrently, as long as the collection is not modified.

This means you cannot read and write from multiple threads at the same time.

Look at the following example which also applies to the other answers which suggest just moving messageReset.WaitOne() in your while(true) block.

  1. SubmitMessageForProcessing is called and signals messageReset.Set()
  2. Thread 2 gets active and tries to read data.
  3. While thread 2 reads data SubmitMessageForProcessing is called a second time.
  4. Now you are writing and reading at the same time resulting in unexpected behavior (usually some kind of exception)
Freggar
  • 1,049
  • 1
  • 11
  • 24
  • Thank you.. Could you please elaborate on thread safety. How thread safety is better in BlockCollection than in Queue. – Hershika Sharma Jun 27 '18 at 06:34
  • @HershikaSharma The threadsafety section of the documentation for [BlockingCollection](https://msdn.microsoft.com/en-us/library/dd267312(v=vs.110).aspx#Anchor_10) and [Queue](https://msdn.microsoft.com/en-us/library/7977ey2c(v=vs.110).aspx#Anchor_10) will probably do a better explanation than I would. But if something specific is unclear don't worry to ask. – Freggar Jun 27 '18 at 06:39
  • Hi Freggar Does BlockingCollection follows FIFO? How will the following scenario be implemented - – Hershika Sharma Jun 27 '18 at 08:13
  • 1
    @HershikaSharma good question, I actually forgot to mention that! Yes, `BlockingCollection` is [FIFO](https://msdn.microsoft.com/en-us/library/dd287116(v=vs.110).aspx) when you use the default constructor. But it could be used as LIFO as well. – Freggar Jun 27 '18 at 08:18
  • Hi Freggar Does BlockingCollection follows FIFO? How will the following scenario be implemented - 'ProcessIncomingMessages' is processing the first message. And meanwhile, 'SubmitMessageForProcessing' added 2 new messages in the 'messageQueue'. When the processing of first message completes. What 'messageQueue.Take();' returns then. – Hershika Sharma Jun 27 '18 at 08:19
  • @HershikaSharma it works correctly, it will return one message after the other in the same order they came in. – Mong Zhu Jun 27 '18 at 08:21
  • But one drawback I see in BlockingCollection is we can't read the item without removing it. As we have Peek() in Queue. Please correct me if I am wrong. – Hershika Sharma Jun 27 '18 at 08:35
  • 1
    @HershikaSharma You are correct, `BlockingCollection` uses the `IProducerConsumerCollection` interface/pattern which by design does not allow peeking. If you need that functionality you sadly can't use `BlockingCollection`. If you really need this behavior I would adapt the BlockingQueue implementation from https://stackoverflow.com/a/530228/4862034 – Freggar Jun 27 '18 at 08:44
2

In your example, the while loop will busy-wait until the queue has at least one element. You can move the signal into that loop to reduce the busy-waiting and use less CPU.

private void ProcessIncomingMessages()
{
    while(true)
    {
        messageReset.WaitOne(100); //wait for signal
        while (messageQueue.Count > 0)
        {
            //processing messages
        }
    }
}

P.S. Unless you have some sort of custom locking mechanism, you must use a ConcurrentQueue<T> instead of a Queue<T> if you want to be thread-safe. Also, I put a timeout on the WaitOne call because there is a slim chance the signal will get set after you check Count but before the WaitOne call is reached. There may be other threading issues in your solution. If you're not confident about threading concerns, you might want to use a BlockingCollection, which takes care of a lot of the details for you.

John Wu
  • 50,556
  • 8
  • 44
  • 80