15

I have to develop a multithreaded application in which multiple threads each generate custom event log records that need to be saved in a queue (not Microsoft MSMQ).

Another thread reads the log data from the queue, manipulates it with certain information, and saves the log information to a file. Basically, we are implementing the multiple-producer, single-consumer paradigm.

Can anybody provide suggestions on how to implement this in C++ or C#?

Thanks,

at0S
Vikram Ranabhatt

3 Answers

17

This kind of thing is very easy to do using the BlockingCollection<T> defined in System.Collections.Concurrent.

Basically, you create your queue so that all threads can access it:

BlockingCollection<LogRecord> LogQueue = new BlockingCollection<LogRecord>();

Each producer adds items to the queue:

while (!Shutdown)
{
    LogRecord rec = CreateLogRecord(); // however that's done
    LogQueue.Add(rec);
}

And the consumer does something similar:

while (!Shutdown)
{
    LogRecord rec = LogQueue.Take();
    // process the record
}

By default, BlockingCollection uses a ConcurrentQueue<T> as the backing store. The ConcurrentQueue takes care of thread synchronization, and the BlockingCollection does a non-busy wait when trying to take an item. That is, if the consumer calls Take when there are no items in the queue, it blocks without sleeping or spinning until an item is available.
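Since the question also mentions C++, here is a rough sketch of the same blocking behaviour using only the C++17 standard library. It is not the BlockingCollection implementation; `BlockingQueue`, `add`, `take`, `complete_adding` and `run_demo` are hypothetical names chosen for illustration, and `std::string` stands in for the log record type.

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical C++ counterpart of a blocking collection (illustrative names).
template <typename T>
class BlockingQueue {
public:
    // Called by producers; wakes the consumer if it is waiting.
    void add(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(item));
        }
        ready_.notify_one();
    }

    // Signals that no more items will be added.
    void complete_adding() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            completed_ = true;
        }
        ready_.notify_all();
    }

    // Blocks without spinning until an item arrives; returns std::nullopt
    // once the queue is both completed and empty.
    std::optional<T> take() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty() || completed_; });
        if (items_.empty()) return std::nullopt;
        T item = std::move(items_.front());
        items_.pop();
        return item;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<T> items_;
    bool completed_ = false;
};

// Demo: several producers, one consumer; returns how many records were consumed.
int run_demo(int producers, int records_each) {
    BlockingQueue<std::string> queue;
    int consumed = 0;
    std::thread consumer([&] {
        while (auto rec = queue.take()) ++consumed;  // a real consumer would write *rec to a file
    });
    std::vector<std::thread> workers;
    for (int p = 0; p < producers; ++p)
        workers.emplace_back([&queue, p, records_each] {
            for (int i = 0; i < records_each; ++i)
                queue.add("producer " + std::to_string(p) + " record " + std::to_string(i));
        });
    for (auto& w : workers) w.join();
    queue.complete_adding();  // lets the consumer drain the queue and exit
    consumer.join();
    return consumed;
}
```

The consumer here exits when the queue is marked complete and empty, rather than by polling a shutdown flag, so a consumer blocked in `take` is always woken up at shutdown.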

Andrew Morton
Jim Mischel
  • Maybe it would be useful for the consumer to get the additional condition `while (!Shutdown || LogQueue.())` (don't know how it is expressed in C#). Or, if the LogQueue has a way to be stopped, only use this. – glglgl Sep 01 '11 at 05:41
  • You could have the producer call `CompleteAdding`, which will mark the collection complete for adding (i.e. no more items can be added). The consumer could then use `while (LogQueue.TryTake(out rec, Timeout.Infinite))`, which means it would empty the collection and then exit. `TryTake` will return `false` if the collection is complete for adding and the queue is empty. – Jim Mischel Sep 01 '11 at 20:00
  • @JimMischel After the consumer dequeues an item and performs some processing, how can you return a result to the calling process? – gdp Mar 26 '13 at 17:40
  • @gdp: There are many ways to do a notification when a thread is done performing some work. How you do it depends on what you mean by "the calling process." If you have a specific question, I suggest you post a question. – Jim Mischel Mar 26 '13 at 18:34
  • @JimMischel I have actually posted a question, no responses as yet. Thanks for your response anyway. – gdp Mar 26 '13 at 18:39
  • So if I have _shutdown_, what would happen to the consumer while loop? Will it hang? – deostroll Feb 10 '16 at 14:43
  • @deostroll: If the `Shutdown` flag is set, the consumer loop exits. – Jim Mischel Feb 15 '16 at 16:33
  • @JimMischel Yes. You are correct. In addition to that, you'd have to call `CompleteAdding()` somewhere, and also surround the `Take()` with a try-catch construct that handles an `InvalidOperationException` and does nothing but return when it happens... – deostroll Feb 16 '16 at 06:45
  • Internally BlockingCollection uses `SemaphoreSlim` and `ConcurrentQueue` to achieve a "non-busy" wait. – joe Nov 30 '20 at 05:37
  • nice approach! how would you actually spawn multiple producers? Is multiple Task.Run() still a reasonable choice? – yBother Jan 20 '23 at 14:06
  • @yBother Yes, you could use multiple `Task.Run()` calls. Or anything else that allows you to spin up a thread. What you choose will depend on what you've used in the rest of your program. – Jim Mischel Feb 02 '23 at 21:44
2

You can use a synchronized queue (if you have .NET 3.5 or older code) or even better the new ConcurrentQueue<T>!

Daniel A. White
2

What you are planning is a classic producer-consumer queue, with a thread consuming the items on the queue to do some work. This can be wrapped into a higher-level construct called an "actor" or "active object".

Basically, this wraps the queue and the thread that consumes the items into a single class. The other threads call asynchronous methods on this class, which put messages on the queue to be performed by the actor's thread. In your case the class could have a single method, writeData, which stores the data in the queue and signals the condition variable to notify the actor thread that there is something in the queue. The actor thread checks whether there is any data in the queue and, if there is none, waits on the condition variable.

Here is a good article on the concept:

http://www.drdobbs.com/go-parallel/article/showArticle.jhtml;jsessionid=UTEXJOTLP0YDNQE1GHPSKH4ATMY32JVN?articleID=225700095

iain