20

I'm tinkering around with the new IAsyncEnumerable<T> stuff in C# 8.0. Let's say I've got some method somewhere that I want to consume:

public IAsyncEnumerable<T> SomeBlackBoxFunctionAsync<T>(...) { ... }

I'm aware that I can use it with the await foreach... syntax. But let's say my consumer needs to have all results from this function before it continues. What's the best syntax to await all results before continuing? In other words, I'd like to be able to do something like:

// but that extension - AllResultsAsync() - doesn't exist :-/
List<T> myList = await SomeBlackBoxFunctionAsync<T>().AllResultsAsync(); 

What's the correct way to do this?

Panagiotis Kanavos
  • 120,703
  • 13
  • 188
  • 236
Shaul Behr
  • 36,951
  • 69
  • 249
  • 387

2 Answers2

24

A warning first: by definition, an async stream may never end and keep producing results until the application terminates. This is already used e.g. in SignalR or gRPC. Polling loops also work this way.

As such, using ToListAsync on an async stream may have unintended consequences!


Operators like this are already available through the System.Linq.Async package.

Consuming the entire stream is available through ToListAsync. The code is deceptively simple, but hides a few interesting issues :

public static ValueTask<List<TSource>> ToListAsync<TSource>(this IAsyncEnumerable<TSource> source, CancellationToken cancellationToken = default)
{
    if (source == null)
        throw Error.ArgumentNull(nameof(source));

    if (source is IAsyncIListProvider<TSource> listProvider)
        return listProvider.ToListAsync(cancellationToken);

    return Core(source, cancellationToken);

    static async ValueTask<List<TSource>> Core(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
    {
        var list = new List<TSource>();

        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            list.Add(item);
        }

        return list;
    }
}

First of all, it returns a ValueTask. Second, it ensures cancellation is observed and ConfigureAwait(false) is used, to prevent deadlocks. Finally, if the source already offers its own ToListAsync implementation via IAsyncIListProvider, the operator defers to that.

It's also interesting to note that while the IAsyncIListProvider interface is public, it's only implemented by internal and private classes within System.Linq.Async.

Ian Kemp
  • 28,293
  • 19
  • 112
  • 138
Panagiotis Kanavos
  • 120,703
  • 13
  • 188
  • 236
6

Based on @DmitryBychenko's comment, I wrote an extension to do want I want:

    public static async Task<ICollection<T>> AllResultsAsync<T>(this IAsyncEnumerable<T> asyncEnumerable)
    {
        if (null == asyncEnumerable)
            throw new ArgumentNullException(nameof(asyncEnumerable));  

        var list = new List<T>();
        await foreach (var t in asyncEnumerable)
        {
            list.Add(t);
        }

        return list;
    }

I'm just a little surprised this wasn't shipped natively with C# 8.0...it seems like a pretty obvious need.

Dmitry Bychenko
  • 180,369
  • 20
  • 160
  • 215
Shaul Behr
  • 36,951
  • 69
  • 249
  • 387
  • Have a look at this GitHub [thread](https://github.com/dotnet/corefx/issues/36545), basically there a community based package and [repo](https://github.com/Damien-The-Unbeliever/Linq.AsyncEnumerable) with linq support for AsyncEnumerable. Or use `System.Linq.Async` provided by Rx. – Pavel Anikhouski Nov 18 '19 at 13:14
  • 1
    Please, add *validation* (since `AllResultsAsync` is a `public` method) for `asyncEnumerable` - it must not be `null` and have my +1 – Dmitry Bychenko Nov 18 '19 at 13:18
  • 1
    It *is* shipped, as part of the System.Linq.Async package. – Panagiotis Kanavos Nov 18 '19 at 13:19
  • @PanagiotisKanavos Happy to hear it! What's the method called in System.Linq.Async? If it's there, that should be the correct answer to my question. – Shaul Behr Nov 18 '19 at 13:21