
I would like a function that continuously checks a Queue for new additions on one thread.

Obviously there is the option of a continuous loop with sleeps, but I want something less wasteful.

I considered a wait handle of some type and then having the queue signal it, but I can't override Enqueue safely as it is not virtual.

Now I'm considering encapsulating a Queue<T> as my best option but I wanted to ask you fine folks if there were a better one!

The idea is this: I want many threads to share one socket connection while guaranteeing that each reads only the response to its own message. So I was going to have a single thread dispatch the messages and read the responses, then execute a callback with the response data (in plain text).

Matt Ellen
Ben

1 Answer

Try a blocking queue; see "Creating a blocking Queue<T> in .NET?"

The basic idea is that a call to TryDequeue blocks until there is something in the queue (or until the timeout expires or the queue is closed). The "beauty" of the blocking queue is that you don't have to poll, sleep, or do anything crazy like that; it's the fundamental backbone of the Producer/Consumer pattern.

My version of the blocking queue is:

using System.Collections.Generic;
using System.Threading;

public class BlockingQueue<T> where T : class
{
    private bool closing;
    private readonly Queue<T> queue = new Queue<T>();

    public int Count
    {
        get
        {
            lock (queue)
            {
                return queue.Count;
            }
        }
    }

    public BlockingQueue()
    {
        lock (queue)
        {
            closing = false;
            Monitor.PulseAll(queue);
        }
    }

    public bool Enqueue(T item)
    {
        lock (queue)
        {
            if (closing || null == item)
            {
                return false;
            }

            queue.Enqueue(item);

            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }

            return true;
        }
    }


    public void Close()
    {
        lock (queue)
        {
            if (!closing)
            {
                closing = true;
                queue.Clear();
                Monitor.PulseAll(queue);
            }
        }
    }


    public bool TryDequeue(out T value, int timeout = Timeout.Infinite)
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
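                // Give up if the queue is closing, if the timeout is invalid
                // (below Timeout.Infinite, i.e. less than -1, which would make
                // Monitor.Wait throw), or if the wait itself timed out.
                if (closing || (timeout < Timeout.Infinite) || !Monitor.Wait(queue, timeout))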
                {
                    value = default(T);
                    return false;
                }
            }

            value = queue.Dequeue();
            return true;
        }
    }

    public void Clear()
    {
        lock (queue)
        {
            queue.Clear();
            Monitor.Pulse(queue);
        }
    }
}

Many thanks to Marc Gravell for this one!
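
If you are on .NET 4 or later, the framework's own BlockingCollection<T> (in System.Collections.Concurrent) gives the same block-until-there-is-something behaviour without a hand-rolled class. Below is a minimal sketch of one consumer thread draining such a queue. It is a swap-in for the BlockingQueue<T> above, not a copy of it, and the names DispatcherSketch and Drain are hypothetical, made up for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class DispatcherSketch
{
    // Hypothetical helper: starts one consumer thread, feeds it the given
    // messages, and returns them in the order the consumer processed them.
    public static List<string> Drain(IEnumerable<string> messages)
    {
        var processed = new List<string>();

        using (var queue = new BlockingCollection<string>())
        {
            var consumer = new Thread(() =>
            {
                // Blocks while the queue is empty; the loop ends once
                // CompleteAdding has been called and the queue is drained.
                foreach (string message in queue.GetConsumingEnumerable())
                {
                    processed.Add(message);
                }
            });
            consumer.Start();

            // Producer side: any thread may call Add.
            foreach (string message in messages)
            {
                queue.Add(message);
            }

            queue.CompleteAdding(); // signal that no more items will arrive
            consumer.Join();        // wait for the consumer to finish
        }

        return processed;
    }

    public static void Main()
    {
        foreach (string message in Drain(new[] { "first", "second", "third" }))
        {
            Console.WriteLine(message);
        }
    }
}
```

In the socket scenario from the question, the body of the foreach is where the single dispatcher thread would send the message, read the response, and invoke the caller's callback. CompleteAdding plays roughly the role that Close plays in the class above, except that items already queued are still delivered rather than cleared.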

Community
Kiril
  • This implementation seems wrong to me. If I call `TryDequeue` on an empty queue with a finite timeout `t` it will return `default(T)` directly, no? It should wait at least `t` before giving up. – qerub Feb 10 '14 at 13:22
  • In TryDequeue(out T value, int timeout): timeout is an int, which matches what Monitor.Wait() expects, but Monitor.Wait() will throw a System.ArgumentOutOfRangeException if a negative number is specified as timeout. The second part of the if, which checks for a negative timeout, just ensures that a default value is returned instead of provoking an exception. – Mogens Beltoft Aug 30 '17 at 13:47
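
To make the timeout discussion in these comments concrete, here is a small sketch that isolates the two pieces of the `if` in TryDequeue. It assumes nothing beyond System.Threading; TimeoutGuardSketch, IsRejected, and WaitUnsignalled are hypothetical names used only for this illustration.

```csharp
using System;
using System.Threading;

public static class TimeoutGuardSketch
{
    // Same comparison as the middle clause of the `if` in TryDequeue.
    // Timeout.Infinite is -1, so only values below -1 are rejected.
    public static bool IsRejected(int timeout)
    {
        return timeout < Timeout.Infinite;
    }

    // Hypothetical helper: waits on an object that no other thread pulses
    // and reports whether the wait was signalled before the timeout.
    public static bool WaitUnsignalled(int timeoutMilliseconds)
    {
        object gate = new object();
        lock (gate)
        {
            return Monitor.Wait(gate, timeoutMilliseconds);
        }
    }

    public static void Main()
    {
        Console.WriteLine(IsRejected(Timeout.Infinite)); // False: wait forever
        Console.WriteLine(IsRejected(50));               // False: finite timeout is accepted
        Console.WriteLine(IsRejected(-2));               // True: invalid, return early
        Console.WriteLine(WaitUnsignalled(50));          // False: timed out after about 50 ms
    }
}
```

So a finite, non-negative timeout passes the guard and reaches Monitor.Wait, which blocks for up to that long; only values below -1 short-circuit to the early return.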