
I have a method that returns an async enumerable:

    public async IAsyncEnumerable<IResult> DoWorkAsync()
    {
        await Something();
        foreach (var item in ListOfWorkItems)
        {
            yield return DoWork(item);
        }
    }

And the caller:

    public async Task LogResultsAsync()
    {
        await foreach (var result in DoWorkAsync())
        {
            Console.WriteLine(result);
        }
    }

Because DoWork is an expensive operation, I'd prefer to parallelize it somehow, so that it works similarly to this:

    public async IAsyncEnumerable<IResult> DoWorkAsync()
    {
        await Something();
        Parallel.ForEach(ListOfWorkItems, item =>
        {
            yield return DoWork(item);
        });
    }

However, I can't yield return from inside Parallel.ForEach, so I'm wondering what the best way to go about this is.

The order of returned results doesn't matter.

Thanks.

Edit: Sorry, I left out some code in DoWorkAsync. It was indeed awaiting something; I just didn't include it in the code above because it's not very relevant to the question. Updated now.

Edit2: DoWork is mostly I/O bound in my case, it's reading data from a database.

Godsent
  • You're not awaiting anything, so why does `DoWorkAsync` need to be async? – MindSwipe Aug 13 '20 at 08:31
  • Is `DoWorkAsync` actually asynchronous? You aren't using `await`. – Johnathan Barclay Aug 13 '20 at 08:31
  • 6
    The design of `IAsyncEnumerable` means that there's pressure from both directions: the producer can't produce another element until the consumer has consumed the previous one, and the consumer can't consume a new element until the producer has produced it. It sounds like you don't want that, which is fine. Since `ListOfWorkItems.AsParallel().Select(x => DoWork(x))` returns a `ParallelQuery`, why not return a `ParallelQuery`? (perhaps with `AsUnordered()` if order doesn't matter). If you need an `IAsyncEnumerable`, you can loop over the `ParallelQuery` and yield each element – canton7 Aug 13 '20 at 08:31
  • @canton7 I actually didn't know `IAsyncEnumerable` works that way. That's a great idea, thanks – Godsent Aug 13 '20 at 08:48
  • @MindSwipe updated the question. Sorry for the confusion – Godsent Aug 13 '20 at 08:49
  • @JohnathanBarclay updated the question. Sorry for the confusion – Godsent Aug 13 '20 at 08:49
  • What does the `DoWork` method actually do? Is it a CPU-bound operation (calculations) or an I/O-bound operation (web requests/filesystem access)? – Theodor Zoulias Aug 13 '20 at 11:54
  • @TheodorZoulias it's I/O-bound, reading data from a database – Godsent Aug 13 '20 at 16:12
  • In this case (I/O-bound workload) you could consider making the `DoWork` method that transforms each individual item asynchronous, to stay in the spirit of asynchronous programming (which aims at minimizing the number of blocked threads). – Theodor Zoulias Aug 13 '20 at 18:27
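A minimal sketch of that last suggestion, assuming a hypothetical `DoWorkAsync(int)` that replaces `DoWork` (here `Task.Delay` stands in for the database read) and plain `int` items instead of the question's types. A `SemaphoreSlim` caps how many calls run at once, and `Task.WhenAny` yields each result as soon as it finishes, which fits the question's note that order doesn't matter. This is one possible technique, not the one used in the answers below; because `WhenAny` is called on a shrinking list, the total cost is roughly quadratic in the item count, so it suits modest lists.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical async replacement for DoWork; Task.Delay simulates a database round trip.
async Task<int> DoWorkAsync(int item)
{
    await Task.Delay(10 * (5 - item % 5));
    return item * 2;
}

// Starts every item (at most maxConcurrency at a time) and yields each result
// as soon as it completes, so the output order is not the input order.
async IAsyncEnumerable<int> ProcessAllAsync(IEnumerable<int> items, int maxConcurrency)
{
    var throttler = new SemaphoreSlim(maxConcurrency);
    List<Task<int>> pending = items.Select(async item =>
    {
        await throttler.WaitAsync();
        try { return await DoWorkAsync(item); }
        finally { throttler.Release(); }
    }).ToList();

    while (pending.Count > 0)
    {
        Task<int> finished = await Task.WhenAny(pending);
        pending.Remove(finished);
        yield return await finished; // rethrows if this particular item failed
    }
}

var results = new List<int>();
await foreach (var result in ProcessAllAsync(Enumerable.Range(1, 10), maxConcurrency: 3))
{
    results.Add(result);
}
Console.WriteLine(string.Join(", ", results));
```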

3 Answers


Here is a basic implementation that uses a TransformBlock from the TPL Dataflow library:

    using System.Collections.Generic;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow; // TransformBlock, ExecutionDataflowBlockOptions

    public async IAsyncEnumerable<IResult> GetResults(List<IWorkItem> workItems)
    {
        // Define the dataflow block
        var block = new TransformBlock<IWorkItem, IResult>(async item =>
        {
            return await TransformAsync(item);
        }, new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 10, // the default is 1
            EnsureOrdered = false // the default is true
        });

        // Feed the block with input data
        foreach (var item in workItems)
        {
            block.Post(item);
        }
        block.Complete();

        // Stream the block's output as IAsyncEnumerable
        while (await block.OutputAvailableAsync())
        {
            while (block.TryReceive(out var result))
            {
                yield return result;
            }
        }

        // Propagate the first exception, if any.
        await block.Completion;
    }

This implementation is not perfect: if the consumer of the IAsyncEnumerable abandons the enumeration prematurely, the TransformBlock will continue working in the background until all work items have been processed. It also doesn't support cancellation, which all respectable IAsyncEnumerable-producing methods should support. These missing features could be added relatively easily. If you are interested in adding them, look at this question.
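A hedged sketch of one way to add both missing features, not necessarily the approach in the linked question. It assumes TPL Dataflow is available (recent .NET versions include it; otherwise reference the System.Threading.Tasks.Dataflow NuGet package), uses `int` in place of `IWorkItem`/`IResult`, and uses a hypothetical `TransformAsync` that accepts a token. The `[EnumeratorCancellation]` token is linked to a `CancellationTokenSource` that is also given to the block, and a `finally` block cancels it, which runs when the consumer breaks out early.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for TransformAsync that honors a CancellationToken.
async Task<int> TransformAsync(int item, CancellationToken token)
{
    await Task.Delay(10, token);
    return item * 10;
}

async IAsyncEnumerable<int> GetResults(IEnumerable<int> workItems,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // Canceled when the caller cancels, or by the finally block below.
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    var block = new TransformBlock<int, int>(item => TransformAsync(item, cts.Token),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 10,
            EnsureOrdered = false,
            CancellationToken = cts.Token, // a canceled block stops taking new work
        });

    foreach (var item in workItems) block.Post(item);
    block.Complete();

    try
    {
        while (await block.OutputAvailableAsync(cts.Token))
            while (block.TryReceive(out var result))
                yield return result;

        await block.Completion; // propagate the first exception, if any
    }
    finally
    {
        // Runs on normal completion, on error, and when the consumer breaks early,
        // so abandoned work is canceled instead of running on in the background.
        cts.Cancel();
    }
}

var firstThree = new List<int>();
await foreach (var r in GetResults(Enumerable.Range(1, 100)))
{
    firstThree.Add(r);
    if (firstThree.Count == 3) break; // disposing the enumerator runs the finally block
}
Console.WriteLine(string.Join(", ", firstThree));
```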

Another imperfection is that if await TransformAsync(item) throws an OperationCanceledException, the error is suppressed. This is by-design behavior of TPL Dataflow. If this is a problem, you can find here the ingredients needed for a solution (it's not trivial).


.NET 6 update: a new API, DataflowBlock.ReceiveAllAsync, was introduced in .NET 6 and can simplify the streaming of the block's output. There is a gotcha, though; see this answer for details.
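A minimal sketch of the streaming part rewritten with ReceiveAllAsync, assuming .NET 6+ with TPL Dataflow available, `int` in place of the answer's types, and a hypothetical delegate in place of TransformAsync. It keeps the `await block.Completion;` line after the loop so that errors still surface; it does not attempt to address the gotcha mentioned above, which is described in the linked answer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

async IAsyncEnumerable<int> GetResults(IEnumerable<int> workItems)
{
    var block = new TransformBlock<int, int>(async item =>
    {
        await Task.Delay(5); // hypothetical stand-in for TransformAsync
        return item + 1;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, EnsureOrdered = false });

    foreach (var item in workItems)
    {
        block.Post(item);
    }
    block.Complete();

    // ReceiveAllAsync replaces the nested OutputAvailableAsync/TryReceive loops.
    await foreach (var result in block.ReceiveAllAsync())
    {
        yield return result;
    }

    // Kept from the original code: propagate the first exception, if any.
    await block.Completion;
}

var sum = 0;
await foreach (var value in GetResults(Enumerable.Range(1, 20)))
{
    sum += value;
}
Console.WriteLine(sum);
```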

Theodor Zoulias
  • To be clear, because all of the data must be fed in before any results are yielded back, and due to the MaxDegreeOfParallelism, for large inputs (10k for example) things won't actually start coming back immediately? The 10,000th item wouldn't Post until at least 9989 of the previous items completed, at which point those 9989 items would be set to yield return immediately. (All of this presumes that Post is blocking when MaxDegreeOfParallelism is met, so correct me if that's not the case.) – Caleb Holt May 12 '21 at 16:56
  • 1
    @CalebHolt nope, the `Post` method pushes a message in the `TransformBlock`'s internal buffer, and returns immediately. In case the buffer is full, the message is not accepted and the `Post` returns `false`. In my example the `TransformBlock` is not configured with a `BoundedCapacity`, so its buffer has unlimited size, and all messages will be accepted. A more sophisticated approach would be to configure it with a reasonable `BoundedCapacity`, and feed the `TransformBlock` with the asynchronous `await block.SendAsync(item);` instead of the `Post`. – Theodor Zoulias May 12 '21 at 19:19
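A minimal sketch of the bounded variant described in the last comment, assuming TPL Dataflow is available, `int` items, and hypothetical values for the capacity, the parallelism and the squaring delegate. Feeding runs on a separate task (`Task.Run`): with a bounded block, `SendAsync` waits for free space, and space is only freed as results are received, so feeding everything before receiving (as the answer does with `Post`) could stall.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

async IAsyncEnumerable<int> GetResults(IEnumerable<int> workItems)
{
    var block = new TransformBlock<int, int>(async item =>
    {
        await Task.Delay(5); // hypothetical stand-in for TransformAsync
        return item * item;
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
        EnsureOrdered = false,
        BoundedCapacity = 8, // hypothetical value: caps how many items the block holds at once
    });

    // Feed concurrently: SendAsync waits while the block is full, and the block
    // only frees space as the loop below receives results.
    var feeder = Task.Run(async () =>
    {
        try
        {
            foreach (var item in workItems)
            {
                // SendAsync returns false if the block declines the item (e.g. it faulted).
                if (!await block.SendAsync(item)) break;
            }
        }
        finally
        {
            block.Complete();
        }
    });

    while (await block.OutputAvailableAsync())
    {
        while (block.TryReceive(out var result))
        {
            yield return result;
        }
    }

    await feeder;           // surfaces errors from enumerating workItems
    await block.Completion; // surfaces the first transformation error, if any
}

var squares = new List<int>();
await foreach (var square in GetResults(Enumerable.Range(1, 10)))
{
    squares.Add(square);
}
Console.WriteLine(squares.Sum());
```

If the consumer abandons this enumeration early, the feeder task can stay parked in `SendAsync`, so the abandonment caveat from the answer still applies.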

As suggested by canton7, you could use AsParallel instead of Parallel.ForEach.

This can be consumed inside a standard foreach loop where you can yield the results:

    public async IAsyncEnumerable<IResult> DoWorkAsync()
    {
        await Something();
        foreach (var result in ListOfWorkItems.AsParallel().Select(DoWork))
        {
            yield return result;
        }
    }

As mentioned by Theodor Zoulias, the enumerable returned isn't actually asynchronous at all.

If you simply need to consume this using await foreach, this shouldn't be a problem, but to be more explicit, you could return the IEnumerable and have the caller parallelise it:

    public async Task<IEnumerable<Item>> DoWorkAsync()
    {
        await Something();
        return ListOfWorkItems;
    }

    // Caller...
    Parallel.ForEach(await DoWorkAsync(), item =>
    {
        var result = DoWork(item);
        //...
    });

However, this may be less maintainable if it needs to be called in multiple places.
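If the caller needs to collect the results, one hedged illustration of the caller-side approach: `Parallel.ForEach` runs its body concurrently, so results should go into a thread-safe collection such as `ConcurrentBag<T>`, and because `Parallel.ForEach` blocks its calling thread, wrapping it in `Task.Run` keeps a UI-style caller responsive. `GetWorkItemsAsync` and `DoWork` below are hypothetical stand-ins for the question's `Something()`/`ListOfWorkItems` and `DoWork`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for awaiting Something() and returning ListOfWorkItems.
async Task<IEnumerable<int>> GetWorkItemsAsync()
{
    await Task.Yield();
    return Enumerable.Range(1, 8);
}

// Hypothetical blocking DoWork.
int DoWork(int item)
{
    Thread.Sleep(5); // simulated blocking work
    return item + 100;
}

var items = await GetWorkItemsAsync();
var results = new ConcurrentBag<int>(); // safe to Add from several threads at once

// Parallel.ForEach blocks the thread that calls it, so run it on the thread pool.
await Task.Run(() => Parallel.ForEach(items, item => results.Add(DoWork(item))));
Console.WriteLine(results.Count);
```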

Johnathan Barclay
  • There are two problems with this approach. (1) the PLINQ buffers quite aggressively the produced results. This can be fixed with the option `WithMergeOptions(ParallelMergeOptions.NotBuffered)`. (2) the PLINQ uses the current thread as one of the worker threads, so the caller will be blocked between one `yield return` and the next. This defeats the whole purpose of *Async* in the `IAsyncEnumerable`, and it is not fixable. – Theodor Zoulias Aug 13 '20 at 09:48
  • @TheodorZoulias For point (2), the method will resume on the thread-pool after the `await`, so PLINQ using that thread won't affect the caller. That's unless there is a synchronisation context, in which case just add `ConfigureAwait(false)`. – Johnathan Barclay Aug 13 '20 at 10:11
  • Yes, most probably it will resume on the `ThreadPool`, assuming that the `Something().ConfigureAwait(false)` will not return an already completed task. But this is not what makes an enumeration asynchronous. To be truly asynchronous implies that at least some of the calls to the `IAsyncEnumerator.MoveNextAsync()` will return not-completed tasks. If all these calls block the current thread and return completed tasks, then you have essentially an `IEnumerable` masqueraded as `IAsyncEnumerable`. It would be more honest to return an `IEnumerable`, and enumerate it on the `ThreadPool`. – Theodor Zoulias Aug 13 '20 at 10:49
  • @TheodorZoulias Yes maybe. But if calling `Something()` and iterating over `ListOfWorkItems` are interdependent, having them within a single method may produce more maintainable code. Awaiting `Something()` releases the thread (assuming it is actually asynchronous), then the iteration must occur in a blocking fashion irrespective of where it is called from. – Johnathan Barclay Aug 13 '20 at 11:59
  • You have a point. Returning `IAsyncEnumerable` has some merits in this case. This is not the intended usage of this technology though. Returning an `IAsyncEnumerable` is a contract, and creates the expectation that the caller will not be blocked. The implementation of this answer is async only for the duration of `await Something()`, and the rest of the enumeration is blocking, so it breaks the contract. My argument is that the PLINQ is probably not the right tool for cracking this problem. – Theodor Zoulias Aug 13 '20 at 13:08
  • Regarding using `ConfigureAwait(false)` to resume on the `ThreadPool`, it should be noted that adding `ConfigureAwait(false)` inside the `DoWorkAsync` method will have no effect to where the consumer of the `IAsyncEnumerable` resumes after every `MoveNextAsync`. The consumer can configure the enumeration independently from the producer, by appending `ConfigureAwait` to the `IAsyncEnumerable` itself. In case the consumer is the UI thread, and the enumeration happens with the default configuration, the UI thread will be blocked. – Theodor Zoulias Aug 13 '20 at 13:08
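A sketch applying two points from these comments to the PLINQ version of this answer, assuming a hypothetical blocking `DoWork(int)` and using `Task.Yield()` in place of `Something()`. `WithMergeOptions(ParallelMergeOptions.NotBuffered)` addresses the aggressive buffering, and the consumer chooses where it resumes by calling `ConfigureAwait(false)` on the `IAsyncEnumerable` itself. As the comments point out, the enumerating thread is still blocked between results, so this does not make the enumeration truly asynchronous.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical blocking DoWork, standing in for the question's method.
int DoWork(int item)
{
    Thread.Sleep(5);
    return item * 3;
}

async IAsyncEnumerable<int> DoWorkAsync(IEnumerable<int> items)
{
    await Task.Yield(); // stands in for the question's await Something()
    var query = items.AsParallel()
        .WithMergeOptions(ParallelMergeOptions.NotBuffered) // hand over results as soon as they are ready
        .Select(DoWork);
    foreach (var result in query)
    {
        yield return result; // the enumerating thread still blocks while waiting for the next result
    }
}

var results = new List<int>();
// ConfigureAwait(false) here is the consumer's choice, independent of the producer.
await foreach (var value in DoWorkAsync(Enumerable.Range(1, 12)).ConfigureAwait(false))
{
    results.Add(value);
}
Console.WriteLine(results.Count);
```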

The solution proposed by Theodor Zoulias unfortunately has a small issue: you can't choose to aggregate multiple exceptions, so only the first exception is propagated. Here's a modern solution using System.Threading.Channels (available since .NET Core 3.0) and Parallel.ForEachAsync (available since .NET 6) to limit the max degree of parallelism.

    #nullable enable

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Runtime.CompilerServices;
    using System.Threading;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    namespace StackOverflow.SampleCode;

    public class ParallelExecutionException<T> : Exception
    {
        internal ParallelExecutionException(T item, Exception innerException) : base(innerException.Message, innerException)
        {
            Item = item;
        }

        public T Item { get; }

        public new Exception InnerException => base.InnerException!;
    }

    public static class AsyncEnumerableExtensions
    {
        public static async IAsyncEnumerable<TOutput> AsParallelAsync<TInput, TOutput>(this IAsyncEnumerable<TInput> source, int maxDegreeOfParallelism, Func<TInput, CancellationToken, Task<TOutput>> transform, bool aggregateException = false, [EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            ArgumentNullException.ThrowIfNull(source);
            ArgumentNullException.ThrowIfNull(transform);
            if (maxDegreeOfParallelism < 1)
                throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));

            var channelOptions = new UnboundedChannelOptions { SingleReader = true };
            var channel = Channel.CreateUnbounded<TOutput>(channelOptions);

            _ = Task.Run(async () =>
            {
                // Thread-safe collection: the loop body below runs concurrently.
                var exceptions = new ConcurrentQueue<Exception>();
                var writer = channel.Writer;
                var parallelOptions = new ParallelOptions
                {
                    MaxDegreeOfParallelism = maxDegreeOfParallelism,
                    CancellationToken = cancellationToken,
                };
                try
                {
                    await Parallel.ForEachAsync(source, parallelOptions, async (item, ct) =>
                    {
                        try
                        {
                            var result = await transform(item, ct);
                            await writer.WriteAsync(result, ct).ConfigureAwait(false);
                        }
                        catch (Exception exception)
                        {
                            var parallelExecutionException = new ParallelExecutionException<TInput>(item, exception);
                            if (aggregateException)
                                exceptions.Enqueue(parallelExecutionException);
                            else
                                writer.TryComplete(parallelExecutionException); // no-op if already completed
                        }
                    });
                    // Always complete the channel, otherwise the reader below waits forever.
                    writer.TryComplete(exceptions.IsEmpty ? null : new AggregateException(exceptions));
                }
                catch (Exception exception)
                {
                    // e.g. cancellation, or enumerating the source itself failed
                    writer.TryComplete(exception);
                }
            }, cancellationToken);

            await foreach (var result in channel.Reader.ReadAllAsync(cancellationToken))
            {
                yield return result;
            }
        }
    }

With this solution, you can choose whether to abort at the first encountered exception (aggregateException = false) or continue processing and throw an AggregateException (aggregateException = true) once all the source items are processed.

0xced
  • @0xced my answer is supposed to propagate the first exception that occurred during the transformation, via the `await block.Completion;` line. Do you have an experimental demonstration that it fails to do so? A known problem in my answer is that `OperationCanceledException`s are suppressed ([by design](https://github.com/dotnet/runtime/issues/29619 "Dataflow TransformBlock silently fails if TaskCanceledException is thrown")), but all other exception types are surfaced. – Theodor Zoulias Dec 29 '22 at 10:35
  • Regarding your solution, I could argue that the `bool aggregateException` has the wrong default. [Firing and forgetting](https://stackoverflow.com/a/61320933/11178549) is not advisable in general. Abandoning deliberately async operations is something that I would expect to find as an extension, like the [`WaitAsync`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.task.waitasync) for tasks, not baked into implementations. The same applies to the `cancellationToken` argument. You are using it to abandon the enumeration, while operations are still running in the background. – Theodor Zoulias Dec 29 '22 at 10:57
  • 1
    Somehow I missed the last line of your solution (`await block.Completion;`) when copying it! Indeed the exception is propagated. I have amended my answer accordingly. – 0xced Dec 29 '22 at 16:20