55

I'm wondering if there exists an implementation/wrapper for ConcurrentQueue, similar to BlockingCollection, where taking from the collection does not block but is instead asynchronous, causing the caller to await until an item is placed in the queue.

I've come up with my own implementation, but it does not seem to be performing as expected. I'm wondering if I'm reinventing something that already exists.

Here's my implementation:

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();

    object queueSyncLock = new object();

    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> Dequeue()
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs = null;
        T firstItem = default(T);
        while (true)
        {
            bool ok;
            lock (queueSyncLock)
            {
                ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
                if (ok)
                {
                    waitingQueue.TryDequeue(out tcs);
                    queue.TryDequeue(out firstItem);
                }
            }
            if (!ok) break;
            tcs.SetResult(firstItem);
        }
    }
}
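
For context, here's a minimal sketch of how I intend to consume it (the producer/consumer split below is just illustrative):

// Hypothetical usage: the consumer awaits items without blocking a thread-pool thread.
var queue = new MessageQueue<string>();

var consumer = Task.Run(async () =>
{
    while (true)
    {
        string message = await queue.Dequeue(); // completes as soon as Enqueue is called
        Console.WriteLine(message);
    }
});

queue.Enqueue("hello");
queue.Enqueue("world");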
Stephen Cleary
spender
  • According to our [on-topic](https://stackoverflow.com/help/on-topic) guidance, "**Some questions are still off-topic, even if they fit into one of the categories listed above:**...Questions asking us to *recommend or find a book, tool, software library, tutorial or other off-site resource* are off-topic..." – Robert Columbia Jun 29 '18 at 10:58
  • An await-able queue is what I thought of recently too (here is my question: https://stackoverflow.com/questions/52775484/how-to-load-balance-the-workload-of-a-service-in-net)! It would solve SO MANY issues in a microservices architecture, I believe! But in that case, the queue should probably be a persistent queue and not something in-memory. – user2173353 Oct 15 '18 at 07:55
  • Related: [Is there anything like asynchronous BlockingCollection?](https://stackoverflow.com/questions/21225361/is-there-anything-like-asynchronous-blockingcollectiont) – Theodor Zoulias Jun 25 '19 at 23:31

10 Answers

70

I don't know of a lock-free solution, but you can take a look at the new TPL Dataflow library, introduced as part of the Async CTP (it now ships as the System.Threading.Tasks.Dataflow NuGet package). A simple BufferBlock<T> should suffice, e.g.:

BufferBlock<int> buffer = new BufferBlock<int>();

Production and consumption are most easily done via extension methods on the dataflow block types.

Production is as simple as:

buffer.Post(13);

and consumption is async-ready:

int item = await buffer.ReceiveAsync();

I do recommend you use Dataflow if possible; making such a buffer both efficient and correct is more difficult than it first appears.
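
Putting those pieces together, here's a small end-to-end sketch (the producer/consumer arrangement is just an illustration, not part of the original answer):

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Example
{
    static async Task Main()
    {
        var buffer = new BufferBlock<int>();

        // Consumer: awaits items without blocking a thread.
        var consumer = Task.Run(async () =>
        {
            while (await buffer.OutputAvailableAsync())
                Console.WriteLine(await buffer.ReceiveAsync());
        });

        // Producer: post items, then signal completion.
        for (int i = 0; i < 10; i++)
            buffer.Post(i);
        buffer.Complete();

        await consumer;
    }
}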

Serj-Tm
Stephen Cleary
  • This looks very promising... will check it out tomorrow. Thanks. It's looks very much like a CCR port. – spender Oct 23 '11 at 19:47
  • 3
    Took a peek before bedtime instead! It looks like Dataflow fits my needs very nicely. It seems to bridge the gap between what's offered by TPL and what's offered in CCR (which I have used to great success). It leaves me feeling positive that the excellent work in CCR hasn't been squandered. This is the right answer (and something shiny and new to sink my teeth into!) Thanks @StephenCleary. – spender Oct 23 '11 at 22:39
  • Stephen Cleary's own Nito.AsyncEx library also has [AsyncProducerConsumerQueue](https://github.com/StephenCleary/AsyncEx/blob/master/src/Nito.AsyncEx.Coordination/AsyncProducerConsumerQueue.cs) which is an alternative to `BufferBlock`. – Fanblade May 14 '21 at 16:00
  • 2
    @Fanblade: True, but these days I point people towards `System.Threading.Channels`. Channels are a very efficient and very modern solution. – Stephen Cleary May 14 '21 at 17:37
38

A simple approach using the C# 8.0 IAsyncEnumerable feature and the Dataflow library:

// Instantiate an async queue
var queue = new AsyncQueue<int>();

// Then, loop through the elements of queue.
// This loop won't stop until it is canceled or broken out of
// (for that, use queue.WithCancellation(..) or break;)
await foreach(int i in queue) {
    // Writes a line as soon as some other Task calls queue.Enqueue(..)
    Console.WriteLine(i);
}

With an implementation of AsyncQueue as follows:

public class AsyncQueue<T> : IAsyncEnumerable<T>
{
    private readonly SemaphoreSlim _enumerationSemaphore = new SemaphoreSlim(1);
    private readonly BufferBlock<T> _bufferBlock = new BufferBlock<T>();

    public void Enqueue(T item) =>
        _bufferBlock.Post(item);

    public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token = default)
    {
        // We lock this so we only ever enumerate once at a time.
        // That way we ensure all items are returned in a continuous
        // fashion with no 'holes' in the data when two foreach compete.
        await _enumerationSemaphore.WaitAsync();
        try {
            // Return new elements until cancellationToken is triggered.
            while (true) {
                // Make sure to throw on cancellation so the Task will transfer into a canceled state
                token.ThrowIfCancellationRequested();
                yield return await _bufferBlock.ReceiveAsync(token);
            }
        } finally {
            _enumerationSemaphore.Release();
        }

    }
}
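
Tying the two snippets together, a small end-to-end sketch (the producer task and the 5-second cancellation window are assumptions for illustration):

var queue = new AsyncQueue<int>();
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

// Producer: enqueue from some other task.
_ = Task.Run(async () =>
{
    for (int i = 0; i < 3; i++)
    {
        queue.Enqueue(i);
        await Task.Delay(100);
    }
});

// Consumer: the loop ends when the token is canceled
// (ReceiveAsync throws an OperationCanceledException).
try
{
    await foreach (int i in queue.WithCancellation(cts.Token))
        Console.WriteLine(i);
}
catch (OperationCanceledException) { }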
Bruno Zell
  • 1
    I love it when an old question gets a modern update. Have an upvote. I haven't checked out `IAsyncEnumerable` but am very familiar with javascript's `Symbol.asyncIterator` which looks like more or less the same concept. – spender Apr 30 '19 at 01:33
  • Thanks @spender! I think so, it's basically an `IEnumerable`, but you can asynchronously await new items so it's a non-blocking operation. – Bruno Zell Apr 30 '19 at 01:41
  • I wonder, is there any specific reason for using `SemaphoreSlim(1)` instead of a `lock` ? – valorl May 01 '20 at 13:05
  • 4
    @valori inside a `lock` there can't be an `await` – Bruno Zell May 02 '20 at 08:50
  • And as an add-on, to get the current length of the queue, add a property to the AsyncQueue class: `public int Count { get { return _bufferBlock.Count; } }`. Using this count we can check in the foreach loop whether the queue is empty or not: `await foreach (int i in queue) { if (queue.Count > 1) { /* queue is not empty */ } else { /* queue is empty */ } Console.WriteLine(i); }` – Yasir Ali May 22 '20 at 18:43
  • I used this in an wrapper class to convert an IObserver to a IAsyncEnumerable. Thanks! – MplsAmigo Jul 29 '20 at 02:43
  • cancellation `token` could be given to `WaitAsync(token)` as param too – Vinigas Jan 19 '23 at 13:33
30

There is an official way to do this now: System.Threading.Channels. It's built into the core runtime on .NET Core 3.0 and higher (including .NET 5.0 and 6.0), but it's also available as a NuGet package on .NET Standard 2.0 and 2.1. You can read through the docs here.

var channel = System.Threading.Channels.Channel.CreateUnbounded<int>();

To enqueue work:

// This will succeed and finish synchronously if the channel is unbounded.
channel.Writer.TryWrite(42);

To complete the channel:

channel.Writer.TryComplete();

To read from the channel:

var i = await channel.Reader.ReadAsync();

Or, if you have .NET Core 3.0 or higher:

await foreach (int i in channel.Reader.ReadAllAsync())
{
    // whatever processing on i...
}
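
And a slightly fuller sketch tying writer and reader together (the bounded capacity of 100 is just an illustrative choice; it makes the writer apply backpressure when the queue is full):

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class Example
{
    static async Task Main()
    {
        var channel = Channel.CreateBounded<int>(100);

        var consumer = Task.Run(async () =>
        {
            await foreach (int i in channel.Reader.ReadAllAsync())
                Console.WriteLine(i);
        });

        for (int i = 0; i < 1000; i++)
            await channel.Writer.WriteAsync(i); // waits while the channel is full

        channel.Writer.Complete();
        await consumer;
    }
}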
kanders84152
  • 1
    Next time I have an opportunity to use such a structure, I'll check this out... If it works out, I'll give you the green tick. `...Writer.WaitToWriteAsync()` on a bounded queue also looks super handy. Nice find... Thanks for adding this. – spender Apr 09 '21 at 21:26
  • 3
    Useful link: [An Introduction to System.Threading.Channels](https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/) – Theodor Zoulias Apr 10 '21 at 06:54
7

One simple and easy way to implement this is with a SemaphoreSlim:

public class AwaitableQueue<T>
{
    private SemaphoreSlim semaphore = new SemaphoreSlim(0);
    private readonly object queueLock = new object();
    private Queue<T> queue = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (queueLock)
        {
            queue.Enqueue(item);
            semaphore.Release();
        }
    }

    public T WaitAndDequeue(TimeSpan timeSpan, CancellationToken cancellationToken)
    {
        semaphore.Wait(timeSpan, cancellationToken);
        lock (queueLock)
        {
            return queue.Dequeue();
        }
    }

    public async Task<T> WhenDequeue(TimeSpan timeSpan, CancellationToken cancellationToken)
    {
        await semaphore.WaitAsync(timeSpan, cancellationToken);
        lock (queueLock)
        {
            return queue.Dequeue();
        }
    }
}

The beauty of this is that the SemaphoreSlim handles all of the complexity of implementing the Wait() and WaitAsync() functionality. The downside is that queue length is tracked by both the semaphore and the queue itself, and they both magically stay in sync.
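
As the comments below point out, the boolean result of the timed Wait/WaitAsync calls is worth checking; here's one possible sketch of an async variant that surfaces a timeout (the tuple-returning shape is just one option):

public async Task<(bool Success, T Item)> TryDequeueAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
    // WaitAsync returns false if the timeout elapses before an item becomes available.
    if (!await semaphore.WaitAsync(timeout, cancellationToken))
        return (false, default(T));

    lock (queueLock)
    {
        return (true, queue.Dequeue());
    }
}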

Ryan
  • Good if performance is not of the highest importance, bursts of enqueues or dequeues are not expected and time to process each item is significant. It uses locking, meaning that the collection can only be accessed by one thread at a time and all others will wait blocking when enqueuing or dequeuing an item. – Jordi Nov 04 '20 at 12:55
  • The result of `semaphore.WaitAsync()` should be taken into account, and in case the timeout is reached return `null`, a default value, or throw an exception. – Guillermo Prandi Jan 24 '21 at 16:01
  • @GuillermoPrandi The semaphore.WaitAsync task does not return a value. If the timeout is reached, it will throw a `TaskCanceledException` which will bubble. – Ryan Feb 08 '21 at 00:55
  • @Ryan https://learn.microsoft.com/en-us/dotnet/api/system.threading.semaphoreslim.waitasync?view=net-5.0#System_Threading_SemaphoreSlim_WaitAsync_System_Int32_System_Threading_CancellationToken_ _"A task that will complete with a result of true if the current thread successfully entered the SemaphoreSlim, otherwise with a result of false."_ – Guillermo Prandi Feb 08 '21 at 12:35
  • How do you use this to queue a list of awaitable jobs with returns? – johnstaveley Mar 18 '21 at 05:44
5

My attempt (it has an event that is raised when a "promise" is created, which an external producer can use to know when to produce more items):

public class AsyncQueue<T>
{
    private ConcurrentQueue<T> _bufferQueue;
    private ConcurrentQueue<TaskCompletionSource<T>> _promisesQueue;
    private object _syncRoot = new object();

    public AsyncQueue()
    {
        _bufferQueue = new ConcurrentQueue<T>();
        _promisesQueue = new ConcurrentQueue<TaskCompletionSource<T>>();
    }

    /// <summary>
    /// Enqueues the specified item.
    /// </summary>
    /// <param name="item">The item.</param>
    public void Enqueue(T item)
    {
        TaskCompletionSource<T> promise;
        do
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;                                       
            }
        }
        while (promise != null);

        lock (_syncRoot)
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;
            }

            _bufferQueue.Enqueue(item);
        }            
    }

    /// <summary>
    /// Dequeues the asynchronous.
    /// </summary>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns></returns>
    public Task<T> DequeueAsync(CancellationToken cancellationToken)
    {
        T item;

        if (!_bufferQueue.TryDequeue(out item))
        {
            lock (_syncRoot)
            {
                if (!_bufferQueue.TryDequeue(out item))
                {
                    var promise = new TaskCompletionSource<T>();
                    cancellationToken.Register(() => promise.TrySetCanceled());

                    _promisesQueue.Enqueue(promise);
                    PromiseAdded?.Invoke(this, EventArgs.Empty); // notify any subscribed producer that a consumer is now waiting

                    return promise.Task;
                }
            }
        }

        return Task.FromResult(item);
    }

    /// <summary>
    /// Gets a value indicating whether this instance has promises.
    /// </summary>
    /// <value>
    /// <c>true</c> if this instance has promises; otherwise, <c>false</c>.
    /// </value>
    public bool HasPromises
    {
        get { return _promisesQueue.Any(p => !p.Task.IsCanceled); }
    }

    /// <summary>
    /// Occurs when a new promise
    /// is generated by the queue
    /// </summary>
    public event EventHandler PromiseAdded;
}
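
A hypothetical usage sketch of the event-driven flow described above (ProduceNextItem is an assumed callback, not part of the class):

var queue = new AsyncQueue<int>();

// External producer: whenever a consumer starts waiting, produce another item.
queue.PromiseAdded += (sender, args) =>
{
    if (queue.HasPromises)
        queue.Enqueue(ProduceNextItem()); // hypothetical producer callback
};

// Consumer: awaits until an item is enqueued, or until the token is canceled.
int item = await queue.DequeueAsync(CancellationToken.None);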
André Bires
  • I think this is the best solution. I've implemented this and tested it extensively. A few notes: the call to !promise.Task.IsCanceled is unnecessary. I added a ManualResetEventSlim to track when the bufferQueue is empty so that a caller can block to wait for the queue to empty. – Brian Heilig Mar 09 '16 at 14:58
  • 1
    You [should be disposing](http://stackoverflow.com/a/21653382/298609) `CancellationTokenRegistration` you got from the `cancellationToken.Register` call. – Paya Mar 06 '17 at 01:27
1

Check out https://github.com/somdoron/AsyncCollection; you can both dequeue asynchronously and use C# 8.0 IAsyncEnumerable.

The API is very similar to BlockingCollection.

AsyncCollection<int> collection = new AsyncCollection<int>();

var t = Task.Run(async () =>
{
    while (!collection.IsCompleted)
    {
        var item = await collection.TakeAsync();

        // process
    }
});

for (int i = 0; i < 1000; i++)
{
    collection.Add(i);
}

collection.CompleteAdding();

t.Wait();

With IAsyncEnumerable:

AsyncCollection<int> collection = new AsyncCollection<int>();

var t = Task.Run(async () =>
{
    await foreach (var item in collection)
    {
        // process
    }
});

for (int i = 0; i < 1000; i++)
{
    collection.Add(i);
}

collection.CompleteAdding();

t.Wait();
somdoron
  • Your example `var item = await collection.TakeAsync()` seems suitable for a single consumer only. With multiple consumers you may get [`InvalidOperationException`](https://github.com/somdoron/AsyncCollection/blob/master/src/AsyncCollection.cs#L294)s. I think you should use `TryTakeAsync` instead of `TakeAsync`, to make it work correctly with multiple consumers too. – Theodor Zoulias Aug 08 '19 at 22:50
1

It may be overkill for your use case (given the learning curve), but Reactive Extensions provides all the glue you could ever want for asynchronous composition.

You essentially subscribe to changes and they are pushed to you as they become available, and you can have the system push the changes on a separate thread.
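
For illustration only, a minimal sketch of that push model using the System.Reactive package (the Subject-based design here is an assumption, not something from the question):

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var messages = new Subject<int>();

// Consumer: items are pushed to the handler on a thread-pool thread.
messages
    .ObserveOn(TaskPoolScheduler.Default)
    .Subscribe(i => Console.WriteLine(i));

// Producer: push items as they become available.
messages.OnNext(13);
messages.OnNext(42);
messages.OnCompleted();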

Morten Mertner
  • 1
    I'm at least partially versed in Reactive, but it's a little esoteric to use in production as others may have to maintain the code. I'm really digging the simplicity that async/await is bringing to a previously very complicated server product, and I'm trying to keep all the async tech under a single technology. – spender Oct 23 '11 at 00:53
0

Well, 8 years later I hit this very question and was about to implement the MS AsyncQueue<T> class found in the NuGet package/namespace Microsoft.VisualStudio.Threading.

Thanks to @Theodor Zoulias for mentioning that this API may be outdated and that the Dataflow library would be a good alternative.

So I edited my AsyncQueue<> implementation to use BufferBlock<>. Almost the same, but it works better.

I use this in an ASP.NET Core background thread and it runs fully async.

protected async Task MyRun()
{
    BufferBlock<MyObj> queue = new BufferBlock<MyObj>();
    Task enqueueTask = StartDataIteration(queue);

    while (await queue.OutputAvailableAsync())
    {
        var myObj = queue.Receive();
        // do something with myObj
    }

}

public async Task StartDataIteration(BufferBlock<MyObj> queue)
{
    var cursor = await RunQuery();
    while(await cursor.Next()) { 
        queue.Post(cursor.Current);
    }
    queue.Complete(); // <<< signals completion; OutputAvailableAsync returns false once the buffer drains
}

I found that using queue.OutputAvailableAsync() fixed the issue I had with AsyncQueue<> -- determining when the queue was complete without having to inspect the dequeue task.
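
If more than one consumer could ever read from the same block, the comment below suggests TryReceive; a sketch of the consume loop with that change:

while (await queue.OutputAvailableAsync())
{
    // TryReceive avoids the race where another consumer grabs the last item
    // between OutputAvailableAsync completing and Receive being called.
    while (queue.TryReceive(out MyObj myObj))
    {
        // do something with myObj
    }
}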

bmiller
  • Awaiting both `queue.DequeueAsync()` and `queue.Completion` with `Task.WhenAny` is a clever trick, but it feels like a hack to overcome the shortcomings of a poor API design. The alternative classes (Dataflow `BufferBlock` and `Channel`) offer methods (`OutputAvailableAsync` and `WaitToReadAsync` respectively) that allow awaiting for more elements without having to handle an exception as a feedback mechanism. The problem with your trick is that you may end up with a faulted task with its exception not observed, triggering in this case the `TaskScheduler.UnobservedTaskException` event. – Theodor Zoulias Mar 10 '20 at 16:24
  • There are other means of notification in the class - but MS didn't have an example in the docs. In my case I have multiple tasks to await so I had to use WhenAny anyway. -and if a Task throws it can be caught as an AggregateException. – bmiller Mar 11 '20 at 17:30
  • My point is that the class `Microsoft.VisualStudio.Threading.AsyncQueue` shouldn't be used for new projects, because today there are better alternatives available. Especially the [`Channel`](https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/) class, that not only offers a better API but it also has excellent performance characteristics. – Theodor Zoulias Mar 11 '20 at 18:28
  • 1
    ok, you're right, AsyncQueue is based on the TPL library and looks to be designed to work in Visual Studio extensions. I'll edit my answer with my implementation. Thanks for your comment, you may have saved me a pile of headache's. – bmiller Mar 11 '20 at 19:29
  • There is a potential race condition in your new implementation (the one based on a `BufferBlock`), that could surface in case you had multiple consumers. The `Receive` method could be called by one consumer just after another consumer has taken the last item from the queue. For this reason it is preferable to use the `TryReceive` method, as a condition either in an `if` or in a `while` block, so that you don't have to review the consuming code in case you update your architecture later. Look [here](https://stackoverflow.com/a/58707633/11178549) for an example. – Theodor Zoulias Mar 11 '20 at 22:43
0

Here's the implementation I'm currently using.

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();
    object queueSyncLock = new object();
    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> DequeueAsync(CancellationToken ct)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        ct.Register(() =>
        {
            lock (queueSyncLock)
            {
                tcs.TrySetCanceled();
            }
        });
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs = null;
        T firstItem = default(T);
        lock (queueSyncLock)
        {
            while (true)
            {
                if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem))
                {
                    waitingQueue.TryDequeue(out tcs);
                    if (tcs.Task.IsCanceled)
                    {
                        continue;
                    }
                    queue.TryDequeue(out firstItem);
                }
                else
                {
                    break;
                }
                tcs.SetResult(firstItem);
            }
        }
    }
}

It works well enough, but there's quite a lot of contention on queueSyncLock, as I make heavy use of the CancellationToken to cancel some of the waiting tasks. Of course, this still leads to considerably less blocking than I would see with a BlockingCollection, but...

I'm wondering if there is a smoother, lock-free means of achieving the same end.

spender
-5

You could just use a BlockingCollection (using the default ConcurrentQueue) and wrap the call to Take in a Task so you can await it:

var bc = new BlockingCollection<T>();

T element = await Task.Run( () => bc.Take() );
Nick Butler
  • 8
    Nice idea, but I'm not happy with blocking. I'm going to have a few thousand clients each with their own message queue. Any blocking will sink the ship because it will tie up threads doing nothing. The reason I want an awaitable, non-blocking Task is so I can keep all operations in the threadpool without causing threadpool starvation. – spender Oct 23 '11 at 12:58