106

I would like to await the result of BlockingCollection<T>.Take() asynchronously, so that I do not block the thread. I am looking for something like this:

var item = await blockingCollection.TakeAsync();

I know I could do this:

var item = await Task.Run(() => blockingCollection.Take());

but that defeats the purpose, because another thread (a ThreadPool thread) gets blocked instead.

Is there any alternative?

Theodor Zoulias
avo
  • 3
    I don't get this. If you use `await Task.Run(() => blockingCollection.Take())`, the task will be performed on another thread and your UI thread won't be blocked. Isn't that the point? – Selman Genç Jan 20 '14 at 02:55
  • 11
    @Selman22, this is not a UI app. It is a library exporting `Task`-based API. It can be used from ASP.NET, for example. The code in question would not scale well there. – avo Jan 20 '14 at 03:09
  • Would it still be a problem if `ConfigureAwait` was used after the `Run()`? [ed. never mind, I see what you're saying now] – MojoFilter Jun 15 '16 at 12:28

4 Answers

130

There are four alternatives that I know of.

The first is Channels (System.Threading.Channels), which provides a thread-safe queue that supports asynchronous read and write operations. Channels are highly optimized and optionally support dropping some items once a capacity threshold is reached.
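As a rough sketch of the dropping behavior, the snippet below creates a bounded channel with `BoundedChannelFullMode.DropOldest` and writes more items than it can hold. The class name `DropOldestDemo`, the method name `RunAsync`, and the capacity of 3 are only illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class DropOldestDemo
{
    // Writes 0..4 into a channel that holds at most 3 items and returns what is left.
    public static async Task<List<int>> RunAsync()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
        {
            FullMode = BoundedChannelFullMode.DropOldest
        });

        for (int i = 0; i < 5; i++)
        {
            // With DropOldest, a write to a full channel evicts the oldest item instead of waiting.
            channel.Writer.TryWrite(i);
        }
        channel.Writer.Complete();

        var received = new List<int>();
        // WaitToReadAsync is awaited, so no thread is blocked while the channel is empty.
        while (await channel.Reader.WaitToReadAsync())
        {
            while (channel.Reader.TryRead(out int item))
            {
                received.Add(item);
            }
        }
        return received;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(", ", await RunAsync())); // prints "2, 3, 4"
    }
}
```

The items 0 and 1 are discarded because they were the oldest entries when the channel was full.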

The next is BufferBlock<T> from TPL Dataflow. If you only have a single consumer, you can use OutputAvailableAsync or ReceiveAsync, or just link it to an ActionBlock<T>. For more information, see my blog.
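A minimal sketch of the single-consumer pattern with `BufferBlock<T>`, assuming the System.Threading.Tasks.Dataflow library is referenced (a NuGet package on older frameworks). The names `BufferBlockDemo` and `ConsumeAllAsync` are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferBlockDemo
{
    // Single consumer: asynchronously waits for output, then receives it.
    public static async Task<List<string>> ConsumeAllAsync(BufferBlock<string> queue)
    {
        var received = new List<string>();
        // OutputAvailableAsync completes with false once the block is completed and empty.
        while (await queue.OutputAvailableAsync())
        {
            received.Add(await queue.ReceiveAsync());
        }
        return received;
    }

    public static async Task Main()
    {
        var queue = new BufferBlock<string>();
        Task<List<string>> consumer = ConsumeAllAsync(queue);

        queue.Post("a");            // synchronous put
        await queue.SendAsync("b"); // asynchronous put
        queue.Complete();           // no more items will be added

        Console.WriteLine(string.Join(", ", await consumer)); // prints "a, b"
    }
}
```

This loop is only safe with one consumer. With several consumers, another one could take the item between the `OutputAvailableAsync` and `ReceiveAsync` calls.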

The last two are types that I created, available in my AsyncEx library.

AsyncCollection<T> is the async near-equivalent of BlockingCollection<T>, capable of wrapping a concurrent producer/consumer collection such as ConcurrentQueue<T> or ConcurrentBag<T>. You can use TakeAsync to asynchronously consume items from the collection. For more information, see my blog.

AsyncProducerConsumerQueue<T> is a more portable async-compatible producer/consumer queue. You can use DequeueAsync to asynchronously consume items from the queue. For more information, see my blog.

The last three of these alternatives allow synchronous and asynchronous puts and takes.

Stephen Cleary
  • 13
    GitHub link for when CodePlex finally shuts down: https://github.com/StephenCleary/AsyncEx – Paul Jun 01 '17 at 16:19
  • The API documentation contains the method [`AsyncCollection.TryTakeAsync`](http://dotnetapis.com/pkg/Nito.AsyncEx/4.0.1/net45/doc/Nito.AsyncEx.AsyncCollection'1/TryTakeAsync()), but I can't find it in the downloaded `Nito.AsyncEx.Coordination.dll 5.0.0.0` (latest version). The referenced [Nito.AsyncEx.Concurrent.dll](http://dotnetapis.com/pkg/Nito.AsyncEx/4.0.1/net45/file/lib/net45/Nito.AsyncEx.Concurrent.dll) does not exist in the [package](https://www.nuget.org/packages/Nito.AsyncEx/). What am I missing? – Theodor Zoulias Aug 09 '19 at 08:52
  • @TheodorZoulias: That method was removed in v5. The v5 API docs are [here](http://dotnetapis.com/pkg/Nito.AsyncEx.Coordination/5.0.0/netstandard2.0/doc/Nito.AsyncEx.AsyncCollection'1). – Stephen Cleary Aug 10 '19 at 11:32
  • Oh, thanks. It looks like it was the easiest and safest way to enumerate the collection: `while ((result = await collection.TryTakeAsync()).Success) { }`. Why was it removed? – Theodor Zoulias Aug 10 '19 at 12:57
  • 2
    @TheodorZoulias: Because "Try" means different things to different people. I'm thinking of adding a "Try" method back in but it would actually have different semantics than the original method. Also looking at supporting async streams in a future version, which would definitely be the best method of consumption when supported. – Stephen Cleary Aug 12 '19 at 02:17
25

...or you can do this:

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class AsyncQueue<T>
{
    private readonly SemaphoreSlim _sem;
    private readonly ConcurrentQueue<T> _que;

    public AsyncQueue()
    {
        _sem = new SemaphoreSlim(0);
        _que = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        _que.Enqueue(item);
        _sem.Release();
    }

    public void EnqueueRange(IEnumerable<T> source)
    {
        var n = 0;
        foreach (var item in source)
        {
            _que.Enqueue(item);
            n++;
        }
        // Release(0) throws ArgumentOutOfRangeException, so skip it for an empty source.
        if (n > 0)
        {
            _sem.Release(n);
        }
    }

    public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        for (; ; )
        {
            await _sem.WaitAsync(cancellationToken);

            T item;
            if (_que.TryDequeue(out item))
            {
                return item;
            }
        }
    }
}

Simple, fully functional asynchronous FIFO queue.

Note: SemaphoreSlim.WaitAsync was added in .NET 4.5. Before that, this was not all that straightforward.

John Leidegren
  • 3
    What's the use of infinite `for`? if semaphore is released, queue has at least one item to dequeue, no? – Blendester Nov 20 '19 at 19:03
  • 3
    @Blendester there might be a race condition if multiple consumers are blocked. We cannot know for sure that there aren't at least two competing consumers, and we don't know if both of them manage to wake up before they get to dequeue an item. In the event of a race, if one doesn't manage to dequeue, it will go back to sleep and wait for another signal. – John Leidegren Nov 20 '19 at 20:31
  • 1
    If two or more consumers make it past WaitAsync(), then there are an equivalent number of items in the queue, and thus they will always dequeue successfully. Am I missing something? – mindcruzer Dec 22 '19 at 23:06
  • 2
    This is a blocking collection, the semantics of `TryDequeue` are, return with a value, or do not return at all. Technically, if you have more than 1 reader, the same reader can consume two (or more) items before any other reader is fully awake. A successful `WaitAsync` is just a signal that there may be items in the queue to consume, it's not a guarantee. – John Leidegren Dec 25 '19 at 10:55
  • @JohnLeidegren `If the value of the CurrentCount property is zero before this method is called, the method also allows releaseCount threads or tasks blocked by a call to the Wait or WaitAsync method to enter the semaphore.` from https://learn.microsoft.com/en-us/dotnet/api/system.threading.semaphoreslim.release?view=netframework-4.8#System_Threading_SemaphoreSlim_Release_System_Int32_ How can a successful `WaitAsync` not have items in the queue? If N releases wake up more than N consumers, then the semaphore is broken, isn't it? – Ashish Negi Mar 23 '20 at 23:10
  • 1
    @AshishNegi it's just a consequence of the `TryDequeue` API design. This way, if for whatever reason the queue doesn't successfully dequeue, its behavior is still defined. We could just as well throw an exception. I may have adapted this example from code that used a monitor, where we actually could wake up multiple consumers. I think it's a remnant of that. I can't see why it would be absolutely necessary. – John Leidegren Mar 24 '20 at 12:20
  • @JohnLeidegren is there a reason not to have `AsyncQueue` implement `IProducerConsumerCollection` so it can be more flexible? – Mr. Boy May 20 '20 at 14:02
  • @Mr.Boy No, other than it complicates things for the purpose of explaining. Have at it. – John Leidegren May 20 '20 at 16:00
  • Cheers @JohnLeidegren. Wanted to check I'd not missed something. It seems a nice route for those who don't want to (or can't) use `Channel` – Mr. Boy May 20 '20 at 16:17
6

The asynchronous (non-blocking) alternative of the BlockingCollection<T> is the Channel<T> class. It offers almost the same functionality, plus some extra features. You can instantiate a Channel<T> using the Channel's static factory methods, as shown below (demonstrating the default values of all available options).

// Option 1: an unbounded channel (no capacity limit).
Channel<Item> channel = Channel.CreateUnbounded<Item>(new UnboundedChannelOptions()
{
    SingleWriter = false,
    SingleReader = false,
    AllowSynchronousContinuations = false,
});

// Option 2 (alternative to the above): a bounded channel with a fixed capacity.
Channel<Item> channel = Channel.CreateBounded<Item>(new BoundedChannelOptions(capacity)
{
    SingleWriter = false,
    SingleReader = false,
    AllowSynchronousContinuations = false,
    FullMode = BoundedChannelFullMode.Wait,
});

The most striking difference is that the Channel<T> exposes a Writer and a Reader facade. So you can pass the Writer facade to a method that plays the role of the producer, and similarly the Reader facade to a method that plays the role of the consumer. The Writer is only allowed to add items to the channel and to mark it as completed. The Reader is only allowed to take items from the channel and to await its completion.

Both facades expose only non-blocking APIs. For example, the ChannelWriter<T> has a WriteAsync method that returns a ValueTask. If you have some reason to block on these APIs, for example if one worker of your producer/consumer pair has to be synchronous, then you can block with .AsTask().GetAwaiter().GetResult(), but this will not be as efficient as using a BlockingCollection<T>. If you want to learn more about the similarities and differences between the Channel<T> and BlockingCollection<T> classes, take a look at this answer.
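The Writer/Reader split can be sketched roughly as follows, assuming .NET Core 3.0 or later (for `ReadAllAsync` and `await foreach`). The names `ChannelFacadeDemo`, `ProduceAsync` and `ConsumeAsync`, and the capacity of 2, are hypothetical and chosen only for this illustration.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelFacadeDemo
{
    // The producer only sees the writer facade: it can add items and complete the channel.
    public static async Task ProduceAsync(ChannelWriter<int> writer, int count)
    {
        for (int i = 1; i <= count; i++)
        {
            // Awaits (without blocking a thread) while the bounded channel is full.
            await writer.WriteAsync(i);
        }
        writer.Complete();
    }

    // The consumer only sees the reader facade: it can take items until completion.
    public static async Task<int> ConsumeAsync(ChannelReader<int> reader)
    {
        int sum = 0;
        await foreach (int item in reader.ReadAllAsync())
        {
            sum += item;
        }
        return sum;
    }

    public static async Task Main()
    {
        Channel<int> channel = Channel.CreateBounded<int>(2);
        Task<int> consumer = ConsumeAsync(channel.Reader);
        await ProduceAsync(channel.Writer, 5);
        Console.WriteLine(await consumer); // prints 15
    }
}
```

Because the consumer is started before the producer, the producer's `WriteAsync` calls can make progress even though the channel holds only two items at a time.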

An implementation of a custom AsyncBlockingCollection<T> class, having only the most basic features, can be found in the 3rd revision of this answer.

MarredCheese
Theodor Zoulias
  • 4
    As an afterthought, I now think that the class name `AsyncBlockingCollection` is nonsensical. Something cannot be asynchronous and blocking at the same time, since these two concepts are the exact opposites! – Theodor Zoulias Mar 10 '20 at 16:32
  • 2
    But still, it IS an async version of the BlockingCollection :) – Stephan Møller Dec 28 '21 at 23:05
-1

This is super-simple, but it serves my needs.

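    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    public static class BlockingCollectionEx
    {
        // Polls the collection instead of blocking: tries to take an item and,
        // if none is available, waits inner_delay milliseconds before retrying.
        public static async Task<T> TakeAsync<T>(this BlockingCollection<T> bc, CancellationToken token, int inner_delay = 10)
        {
            while (!token.IsCancellationRequested)
            {
                if (bc.TryTake(out T el))
                    return el;
                else
                    // Passing the token ends the delay as soon as cancellation is requested.
                    await Task.Delay(inner_delay, token);
            }

            throw new OperationCanceledException();
        }
    }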
  • 1
    Instead of `while (!token.IsCancellationRequested)` and `throw new OperationCanceledException();`, it is simpler and better to just pass the token to the `TryTake` method, like in [Dejisys's](https://stackoverflow.com/a/57708479/11178549) answer: `TryTake(out T el, 0, token)` – Theodor Zoulias Mar 20 '23 at 17:00
  • As it’s currently written, your answer is unclear. Please [edit] to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers [in the help center](/help/how-to-answer). – Community Mar 25 '23 at 00:17
  • Adding to @TheodorZoulias's comment, I think it can be simplified down to this: `while (!bc.TryTake(out T el, inner_delay, token)) ; return el;` – Aaron Jul 04 '23 at 06:58
  • @Aaron the intention of the `TakeAsync` is to be non-blocking. The [`TryTake`](https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.blockingcollection-1.trytake#system-collections-concurrent-blockingcollection-1-trytake(-0@-system-int32)) overload with `millisecondsTimeout` parameter is blocking, so it defeats the purpose. – Theodor Zoulias Jul 04 '23 at 07:59