20

Please see the pseudocode below.

//Single or multiple producers call the method below
    void Produce(object itemToQueue)
    {
        concurrentQueue.Enqueue(itemToQueue);
        consumerSignal.Set();
    }

    //somewhere else we have started a consumer like this
    //we have only one consumer
    void StartConsumer()
    {
        while (true)
        {
            object item;
            //drain everything currently queued
            while (concurrentQueue.TryDequeue(out item))
            {
                //long running processing of item
            }
            //sleep until a producer signals that more work has arrived
            consumerSignal.WaitOne();
        }
    }

How do I port this pattern, which I have used since time immemorial, to use TaskFactory-created tasks and the new signalling features of .NET 4? In other words, if someone were to write this pattern using .NET 4, what would it look like? Pseudocode is fine. I am already using the .NET 4 ConcurrentQueue, as you can see. How do I use a task and, if possible, some newer signalling mechanism? Thanks.

Solution to my problem below, thanks to Jon/Dan. Sweet. No manual signalling and no while(true) or while(itemsToProcess) type loops like in the old days.

//Single or multiple producers call the method below
 void Produce(object itemToQueue)
 {
     blockingCollection.Add(itemToQueue);
 }

 //somewhere else we have started a consumer like this
 //this supports multiple consumers!
 Task.Factory.StartNew(StartConsuming, TaskCreationOptions.LongRunning);

 void StartConsuming()
 {
     //blocks while the collection is empty; ends after CompleteAdding() once it is drained
     foreach (object item in blockingCollection.GetConsumingEnumerable())
     {
         //long running processing of item
     }
 }

Cancellation is handled using cancellation tokens.
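For anyone wondering what the cancellation part might look like, here is a minimal sketch. The names `CancellableConsumerSketch`, `Consume` and `Demo` are made up for illustration and are not from the posts above. It assumes the consumer should simply stop when the token is cancelled, and it relies on the `GetConsumingEnumerable(CancellationToken)` overload, which throws `OperationCanceledException` when the token is cancelled.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableConsumerSketch
{
    // Consumes items until the collection is completed or the token is
    // cancelled. Returns the number of items processed.
    public static int Consume(BlockingCollection<object> queue, CancellationToken token)
    {
        int processed = 0;
        try
        {
            // Blocks while the queue is empty; throws OperationCanceledException
            // once the token is cancelled.
            foreach (object item in queue.GetConsumingEnumerable(token))
            {
                // long running processing of item would go here
                processed++;
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation was requested: stop consuming.
        }
        return processed;
    }

    // Hypothetical wiring: start the consumer as a task, produce, then cancel.
    public static void Demo()
    {
        var queue = new BlockingCollection<object>();
        var cts = new CancellationTokenSource();

        Task<int> consumer = Task.Factory.StartNew(
            () => Consume(queue, cts.Token), TaskCreationOptions.LongRunning);

        queue.Add("a");
        queue.Add("b");

        cts.Cancel(); // ask the consumer to stop; items may or may not have been processed yet
        Console.WriteLine("Processed before cancel: " + consumer.Result);
    }
}
```

Cancelling stops the consumer as soon as it notices the token, so items may be left in the collection. If every queued item has to be processed before shutdown, calling `CompleteAdding()` on the collection is probably the better fit.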
Gullu
  • A really good (because it is step by step) explanation and example [may be found here](http://www.codethinked.com/blockingcollection-and-iproducerconsumercollection). – MajesticRa Jan 03 '12 at 13:53
  • Hi Gullu, please take a look at [this code](https://stackoverflow.com/questions/733793/implementing-the-producer-consumer-pattern-in-c-sharp/47179576#47179576). It is a simple working example on how to use a BlockingCollection for Producer-Consumer patterns. – sɐunıɔןɐqɐp Nov 08 '17 at 13:26

3 Answers

26

You would use BlockingCollection<T>. There's an example in the documentation.

That class is specifically designed to make this trivial.
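As a rough illustration (not the documentation example itself), the sketch below wraps a `ConcurrentQueue<int>` in a bounded `BlockingCollection<int>` and pushes a few integers through it. The names `BlockingQueueSketch` and `RoundTrip` are hypothetical. It assumes a single producer and a single consumer, in which case the queue-backed collection hands items over in the order they were added.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BlockingQueueSketch
{
    // Pushes 'count' integers through a bounded BlockingCollection and returns
    // them in the order the single consumer received them.
    public static List<int> RoundTrip(int count, int capacity)
    {
        // ConcurrentQueue<T> is also the default backing store; it is passed
        // explicitly here to show the wrapping.
        using (var queue = new BlockingCollection<int>(new ConcurrentQueue<int>(), capacity))
        {
            var received = new List<int>();

            Task consumer = Task.Factory.StartNew(() =>
            {
                // Blocks while the queue is empty, ends once CompleteAdding()
                // has been called and the queue is drained.
                foreach (int item in queue.GetConsumingEnumerable())
                    received.Add(item);
            }, TaskCreationOptions.LongRunning);

            for (int i = 0; i < count; i++)
                queue.Add(i); // blocks while the queue already holds 'capacity' items

            queue.CompleteAdding(); // no more items will be added
            consumer.Wait();        // after this, 'received' is safe to read
            return received;
        }
    }
}
```

There is no explicit signalling anywhere: the consumer waits inside the enumerator when there is nothing to take, and the producer waits inside `Add` when the bounded capacity is reached.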

Jon Skeet
  • @user666490: That's fine - you wrap that in a `BlockingCollection`. If you insist on doing things directly, you'll end up basically repeating a bunch of code in `BlockingCollection` - why reinvent the wheel? – Jon Skeet Jun 28 '11 at 19:43
  • @user666490: Jon has given you the canonical .NET 4 solution to the producer-consumer problem. From MSDN: "Provides blocking and bounding capabilities for thread-safe collections that implement IProducerConsumerCollection." – user7116 Jun 28 '11 at 19:44
  • Jon, if you scroll to the bottom of the MSDN page you refer to above, you will see the code below. This is so lame compared to the old-fashioned signalling I have shown in my post above. Don't you agree? `// Consume bc while (true) Console.WriteLine(bc.Take()); } catch (InvalidOperationException) { // IOE means that Take() was called on a completed collection Console.WriteLine("That's All!"); }` – Gullu Jun 28 '11 at 19:54
  • BlockingCollection provides no signalling to the consumer unless the consumer polls `IsCompleted`/`IsAddingCompleted`. In the classic pattern I have posted, a producer adds to the queue, signals the consumer, and is done. With a blocking collection we can mark the collection as complete for adding, which then puts it in a state where more items cannot be added until the consumer dequeues all items. – Gullu Jun 28 '11 at 20:07
  • @user666490: You don't need to poll - you just use `Take` and it will block until there's an item ready. You can specify a bounded capacity if you want the producer to block until there's room for a new item in the queue. – Jon Skeet Jun 28 '11 at 20:11
  • Still don't understand how a consumer can be notified when items are added. My post has about 10 lines of pseudo code. Can you please port (pseudo code fine) and post here. That was my real question anyway. Bottom line is that I can get it to work using blocking collection, task factory etc but the classic pattern using signals as shown looks much cleaner. thanks – Gullu Jun 28 '11 at 20:22
  • @user666490, the Take call blocks until an item is available (it's a `blocking` collection), so the waiting on a signal happens as an implementation detail inside Take. – Dan Bryant Jun 28 '11 at 20:28
  • Jon/Dan Finally got it. I will update my post to show the new/old pattern side by side. Anyone who can improve that further please edit the post. thanks – Gullu Jun 28 '11 at 20:32
  • @JonSkeet - I'm trying to implement the simple BlockingCollection usage without a while(true) and TryTake() inside of a Task. I was going to use GetConsumingEnumerable() but this is killing me ... per https://msdn.microsoft.com/en-us/library/dd460684%28v=vs.110%29.aspx "...There is no guarantee that the items are enumerated in the same order in which they are added by the producer threads." What's the point of having a ConcurrentQUEUE if I can't enumerate and guarantee order? I need to have the items processed sequentially. Am I missing something? – Dave Black May 15 '15 at 20:33
  • @Dave: I suspect you're missing that ConcurrentQueue is not the only concurrent collection. I suspect with ConcurrentQueue you're fine. – Jon Skeet May 15 '15 at 20:35
  • @Jon skeet I know there are other concurrent collections. I would've expected the MSDN documentation to have made that caveat regarding the queue. – Dave Black May 16 '15 at 22:39
11

Your second block of code looks better. But starting a Task and then immediately waiting on it is pointless. Just call Take and then process the returned item directly on the consuming thread. That is how the producer-consumer pattern is meant to be done. If you think the processing of work items is intensive enough to warrant more consumers, then by all means start more consumers. BlockingCollection is safe for multiple producers and multiple consumers.

using System.Collections.Concurrent;
using System.Threading;

public class YourCode
{
  private BlockingCollection<object> queue = new BlockingCollection<object>();

  public YourCode()
  {
    var thread = new Thread(StartConsuming);
    thread.IsBackground = true;
    thread.Start();
  }

  public void Produce(object item)
  {
    queue.Add(item);
  }

  private void StartConsuming()
  {
    while (true)
    {
      object item = queue.Take();
      // Add your code to process the item here.
      // Do not start another task or thread. 
    }
  }
}
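If one consumer turns out not to be enough, the same idea extends to several. The following is only a sketch: `MultiConsumerSketch` and `SumWithConsumers` are hypothetical names, and a trivial summing step stands in for real processing. It uses `GetConsumingEnumerable` together with `CompleteAdding` instead of the `while (true)` loop above so that the consumer threads can exit cleanly once the work is done.

```csharp
using System.Collections.Concurrent;
using System.Threading;

public static class MultiConsumerSketch
{
    // Starts 'consumerCount' threads that drain one shared queue and returns
    // the sum of everything they processed.
    public static long SumWithConsumers(int itemCount, int consumerCount)
    {
        var queue = new BlockingCollection<int>();
        long total = 0;
        var threads = new Thread[consumerCount];

        for (int i = 0; i < consumerCount; i++)
        {
            threads[i] = new Thread(() =>
            {
                // Each item is handed to exactly one of the consumers.
                foreach (int item in queue.GetConsumingEnumerable())
                    Interlocked.Add(ref total, item); // stand-in for real processing
            });
            threads[i].IsBackground = true;
            threads[i].Start();
        }

        for (int i = 1; i <= itemCount; i++)
            queue.Add(i);

        queue.CompleteAdding(); // lets the foreach loops end once the queue drains

        foreach (Thread t in threads)
            t.Join();

        return Interlocked.Read(ref total);
    }
}
```

With more than one consumer, items are still taken in queue order, but they may finish processing in a different order, so this only suits work items that are independent of each other.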
Brian Gideon
1

I've used a pattern before that creates a sort of 'on-demand' queue consumer (based on consuming from a ConcurrentQueue):

        private void FireAndForget(Action fire)
        {
            _firedEvents.Enqueue(fire);
            lock (_taskLock)
            {
                if (_launcherTask == null)
                {
                    _launcherTask = new Task(LaunchEvents);
                    _launcherTask.ContinueWith(EventsComplete);
                    _launcherTask.Start();
                }
            }
        }

        private void LaunchEvents()
        {
            Action nextEvent;

            while (_firedEvents.TryDequeue(out nextEvent))
            {
                if (_synchronized)
                {
                    var syncEvent = nextEvent;
                    _mediator._syncContext.Send(state => syncEvent(), null);
                }
                else
                {
                    nextEvent();                        
                }

                lock (_taskLock)
                {
                    if (_firedEvents.Count == 0)
                    {
                        _launcherTask = null;
                        break;
                    }
                }
            }
        }

        private void EventsComplete(Task task)
        {
            if (task.IsFaulted && task.Exception != null)
            {
                 // Do something with task Exception here
            }
        }
Dan Bryant
  • Thanks. But I am hoping the new .NET 4 TPL/PFX will make the classic pattern I have posted easier to maintain and understand. Your approach is overkill for me. – Gullu Jun 28 '11 at 19:48
  • If your primary goal is ease of use, Jon's answer is best; the BlockingCollection is designed specifically for this very common pattern (with a simple blocking call to Take and built-in support for the new Cancellation system.) – Dan Bryant Jun 28 '11 at 20:23