
I'm looking for the best way to implement a one-producer, multiple-consumer multithreaded application. Currently I'm using one queue as a shared buffer, but it's much slower than the one-producer, one-consumer case. I'm planning to do it like this:

static int N = 100;
static Queue<item>[] buffs = new Queue<item>[N];  // note: each element must still be initialized with new Queue<item>()
static object[] _locks = new object[N];           // likewise, _locks[i] = new object()
static void Produce()
{
    int curIndex = 0;
    while(true)
    {
        // Produce item;
        lock(_locks[curIndex])
        {
            buffs[curIndex].Enqueue(curItem);
            Monitor.Pulse(_locks[curIndex]);
        }
        curIndex = (curIndex+1)%N;
    }
}

static void Consume(object state)
{
    int myIndex = (int)state;  // ParameterizedThreadStart passes the index as object
    item curItem;
    while(true)
    {
        lock(_locks[myIndex])
        {
            while(buffs[myIndex].Count == 0)
                Monitor.Wait(_locks[myIndex]);
            curItem = buffs[myIndex].Dequeue();
        }
        // Consume item;
    }
}

static void Main()
{
    int N = 100;
    Thread[] consumers = new Thread[N];
    for(int i = 0; i < N; i++)
    {
        consumers[i] = new Thread(Consume);
        consumers[i].Start(i);
    }
    Thread producer = new Thread(Produce);
    producer.Start();
}
Mehraban
  • By "fast producer", will it produce work continuously, or can it be paused if the queue is getting too big? Have you made sure you have enough of the slow consumers to avoid convoying? – Lasse V. Karlsen Oct 02 '13 at 07:27
  • Data is produced in bursts. I tried even with just one consumer, but I want performance to be much better. – Mehraban Oct 02 '13 at 07:28
  • @HansPassant Why do you think this buffer overflows?! It's an array of queues. – Mehraban Oct 02 '13 at 12:26

3 Answers


Use a BlockingCollection

static BlockingCollection<item> _buffer = new BlockingCollection<item>();

static void Produce()
{
    while(producing)   // keep producing until there is no more work
    {
        // Produce item;
        _buffer.Add(curItem);
    }

    // signal consumers that no more items are coming
    _buffer.CompleteAdding();
}

static void Consume(object state)
{
    foreach (var curItem in _buffer.GetConsumingEnumerable())
    {
        // Consume item;
    }
}

static void Main()
{
    int N = 100;
    Thread[] consumers = new Thread[N];
    for(int i = 0; i < N; i++)
    {
        consumers[i] = new Thread(Consume);
        consumers[i].Start(i);
    }
    Thread producer = new Thread(Produce);
    producer.Start();
}

If you don't want to fix the number of threads up front, you can use Parallel.ForEach instead.

static void Consume(item curItem)
{
    // consume item
}

void Main()
{
    Thread producer = new Thread(Produce);
    producer.Start();

    Parallel.ForEach(_buffer.GetConsumingPartitioner(), Consume);
}

(`GetConsumingPartitioner` is an extension method from the ParallelExtensionsExtras samples, not the BCL; the built-in `GetConsumingEnumerable` also works with `Parallel.ForEach`, just with less efficient partitioning.)
Jim Mischel
adrianm
  • +1. I removed an extraneous `[]`. Yes, definitely use `BlockingCollection`. Shouldn't be any need for multiple queues. – Jim Mischel Oct 02 '13 at 14:28
  • That's wonderful! There's no `lock` or `Monitor` at all. In the latter case, how many threads will be generated? Will those threads be created and destroyed each time, or will something like a thread pool handle them? – Mehraban Oct 03 '13 at 06:52
  • @SAM, don't know, don't care, let the framework do its job :-). It uses the thread pool and max number of threads is equal to number of CPU cores or something. There are some options in Parallel.ForEach but only use them if you find a problem – adrianm Oct 03 '13 at 07:39
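A sketch of the `Parallel.ForEach` options mentioned in the comment above; the cap of 4 workers and the pre-filled buffer are arbitrary illustrations, not a recommendation:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        var buffer = new BlockingCollection<int>();
        for (int i = 0; i < 100; i++)
            buffer.Add(i);
        buffer.CompleteAdding();   // no more items; consumers drain and stop

        int consumed = 0;
        // MaxDegreeOfParallelism caps the number of concurrent workers.
        var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
        Parallel.ForEach(buffer.GetConsumingEnumerable(), options,
            item => Interlocked.Increment(ref consumed));

        Console.WriteLine("consumed " + consumed);   // consumed 100
    }
}
```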

Using more threads won't help; it may even reduce performance. I suggest you try the ThreadPool, where every work item is one item created by the producer. Note, however, that this doesn't guarantee that items are consumed in the order they were produced.
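A minimal sketch of that idea: the producer hands each item straight to the thread pool instead of to dedicated consumer threads. `ConsumeItem`, the item count, and the `CountdownEvent` used to wait for completion are illustrative choices, not part of the question's code:

```csharp
using System;
using System.Threading;

class Program
{
    static int _processed;
    static CountdownEvent _done = new CountdownEvent(10);

    // One produced item = one thread pool work item.
    static void ConsumeItem(object state)
    {
        int item = (int)state;
        // consume item here
        Interlocked.Increment(ref _processed);
        _done.Signal();
    }

    static void Main()
    {
        // Producer: queue each item as it is produced.
        for (int i = 0; i < 10; i++)
            ThreadPool.QueueUserWorkItem(ConsumeItem, i);

        _done.Wait();   // block until all work items have run (demo only)
        Console.WriteLine("processed " + _processed);   // processed 10
    }
}
```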


Another way could be to reduce the number of consumers to 4, for example and modify the way they work as follows:

The producer adds the new work to the queue. There's only one global queue for all worker threads. It then sets a flag to indicate there is new work like this:

static ManualResetEvent workPresent = new ManualResetEvent(false);
static Queue<item> workQueue = new Queue<item>();

static void Produce()
{
    while(true)
    {
        // Produce item;
        lock(workQueue)
        {
            workQueue.Enqueue(newItem);
            workPresent.Set();
        }
    }
}

The consumers wait for work to be added to the queue. Only one consumer will get to do its job. It then takes all the work from the queue and resets the flag. The producer will not be able to add new work until that is done.

static void Consume()
{
    while(true)
    {
        if (workPresent.WaitOne())
        {
            workPresent.Reset();

            Queue<item> localWorkQueue = new Queue<item>();
            lock(workQueue)
            {
                while (workQueue.Count > 0)
                    localWorkQueue.Enqueue(workQueue.Dequeue());
            }

            // Handle items in local work queue
            ...
        }
    }
}    

The outcome of this, however, is a bit unpredictable. It could be that one thread does all the work while the others do nothing.

Thorsten Dittmar
  • What's the difference then? Consuming order isn't a problem. – Mehraban Oct 02 '13 at 07:51
  • See the accepted answer to this question: http://stackoverflow.com/questions/230003/thread-vs-threadpool. In short: re-use of threads (better performance) and a sort of "load balancing". Why not give it a try and see if it works for you? – Thorsten Dittmar Oct 02 '13 at 08:43
  • I don't see much difference between these two approaches. Maybe using a thread pool just makes handling these threads easier. – Mehraban Oct 02 '13 at 09:38
  • You don't see a difference because you tried, or you don't think it's worth trying because you think there's no difference? As I said: What keeps you from trying, especially since the consuming order is irrelevant? – Thorsten Dittmar Oct 02 '13 at 10:05
  • What keeps me from trying is that the actual code is not as simple as this example, and besides, these consumer threads do work other than consuming produced data, independent of whether data exists, so a suspended thread pool thread cannot do that job. But I will give it a try anyway, as soon as I can handle the approach given in the example. – Mehraban Oct 02 '13 at 10:35
  • All I can tell you is that using more threads can hamper performance. You could - for starters - try to create just 4 consumer threads and do as I will write in an edit to my question. – Thorsten Dittmar Oct 02 '13 at 10:40
  • Oh, and one problem in your above sample code could be that the consumer locks its lock and if there is no work waits for work to come in. This can never work, as the producer also tries to lock the queue. This should result in a deadlock. – Thorsten Dittmar Oct 02 '13 at 10:55
  • I have tried your edited code before, and as I said in my question, its performance was even worse than the case with one consumer thread. Besides, I think using a `ManualResetEvent` may be a bad idea. What if the producer produces e.g. 5 items before all threads come back and wait for the event? – Mehraban Oct 02 '13 at 10:55
  • Then the handle will be set and the first thread that waits for it gets notified. The others will wait until it is set again. – Thorsten Dittmar Oct 02 '13 at 10:57
  • I asked about deadlock [here](http://stackoverflow.com/questions/19114831/does-any-deadlock-occur-in-this-code) and it won't occur. – Mehraban Oct 02 '13 at 10:59
  • Sorry, as I rarely use `Monitor` at all I failed to notice that `Monitor.Wait` actually releases the lock. – Thorsten Dittmar Oct 02 '13 at 11:39

I don't see why you have to use multiple queues. Just reduce the amount of locking. Here is a sample where you can have a large number of consumers that all wait for new work.

public class MyWorkGenerator
{
    ConcurrentQueue<object> _queuedItems = new ConcurrentQueue<object>();
    private object _lock = new object();

    public void Produce()
    {
        while (true)
        {
            _queuedItems.Enqueue(new object());
            lock (_lock)                 // Monitor.Pulse requires holding the lock
                Monitor.Pulse(_lock);
        }
    }

    public object Consume(TimeSpan maxWaitTime)
    {
        lock (_lock)                     // Monitor.Wait requires holding the lock
        {
            if (!Monitor.Wait(_lock, maxWaitTime))
                return null;
        }

        object workItem;
        if (_queuedItems.TryDequeue(out workItem))
        {
            return workItem;
        }

        return null;
    }

}

Do note that Pulse() will only trigger one consumer at a time.

Example usage:

    static void Main()
    {
        var generator = new MyWorkGenerator();

        var consumers = new Thread[20];
        for (int i = 0; i < consumers.Length; i++)
        {
            consumers[i] = new Thread(DoWork);
            consumers[i].Start(generator);
        }

        generator.Produce();
    }

    public static void DoWork(object state)
    {
        var generator = (MyWorkGenerator) state;

        var workItem = generator.Consume(TimeSpan.FromHours(1));
        while (workItem != null)
        {
            // do work


            workItem = generator.Consume(TimeSpan.FromHours(1));
        }
    }

Note that the actual queue is hidden in the producer, as it's imho an implementation detail. The consumers don't really need to know how the work items are generated.

jgauffin
  • If you use a `BlockingCollection` to wrap the queue it will reduce the complexity of the code as you won't need to manually wait when there are no items in the queue. – Servy Oct 02 '13 at 14:19
  • I'm sure this is just an oversight, but you're missing the `Monitor.Enter` (or `lock`) call that is required before calling `Pulse` or `Wait`. – Brian Gideon Oct 02 '13 at 14:54
  • So if this fast producer produces more than *20 items* (in this case) while no consumer has finished its work, what will happen? I think over time it makes the queue larger and larger. – Mehraban Oct 02 '13 at 15:03
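Regarding the unbounded-growth concern in the last comment: `BlockingCollection` can be constructed with a bounded capacity, in which case `Add` blocks the producer until a consumer makes room. A minimal sketch, with the capacity of 5 and the 20 items chosen only for illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        // Capacity of 5: the producer blocks in Add once 5 items are queued.
        var buffer = new BlockingCollection<int>(boundedCapacity: 5);

        var consumer = Task.Run(() =>
        {
            int count = 0;
            foreach (var item in buffer.GetConsumingEnumerable())
                count++;
            Console.WriteLine("consumed " + count);
        });

        for (int i = 0; i < 20; i++)
            buffer.Add(i);          // blocks whenever the buffer is full
        buffer.CompleteAdding();    // lets the consumer's foreach finish

        consumer.Wait();            // prints: consumed 20
    }
}
```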