169

I have a scenario where I have multiple threads adding to a queue and multiple threads reading from the same queue. If the queue reaches a specific size all threads that are filling the queue will be blocked on add until an item is removed from the queue.

The solution below is what I am using right now and my question is: How can this be improved? Is there an object that already enables this behavior in the BCL that I should be using?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}
Eric Schoonover
  • 8
    .Net now has built-in classes to help with this scenario. Most of the answers listed here are obsolete. See the most recent answers at the bottom and look into the thread-safe blocking collections. The answers may be obsolete, but it's still a good question! – Tom A Mar 14 '13 at 00:29
  • I think it's still a good idea to learn about Monitor.Wait/Pulse/PulseAll even if we have new concurrent classes in .NET. – thewpfguy May 14 '14 at 16:21
  • 1
    Agree with @thewpfguy. You'll want to comprehend the basic locking mechanisms behind the scenes. Also worth noting that System.Collections.Concurrent didn't exist until April 2010, and then only in Visual Studio 2010 and above. Definitely not an option for the VS2008 holdouts... – Vic Colborn Mar 03 '15 at 20:05
  • 1
    If you're reading this now, take a look at System.Threading.Channels for a multi-writer/multi-reader, bounded, optionally-blocking implementation of this for .NET Core and .NET Standard. – Mark Rendle Apr 24 '19 at 11:14

10 Answers

207

That looks very unsafe (very little synchronization); how about something like:

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

(edit)

In reality, you'd want a way to close the queue so that readers start exiting cleanly - perhaps something like a bool flag - if set, an empty queue just returns (rather than blocking):

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}
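To see this in action, here's a minimal, self-contained sketch that pairs a condensed copy of the class above with one producer and two consumers (the `Demo`/`Run` names are just for the example; a small bound is used so the producer actually blocks):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    private bool closing;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize) Monitor.Wait(queue); // block while full
            queue.Enqueue(item);
            if (queue.Count == 1) Monitor.PulseAll(queue); // wake any blocked dequeue
        }
    }

    public void Close()
    {
        lock (queue) { closing = true; Monitor.PulseAll(queue); }
    }

    public bool TryDequeue(out T value)
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                if (closing) { value = default(T); return false; }
                Monitor.Wait(queue); // block while empty
            }
            value = queue.Dequeue();
            if (queue.Count == maxSize - 1) Monitor.PulseAll(queue); // wake any blocked enqueue
            return true;
        }
    }
}

class Demo
{
    public static int Run()
    {
        var q = new SizeQueue<int>(4); // small bound so Enqueue really blocks
        int consumed = 0;

        var consumers = new List<Thread>();
        for (int i = 0; i < 2; i++)
        {
            var t = new Thread(() =>
            {
                int item;
                while (q.TryDequeue(out item)) Interlocked.Increment(ref consumed);
            });
            t.Start();
            consumers.Add(t);
        }

        for (int i = 0; i < 100; i++) q.Enqueue(i); // blocks whenever the queue is full

        q.Close(); // let the consumers drain the remainder and exit cleanly
        foreach (var t in consumers) t.Join();
        return consumed;
    }

    static void Main() { Console.WriteLine(Run()); } // prints 100
}
```

Note that all 100 items are always consumed: `TryDequeue` only returns `false` once the queue is both empty *and* closed.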
Marc Gravell
  • Wouldn't you also need to Monitor.PulseAll in case of close? – Eric Schoonover Feb 09 '09 at 23:15
  • 1
    How about changing the wait to a WaitAny and passing in a terminate waithandle on construction ... – Sam Saffron May 14 '09 at 04:22
  • @sambo99 - for what purpose? At the moment it uses Monitor, which is the cheapest of all the locking constructs, as it doesn't need to go to the OS. A bool and a Monitor is quite a bit cheaper than a waithandle... – Marc Gravell May 14 '09 at 04:28
  • Yerp, your implementation here is solid, easy to understand and fast. Only advantage with a WaitAny type approach is that you could have a single event that triggers shutdown without needing a Close method ... It may be useful sometimes. – Sam Saffron May 16 '09 at 01:22
  • another thing I would look at here is probably making size queue IDisposable. Also, what happens if your queue object goes out of scope while threads are blocked. It would probably be a bugger to debug. – Sam Saffron May 16 '09 at 01:25
  • 1
    @Marc- an optimisation, if you were expecting the queue to always reach capacity, would be to pass the maxSize value into the constructor of the Queue. You could add another constructor to your class to accomodate for that. – RichardOD Jan 22 '10 at 16:26
  • 3
    Why SizeQueue, why not FixedSizeQueue? – mindless.panda Mar 15 '10 at 20:26
  • If you lock on the queue, won't that effectively prevent other threads from messing with it? For instance, add, first it locks the queue, and then if the queue is full, it then waits for the queue to be signalled, but how can it be signalled if no other thread can lock it first? There must be something I'm not getting here... – Lasse V. Karlsen May 07 '10 at 14:17
  • 4
    @Lasse - it releases the lock(s) during `Wait`, so other threads can acquire it. It reclaims the lock(s) when it wakes up. – Marc Gravell May 07 '10 at 18:43
  • 1
    Nice, as I said, there was something I wasn't getting :) That sure makes me want to revisit some of my thread-code.... – Lasse V. Karlsen May 07 '10 at 18:52
  • What happens when a thread wakes up and the lock is already taken by someone else, it will block waiting for the lock to become free and then claim it? – Lasse V. Karlsen May 07 '10 at 18:54
  • Actually I can think of many questions around this subject, is there a link to some material I can read? If not, I'll try to summarize up some questions in a post of my own here. – Lasse V. Karlsen May 07 '10 at 18:56
  • @Lasse - when it is pulsed it moves from the sleeping-queue to the ready-queue (as though it was just now trying to take the lock). When the thread that pulsed releases the lock, the *next* thread in the ready-queue (which might be neither of those) gets a go. And so on. MSDN may help, but... Jon's pages have some coverage too. – Marc Gravell May 07 '10 at 21:38
  • 1
    @Lasse - note the drop down at the bottom for the 17 topics: http://www.yoda.arachsys.com/csharp/threads/ – Marc Gravell May 07 '10 at 21:39
  • 1
    Would not Pulse instead of PulseAll work fine in Enqueue/Dequeue? – notso May 11 '10 at 15:11
  • 1
    @notso - possibly, but then you largely have to *know* which thread is waiting. And in the case of pulling the queue down you actively *want* to wake up all the workers. In most cases there is only one worker, making it a moot point. In the more complex cases, you've got enough to worry about, without stressing over this - you could *carefully* add it, I suppose. – Marc Gravell May 11 '10 at 15:31
  • @notso In answer to my own question: no, Pulse is not enough since it wakes up a single thread and we do not know whether this thread is in the Enqueue or Dequeue method. – notso May 11 '10 at 15:31
  • In case anyone cares I chose to not have a maximum queue size. Thus the enqueue call never blocks meaning dequeue does not need to call pulse at all and enqueue can use Pulse instead of PulseAll since only dequeue calls are waiting. Note that when closing the queue PulseAll is of course necessary. Also thank you Marc for the quick answer. – notso May 11 '10 at 15:39
  • I know this is an old post, but I have a small question. When `Enqueue` calls `PulseAll` it releases the monitor to every thread waiting and they race to lock the monitor object again, is it at all possible that if another writer will win and say the queue is full at this point, it'll enter `Monitor.Wait` again and then we end up with a deadlock? – Stan R. Jun 15 '13 at 22:06
  • 1
    @Stan that isn't a deadlock; a Wait releases the lock to the next in the list, so at some point a reader will get it, and will reawaken that writer when it has made space – Marc Gravell Jun 15 '13 at 22:41
  • @MarcGravell ahh, I keep forgetting that the `.Wait` actually releases the lock!! Makes perfect sense now. Thx. – Stan R. Jun 15 '13 at 22:50
  • `bool TryEnqueue(T item)` is also needed to cleanly exit the threads blocked once `queue.Count` reaches `maxSize`. The items in `queue` will _not_ be consumed but can do nothing much about that. – hIpPy May 08 '15 at 18:11
  • @hlpPy it certainly *could* be required, if you need a method that appends if there is room, else doesn't append and returns immediately. I didn't need that :) – Marc Gravell May 08 '15 at 18:50
  • @MarcGravell : Based on your class I had to implement a TryDequeueWhere, to dequeue an object based on certain criteria. Could you have a look at this : https://stackoverflow.com/q/47174748/1959238 – Franki1986 Nov 08 '17 at 10:11
62

Use the .NET 4 BlockingCollection. To enqueue, use Add(); to dequeue, use Take(). It internally uses the non-blocking ConcurrentQueue. More info here: Fast and Best Producer/consumer queue technique BlockingCollection vs concurrent Queue
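For illustration, here's a minimal, self-contained sketch of that approach — a bounded BlockingCollection with one producer task and a consuming loop (the `Program`/`Run` names are just for the example):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    public static int Run()
    {
        // Bounded to 4 items: Add blocks when the collection is full,
        // Take (and GetConsumingEnumerable) blocks when it is empty.
        var queue = new BlockingCollection<int>(boundedCapacity: 4);

        var producer = Task.Run(() =>
        {
            for (int i = 0; i < 100; i++) queue.Add(i); // blocks while full
            queue.CompleteAdding(); // signals consumers that no more items are coming
        });

        int consumed = 0;
        foreach (var item in queue.GetConsumingEnumerable()) consumed++;

        producer.Wait();
        return consumed;
    }

    static void Main() { Console.WriteLine(Run()); } // prints 100
}
```

CompleteAdding plays the same role as the Close method in the hand-rolled version above: the consuming loop exits cleanly once the collection is drained.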

xhafan
14

You can use the BlockingCollection and ConcurrentQueue classes in the System.Collections.Concurrent namespace:

public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
    /// <summary>
    /// Initializes a new unbounded instance of the ProducerConsumerQueue.
    /// Use Add and TryAdd for Enqueue and TryEnqueue, and Take and TryTake for Dequeue and TryDequeue functionality.
    /// </summary>
    public ProducerConsumerQueue()
        : base(new ConcurrentQueue<T>())
    {
    }

    /// <summary>
    /// Initializes a new bounded instance of the ProducerConsumerQueue; Add blocks once maxSize items are queued.
    /// Use Add and TryAdd for Enqueue and TryEnqueue, and Take and TryTake for Dequeue and TryDequeue functionality.
    /// </summary>
    /// <param name="maxSize">The maximum number of items the queue can hold.</param>
    public ProducerConsumerQueue(int maxSize)
        : base(new ConcurrentQueue<T>(), maxSize)
    {
    }
}
Andreas
14

"How can this be improved?"

Well, you need to look at every method in your class and consider what would happen if another thread was simultaneously calling that method or any other method. For example, you put a lock in the Remove method, but not in the Add method. What happens if one thread Adds at the same time as another thread Removes? Bad things.

Also consider that a method can return a second object that provides access to the first object's internal data - for example, GetEnumerator. Imagine one thread is going through that enumerator, another thread is modifying the list at the same time. Not good.

A good rule of thumb is to make this simpler to get right by cutting down the number of methods in the class to the absolute minimum.

In particular, don't inherit another container class, because you will expose all of that class's methods, providing a way for the caller to corrupt the internal data, or to see partially complete changes to the data (just as bad, because the data appears corrupted at that moment). Hide all the details and be completely ruthless about how you allow access to them.

I'd strongly advise you to use off-the-shelf solutions - get a book about threading, or use a 3rd-party library. Otherwise, given what you're attempting, you're going to be debugging your code for a long time.

Also, wouldn't it make more sense for Remove to return an item (say, the one that was added first, as it's a queue), rather than the caller choosing a specific item? And when the queue is empty, perhaps Remove should also block.

Update: Marc's answer actually implements all these suggestions! :) But I'll leave this here as it may be helpful to understand why his version is such an improvement.

Daniel Earwicker
6

I just knocked this up using the Reactive Extensions and remembered this question:

public class BlockingQueue<T>
{
    private readonly Subject<T> _queue;
    private readonly IEnumerator<T> _enumerator;
    private readonly object _sync = new object();

    public BlockingQueue()
    {
        _queue = new Subject<T>();
        _enumerator = _queue.GetEnumerator();
    }

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _queue.OnNext(item);
        }
    }

    public T Dequeue()
    {
        _enumerator.MoveNext();
        return _enumerator.Current;
    }
}

Not necessarily entirely safe, but very simple.

Mark Rendle
5

This is what I came up with for a thread-safe bounded blocking queue.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

public class BlockingBuffer<T>
{
    private Object t_lock;
    private Semaphore sema_NotEmpty;
    private Semaphore sema_NotFull;
    private T[] buf;

    private int getFromIndex;
    private int putToIndex;
    private int size;
    private int numItems;

    public BlockingBuffer(int Capacity)
    {
        if (Capacity <= 0)
            throw new ArgumentOutOfRangeException("Capacity", "Capacity must be larger than 0");

        t_lock = new Object();
        buf = new T[Capacity];
        sema_NotEmpty = new Semaphore(0, Capacity);
        sema_NotFull = new Semaphore(Capacity, Capacity);
        getFromIndex = 0;
        putToIndex = 0;
        size = Capacity;
        numItems = 0;
    }

    public void put(T item)
    {
        sema_NotFull.WaitOne();
        lock (t_lock)
        {
            while (numItems == size)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            buf[putToIndex++] = item;

            if (putToIndex == size)
                putToIndex = 0;

            numItems++;

            Monitor.Pulse(t_lock);
        }
        sema_NotEmpty.Release();
    }

    public T take()
    {
        T item;

        sema_NotEmpty.WaitOne();
        lock (t_lock)
        {
            while (numItems == 0)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            item = buf[getFromIndex++];

            if (getFromIndex == size)
                getFromIndex = 0;

            numItems--;

            Monitor.Pulse(t_lock);
        }
        sema_NotFull.Release();

        return item;
    }
}
Eric Schoonover
Kevin
  • Could you provide some code samples of how I'd queue up some thread functions using this library, including how I'd instantiate this class? – theJerm Nov 10 '13 at 19:39
  • This question/response is a bit dated. You should look at the System.Collections.Concurrent namespace for blocking queue support. – Kevin Nov 16 '13 at 18:35
2

I haven't fully explored the TPL, but it might have something that fits your needs, or at the very least provide some Reflector fodder to snag inspiration from.

Hope that helps.

TheMissingLINQ
  • I am aware that this is old, but my comment is for newcomers to SO, as OP already knows this today. This is not an answer, this should have been a comment. – John Demetriou Jul 01 '20 at 12:02
2

Starting with .NET Core 3.0 / .NET 5.0 you can use System.Threading.Channels.
Benchmarks from this article (Asynchronous Producer Consumer Pattern in .NET (C#)) show a significant speed boost over BlockingCollection!
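A minimal, self-contained sketch of a bounded channel with one writer task and an async consuming loop (the `Program`/`RunAsync` names are just for the example):

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    public static async Task<int> RunAsync()
    {
        // Bounded channel: WriteAsync waits (asynchronously, without
        // blocking a thread) whenever 4 items are already queued.
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(4)
        {
            FullMode = BoundedChannelFullMode.Wait
        });

        var producer = Task.Run(async () =>
        {
            for (int i = 0; i < 100; i++)
                await channel.Writer.WriteAsync(i); // suspends while the channel is full
            channel.Writer.Complete(); // no more items; readers drain and finish
        });

        int consumed = 0;
        await foreach (var item in channel.Reader.ReadAllAsync()) consumed++;

        await producer;
        return consumed;
    }

    static async Task Main() => Console.WriteLine(await RunAsync()); // prints 100
}
```

Unlike the Monitor-based versions above, a full channel parks the writer as an awaitable continuation rather than blocking a thread, which is where much of the throughput advantage comes from.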

5andr0
0

Well, you might look at the System.Threading.Semaphore class. Other than that - no, you have to make this yourself. AFAIK there is no such built-in collection.

Vilx-
  • I looked at that for throttling the number of threads that are accessing a resource but it doesn't allow you to block all access to a resource based on some condition (like Collection.Count). AFAIK anyway – Eric Schoonover Feb 09 '09 at 22:07
  • Well, you do that part yourself, just like you do now. Simply instead of MaxSize and _FullEvent you have the Semaphore, which you initialize with the right count in the constructor. Then, upon every Add/Remove you call WaitOne() or Release(). – Vilx- Feb 09 '09 at 22:09
  • It's not much different than what you have now. Just more simple IMHO. – Vilx- Feb 09 '09 at 22:09
  • Can you give me an example showing this working? I didn't see how to adjust the size of a Semaphor dynamically which this scenario requires. Since you have to be able to block all resources only if the queue is full. – Eric Schoonover Feb 09 '09 at 22:22
  • Ahh, changing size! Why didn't you say so immediately? OK, then a semaphore is not for you. Good luck with this approach! – Vilx- Feb 09 '09 at 22:40
  • I am aware that this is old, but my comment is for newcomers to SO, as OP already knows this today. This is not an answer, this should have been a comment. – John Demetriou Jul 01 '20 at 12:02
-1

If you want maximum throughput, allowing multiple readers to read and only one writer to write, the BCL has something called ReaderWriterLockSlim that should help slim down your code...

DavidN
  • I want none to be able to write if the queue is full though. – Eric Schoonover Feb 09 '09 at 22:05
  • So you combine it with a lock. Here are some very good examples http://www.albahari.com/threading/part2.aspx#_ProducerConsumerQWaitHandle http://www.albahari.com/threading/part4.aspx – DavidN Feb 09 '09 at 22:10
  • 3
    With queue/dequeue, everyone is a writer... an exclusive lock will perhaps be more pragmatic – Marc Gravell Feb 09 '09 at 22:25
  • I am aware that this is old, but my comment is for newcomers to SO, as OP already knows this today. This is not an answer, this should have been a comment. – John Demetriou Jul 01 '20 at 12:02