
I have an asynchronous stream of tasks that is generated by applying an async lambda to a stream of items:

IAsyncEnumerable<int> streamOfItems = AsyncEnumerable.Range(1, 10);
IAsyncEnumerable<Task<string>> streamOfTasks = streamOfItems.Select(async x =>
{
    await Task.Delay(100);
    return x.ToString();
});

The methods AsyncEnumerable.Range and Select above are provided by the System.Linq.Async package.

The result I want is a stream of results, expressed as an IAsyncEnumerable<string>. The results must be streamed in the same order as their originating tasks. Also, the enumeration of the stream must be throttled, so that no more than a specified number of tasks are active at any given time.

I would like a solution in the form of an extension method on the IAsyncEnumerable<Task<T>> type, so that I can chain it multiple times and form a processing pipeline, similar in functionality to a TPL Dataflow pipeline, but expressed fluently. Below is the signature of the desired extension method:

public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
    this IAsyncEnumerable<Task<TResult>> source,
    int concurrencyLevel);

Accepting a CancellationToken as an argument would also be a nice feature.
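To make the intended usage concrete, here is a minimal sketch of how the streamOfTasks above could be consumed (hypothetical, based on the desired signature):

await foreach (string result in streamOfTasks.AwaitResults(concurrencyLevel: 5))
{
    // At most 5 tasks are active at any moment,
    // and the results arrive in their original order.
    Console.WriteLine(result);
}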


Update: For completeness I am including an example of a fluent processing pipeline, formed by chaining the AwaitResults method twice. This pipeline starts with a PLINQ block, just to demonstrate that mixing PLINQ and System.Linq.Async is possible.

int[] results = await Partitioner
    .Create(Enumerable.Range(1, 20), EnumerablePartitionerOptions.NoBuffering)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(2)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(x =>
    {
        Thread.Sleep(100); // Simulate some CPU-bound operation
        return x;
    })
    .ToAsyncEnumerable()
    .Select(async x =>
    {
        await Task.Delay(300); // Simulate some I/O operation
        return x;
    })
    .AwaitResults(concurrencyLevel: 5)
    .Select(x => Task.Run(() =>
    {
        Thread.Sleep(100); // Simulate another CPU-bound operation
        return x;
    }))
    .AwaitResults(concurrencyLevel: 2)
    .ToArrayAsync();

Console.WriteLine($"Results: {String.Join(", ", results)}");

Expected output:

Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20


Note: In retrospect the AwaitResults method should probably be named Merge, and the concurrencyLevel parameter should be named maxConcurrent, because its functionality resembles the Merge operator from the Rx library. The System.Interactive.Async package does include an operator named Merge that produces IAsyncEnumerable<T>s, but none of its overloads operates on IAsyncEnumerable<Task<T>> sources; it operates on IEnumerable<IAsyncEnumerable<TSource>> and IAsyncEnumerable<IAsyncEnumerable<TSource>> sources. A bufferCapacity parameter could also be added, in order to control explicitly the size of the buffer needed for the awaiting/merging operation.
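In other words, the operator could look like this (a hypothetical signature, shown only for illustration):

public static IAsyncEnumerable<TResult> Merge<TResult>(
    this IAsyncEnumerable<Task<TResult>> source,
    int maxConcurrent,
    int bufferCapacity = -1,
    CancellationToken cancellationToken = default);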

Theodor Zoulias
  • Haven't you asked this before? It's not `IAsyncEnumerable` that processes the messages, it's whatever you use to read them and process them. The solution is *not* `IAsyncEnumerable` - that wouldn't give you the items asynchronously at all. You can already "throttle", based on whatever you mean by that. Process only one item every N items or seconds? Batch them and forward them? – Panagiotis Kanavos Feb 24 '20 at 11:44
  • DataFlow *is* a way to process the stream - just set `BoundedCapacity=1` and you get ordered processing, batching, configurable DOP out of the box. Channels is another. `await foreach` is another. If you want throttling, you can create an async iterator that reads T items from the source stream and emits a `T[]` every n items. Or the Nth item. You can use System.Linq.Async to make this easier. – Panagiotis Kanavos Feb 24 '20 at 11:46
  • In short, what is the question? Even for `expressed fluently`, you can write a set of extension methods that does what you want, assuming you decide on *what* that is – Panagiotis Kanavos Feb 24 '20 at 11:49
  • BTW `no more than a specified number of tasks are active at any given time` is controlled by the DOP, not throttling. – Panagiotis Kanavos Feb 24 '20 at 11:50
  • @PanagiotisKanavos [I have asked before](https://stackoverflow.com/questions/58194212/) for a method named `WhenEach` with this signature: `public static async IAsyncEnumerable WhenEach(Task[] tasks)`. This is not throttlable and not chainable. Now I want a method that I could chain like this: `.Select().AwaitResults().Select().AwaitResults()...`, with a different concurrency level for each processing block. The throttling is **not** time based. For example with `concurrencyLevel = 5` at maximum five tasks should be active at any moment. – Theodor Zoulias Feb 24 '20 at 11:59
  • Btw DOP means degree of parallelism. We are not dealing necessarily with parallelism here. We are dealing with the level of concurrency of asynchronous operations, that could be either CPU-bound or I/O bound or a mix of both. – Theodor Zoulias Feb 24 '20 at 12:01
  • I know what DOP means, that's why I say that's not throttling - you don't need to limit the tasks if you only create a limited number to begin with. You can use a Dataflow block inside each method and get what you want out of the box. Or you can use a method that creates N worker tasks and a `Channel` to provide the output IAsyncStream. To get ordering, you can assign an incrementing ID to the dequeued items and emit them in order. A naive implementation would use a `T[N]` array to collect all results before emitting them. – Panagiotis Kanavos Feb 24 '20 at 12:12
  • As for `WhenEach(Task[] tasks)`, the problem is the use of `Task[]` instead of `IAsyncEnumerable`. I've already answered in the linked question how to do that - using a channel like this is an idiom you'll find in Go and SignalR Core. It's actually *easier* when you use `IAsyncEnumerable<>` - use `await foreach` to read from the input stream and post to a channel meant for processing, use `Enumerable.Range` to create N workers, and have them read from the input channel. Then have them write to the output channel – Panagiotis Kanavos Feb 24 '20 at 12:20
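For illustration, below is a minimal sketch of the channel-plus-workers idiom described in these comments. All names are hypothetical, and unlike the requested AwaitResults method, this sketch neither preserves the order of the results nor guards against a prematurely abandoned enumeration - which is exactly what the answer below has to deal with:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

static async IAsyncEnumerable<TResult> ProcessUnordered<T, TResult>(
    IAsyncEnumerable<T> source, Func<T, Task<TResult>> process, int workerCount)
{
    Channel<T> input = Channel.CreateBounded<T>(workerCount);
    Channel<TResult> output = Channel.CreateUnbounded<TResult>();

    // Feeder: copies the source sequence into the input channel.
    Task feeder = Task.Run(async () =>
    {
        try
        {
            await foreach (T item in source)
                await input.Writer.WriteAsync(item);
        }
        finally { input.Writer.TryComplete(); }
    });

    // Workers: read from the input channel and write to the output channel.
    Task workers = Task.WhenAll(Enumerable.Range(0, workerCount)
        .Select(_ => Task.Run(async () =>
        {
            await foreach (T item in input.Reader.ReadAllAsync())
                await output.Writer.WriteAsync(await process(item));
        })));

    // Complete the output channel when all the workers have finished.
    _ = workers.ContinueWith(
        t => output.Writer.TryComplete(t.Exception), TaskScheduler.Default);

    await foreach (TResult result in output.Reader.ReadAllAsync())
        yield return result;
    await feeder; await workers; // Propagate possible errors.
}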

1 Answer


Here is an implementation of the AwaitResults method. It is based on a SemaphoreSlim for controlling the concurrency level, and on a Channel<Task<TResult>> for storing the running tasks. The source IAsyncEnumerable<Task<TResult>> sequence is enumerated on the ThreadPool, and the hot tasks are pushed into the channel. Each task is wrapped in a higher-level task that releases the semaphore when it completes.

The main part of the method is the yielding loop, where the tasks are dequeued from the channel one by one and awaited sequentially. This way the results are yielded in the same order as the tasks in the source sequence.

using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading.Channels;

/// <summary>
/// Enumerates a sequence of tasks and returns a sequence of results.
/// The source sequence is enumerated with a specified maximum concurrency.
/// The enumeration of the source is suspended while the internal buffer
/// is at full capacity. A capacity of -1 means unbounded.
/// </summary>
public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
    this IAsyncEnumerable<Task<TResult>> source,
    int maxConcurrency = 1,
    int capacity = -1,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    if (maxConcurrency < 1)
        throw new ArgumentOutOfRangeException(nameof(maxConcurrency));
    if (capacity == -1) capacity = Int32.MaxValue;
    if (capacity < 1)
        throw new ArgumentOutOfRangeException(nameof(capacity));

    Channel<Task<TResult>> channel = Channel
        .CreateBounded<Task<TResult>>(capacity);
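    // The semaphore starts with maxConcurrency - 1 permits, because the first
    // task is already hot when the first MoveNextAsync returns it.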
    using SemaphoreSlim semaphore = new(maxConcurrency - 1, maxConcurrency);
    using CancellationTokenSource linkedCts = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken);

    Task producer = Task.Run(async () =>
    {
        try
        {
            await foreach (Task<TResult> task in source
                .WithCancellation(linkedCts.Token).ConfigureAwait(false))
            {
                await channel.Writer.WriteAsync(OnCompletionRelease(task))
                    .ConfigureAwait(false); // Without cancellation
                await semaphore.WaitAsync(linkedCts.Token)
                    .ConfigureAwait(false); // Acquire before MoveNextAsync
            }
        }
        catch (ChannelClosedException) { } // Ignore
        finally { channel.Writer.TryComplete(); }
    });

    async Task<TResult> OnCompletionRelease(Task<TResult> task)
    {
        try { return await task.ConfigureAwait(false); }
        finally { semaphore.Release(); }
    }

    try
    {
        await foreach (Task<TResult> task in channel.Reader.ReadAllAsync()
            .ConfigureAwait(false))
                yield return await task.ConfigureAwait(false);
        await producer.ConfigureAwait(false); // Propagate possible source error
    }
    finally
    {
        // Await all pending operations before completing the enumeration.
        try { linkedCts.Cancel(); }
        finally
        {
            if (!producer.IsCompleted)
            {
                // Unblock pending writes.
                channel.Writer.TryComplete();
                await Task.WhenAny(producer).ConfigureAwait(false); // Await without throwing
            }
            if (!channel.Reader.Completion.IsCompleted)
            {
                // Drain the channel.
                while (channel.Reader.TryRead(out Task<TResult> task))
                    await Task.WhenAny(task).ConfigureAwait(false); // Await without throwing
                Debug.Assert(channel.Reader.Completion.IsCompleted);
            }
        }
    }
}

The finally block at the bottom plays the important role of preventing any of the tasks from becoming fire-and-forget, in case the consumer of the resulting sequence abandons the enumeration prematurely. In case more than one task completes with failure, only the exception of the first faulted task will be propagated (first in order, not chronologically), and all other exceptions will trigger the TaskScheduler.UnobservedTaskException event.
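For example, here is a consumer that abandons the enumeration prematurely (a minimal sketch, reusing the streamOfTasks sequence from the question):

await foreach (string result in streamOfTasks.AwaitResults(maxConcurrency: 5))
{
    Console.WriteLine(result);
    break; // Triggers the finally block, which awaits the tasks still in flight.
}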

Online demo.

Note: Canceling the cancellationToken may not cancel the whole operation instantaneously. For maximum responsiveness the same cancellationToken should also be used for canceling the individual tasks.
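For example (a sketch, where DownloadAsync is a hypothetical cancelable operation):

CancellationTokenSource cts = new();
IAsyncEnumerable<string> results = AsyncEnumerable.Range(1, 10)
    .Select(x => DownloadAsync(x, cts.Token)) // Cancel the tasks themselves...
    .AwaitResults(maxConcurrency: 5, cancellationToken: cts.Token); // ...and the merging.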

Theodor Zoulias
  • can you explain why the feeder task would never be completed? Would it not complete once the `IAsyncEnumerable source` runs out of items to generate? – CodeMonkey Jul 13 '20 at 21:24
  • @CodeMonkey of course. The `Channel` has been initialized with a capacity of 1000, to prevent the possibility of too many tasks buffered. This can happen in case a very slow task is followed by zillions of lightning-fast tasks (the results are ordered, so the results of the fast tasks must be yielded after yielding the result of the slow task), or in case the consumer of the resulting `IAsyncEnumerable` is consuming it very slowly. So if the source has more than 1000 tasks, and the consumer abandons the enumeration early, the feeder would get stuck. Hence the need for the try/finally block. – Theodor Zoulias Jul 13 '20 at 23:20
  • Thank you, makes sense, I missed the EnumeratorCancellation attribute. – CodeMonkey Jul 13 '20 at 23:55