I have a case where I need to receive data from more than one IAsyncEnumerable source. For performance reasons, the sources should be consumed in parallel.

I have written the following code to achieve this, using the AsyncAwaitBestPractices, System.Threading.Tasks.Dataflow and System.Linq.Async NuGet packages:

public static async IAsyncEnumerable<T> ExecuteSimultaneouslyAsync<T>(
        this IEnumerable<IAsyncEnumerable<T>> sources,
        int outputQueueCapacity = 1,
        TaskScheduler scheduler = null)
    {
        var sourcesCount = sources.Count();

        var channel = outputQueueCapacity > 0 
            ? Channel.CreateBounded<T>(outputQueueCapacity)
            : Channel.CreateUnbounded<T>();

        sources.AsyncParallelForEach(
                async body => 
                {
                    await foreach (var item in body)
                    {
                        await channel.Writer.WaitToWriteAsync();
                        await channel.Writer.WriteAsync(item);
                    }
                },
                maxDegreeOfParallelism: sourcesCount,
                scheduler: scheduler)
            .ContinueWith(_ => channel.Writer.Complete())
            .SafeFireAndForget();

        while (await channel.Reader.WaitToReadAsync())
            yield return await channel.Reader.ReadAsync();
    }

public static async Task AsyncParallelForEach<T>(
    this IEnumerable<T> source,
    Func<T, Task> body,
    int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    TaskScheduler scheduler = null)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };

    if (scheduler != null)
        options.TaskScheduler = scheduler;

    var block = new ActionBlock<T>(body, options);

    foreach (var item in source)
        block.Post(item);

    block.Complete();

    await block.Completion;
}

This code works fine until two or more sources throw an exception. That leads to a situation where the second exception cannot be handled and, in some cases, crashes the application.

So I wonder whether there is a better way to consume data from several IAsyncEnumerable sources in parallel?

  • Related: [Merge multiple IAsyncEnumerable streams](https://stackoverflow.com/questions/70658393/merge-multiple-iasyncenumerable-streams). You might also find this interesting: [How to merge multiple asynchronous sequences without left-side bias?](https://stackoverflow.com/questions/70710153/how-to-merge-multiple-asynchronous-sequences-without-left-side-bias) – Theodor Zoulias Jun 22 '22 at 08:58
  • As a side note the `ExecuteSimultaneouslyAsync` implementation in your answer drains aggressively the source sequences, by launching fire-and-forget consuming loops, and storing the consumed elements in a hidden queue. The effect is that the enumeration of the source sequences is not driven by the enumeration of the resulting merged sequence. In some scenarios this could be an unacceptable behavior. – Theodor Zoulias Jun 22 '22 at 09:06
  • @TheodorZoulias Thank you for your attention to this question! Can you give an example of such unacceptable scenarios, so I can keep them in mind when using **ExecuteSimultaneouslyAsync**? – Alexander Zhyshkevich Jun 22 '22 at 09:28
  • Alexander one scenario is a hypothetical requirement of enforcing a maximum latency between producing an element and processing it, for business reasons. Another scenario is the case that each produced element allocates a heavy disposable resource upon creation, and you don't want to run out of memory. – Theodor Zoulias Jun 22 '22 at 09:44
  • @TheodorZoulias We can set the latency in the consuming method, if I understood "enforcing a maximum latency between producing an element and processing it" correctly. About the second point: we pass a copy of the object reference to the intermediate buffers. I don't think this can consume too much memory. But sure, there can be a problem if we use large structs as the **ExecuteSimultaneouslyAsync** generic argument. – Alexander Zhyshkevich Jun 22 '22 at 10:14
  • How can you set the latency in the consuming method? The consumer might have to do an API call for each consumed element, and the API call might take a long time. – Theodor Zoulias Jun 22 '22 at 11:14
  • I apologize, I didn't get your point – Alexander Zhyshkevich Jun 22 '22 at 11:21
  • @AlexanderZhyshkevich the code can be simplified a lot. A single output channel can be used to *merge* responses so you can read them as an IAsyncEnumerable. Having unbounded channels won't improve performance though, it will only flood memory if consumers aren't fast enough. Go's default channel capacity is 1. Finally, exception handling becomes *very* easy if you use a Result class and [Railway-oriented programming techniques](https://fsharpforfunandprofit.com/rop/). This is common in Go as well. In case of error, wrap the exception as a failed Result, otherwise generate an OK Result – Panagiotis Kanavos Jun 30 '22 at 07:31
  • @PanagiotisKanavos I appreciate your comment! I have added an appropriate parameter to the `ExecuteSimultaneouslyAsync` method. Also thank you for the link about railway-oriented techniques! I will keep it in mind for future use. – Alexander Zhyshkevich Jun 30 '22 at 10:02
  • @AlexanderZhyshkevich I posted an answer that shows how much simpler the code could be – Panagiotis Kanavos Jun 30 '22 at 11:16
  • This question needs to be improved, by stating exactly what is requested. Specifically it should be clarified that consuming greedily and uncontrollably the source sequences, and storing their elements in an unbounded hidden queue, is precisely the desirable behavior. In case the consumer of the merged sequence abandons the enumeration, either deliberately or as a result of an exception, the draining of the source sequences should continue in the background, without any cancellation option, until they are all completed or the process is terminated, whichever comes first. – Theodor Zoulias Jul 01 '22 at 00:49

2 Answers

Keeping a pipeline running in case of exceptions is extremely difficult whether it's a functional or CSP pipeline. In most cases a pipeline will need to keep working even in case of individual message failures. A failing message doesn't mean the entire pipeline has failed.

That's why Railway-oriented programming is used to wrap messages and errors into Result<TOk,TError> wrappers and "redirect" or ignore error messages. Such a class makes programming Dataflow, Channels and IAsyncEnumerable pipelines a lot easier.

In F#, using discriminated unions, one could define a Result type just with

type Result<'T,'TError> =
    | Ok of ResultValue:'T
    | Error of ErrorValue:'TError

DUs aren't in C# yet, so various alternatives have been proposed, some using inheritance from an IResult<> base, some using classes/Records which allow exhaustive pattern matching, something not available with the IResult<> techniques.

Let's assume the Result<> here is :

public record Result<T>(T? result, Exception? error)
{
    public bool IsOk => error == null;
    public static Result<T> Ok(T result) => new(result, default);
    public static Result<T> Fail(Exception exception) =>
        new(default, exception);

    public static implicit operator Result<T> (T value) 
        =>  Result<T>.Ok(value);
    public static implicit operator Result<T>(Exception err) 
        => Result<T>.Fail(err);
}

The first step is to create a CopyToAsync helper that will copy all data from the input IAsyncEnumerable<Result<T>> to an output ChannelWriter<Result<T>>:

public static async Task CopyToAsync<T>(
           this IAsyncEnumerable<Result<T>> input, 
           ChannelWriter<Result<T>> output,
           CancellationToken token=default)
{
    try
    {
        await foreach(var msg in input.WithCancellation(token).ConfigureAwait(false))
        {
            await output.WriteAsync(msg).ConfigureAwait(false);
        }
    }
    catch(Exception exc)
    {
        await output.WriteAsync(Result<T>.Fail(exc)).ConfigureAwait(false);
    }
}

This way, even if an exception is thrown, a Failure message will be emitted instead of aborting the pipeline.
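
To see the effect, here is a minimal, self-contained sketch. `FaultySource` and `CopyDemo` are hypothetical names introduced only for this illustration, and the `Result<T>` record is repeated (without the implicit operators) so that the snippet compiles on its own. The source yields two values and then throws; the consumer should still receive three messages, the last one being a failure:

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public record Result<T>(T? result, Exception? error)
{
    public bool IsOk => error == null;
    public static Result<T> Ok(T result) => new(result, default);
    public static Result<T> Fail(Exception exception) => new(default, exception);
}

public static class CopyDemo
{
    // Hypothetical source: yields two values, then throws.
    static async IAsyncEnumerable<Result<int>> FaultySource()
    {
        yield return Result<int>.Ok(1);
        await Task.Yield();
        yield return Result<int>.Ok(2);
        throw new InvalidOperationException("source failed");
    }

    // Same idea as CopyToAsync above: an exception becomes a failed Result.
    static async Task CopyToAsync<T>(
        IAsyncEnumerable<Result<T>> input,
        ChannelWriter<Result<T>> output)
    {
        try
        {
            await foreach (var msg in input.ConfigureAwait(false))
                await output.WriteAsync(msg).ConfigureAwait(false);
        }
        catch (Exception exc)
        {
            await output.WriteAsync(Result<T>.Fail(exc)).ConfigureAwait(false);
        }
    }

    // Returns everything that reached the channel, in arrival order.
    public static async Task<List<Result<int>>> RunAsync()
    {
        // Unbounded here only so the demo can copy first and read afterwards.
        var channel = Channel.CreateUnbounded<Result<int>>();
        await CopyToAsync(FaultySource(), channel.Writer);
        channel.Writer.Complete();

        var received = new List<Result<int>>();
        await foreach (var msg in channel.Reader.ReadAllAsync())
            received.Add(msg);
        return received;
    }
}
```

Awaiting `CopyDemo.RunAsync()` should give two OK results followed by one failed result that carries the `InvalidOperationException`; the exception never escapes the copy loop.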

With that, you can merge multiple sources by copying input messages to an output channel :

public static ChannelReader<Result<T>> Merge<T>(
        this IEnumerable<IAsyncEnumerable<Result<T>>> inputs,
        CancellationToken token=default)
{
    var channel=Channel.CreateBounded<Result<T>>(1);

    // One copy task per input, all writing to the same channel
    var tasks = inputs.Select(inp=>inp.CopyToAsync(channel.Writer,token));

    // Complete the channel once every copy task has finished
    _ = Task.WhenAll(tasks)
            .ContinueWith(t=>channel.Writer.TryComplete(t.Exception));

    return channel.Reader;
}

Using BoundedCapacity=1 maintains the backpressure behavior of downstream channels or consumers.
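
As a rough illustration of what that backpressure means, the sketch below (with the hypothetical name `BackpressureDemo`) uses the non-blocking `TryWrite` to show that a bounded channel with capacity 1 refuses a second item until the first one has been read. With the default full mode, `WriteAsync` would wait asynchronously at the point where `TryWrite` returns false:

```csharp
using System.Threading.Channels;

public static class BackpressureDemo
{
    public static (bool First, bool SecondWhileFull, bool SecondAfterRead) Run()
    {
        var channel = Channel.CreateBounded<int>(1);

        // The buffer is empty, so the first item is accepted.
        bool first = channel.Writer.TryWrite(1);

        // The single slot is taken, so the writer is pushed back.
        bool secondWhileFull = channel.Writer.TryWrite(2);

        // Once the consumer takes the item, the slot is free again.
        channel.Reader.TryRead(out _);
        bool secondAfterRead = channel.Writer.TryWrite(2);

        return (first, secondWhileFull, secondAfterRead);
    }
}
```

A producer therefore cannot run more than one item ahead of the consumer, which is what keeps a slow consumer from being flooded.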

You can read all messages in a ChannelReader through ChannelReader<T>.ReadAllAsync(CancellationToken) :

IEnumerable<IAsyncEnumerable<Result<T>>> sources = ...;
var merged=sources.Merge();
await foreach(var msg in merged.ReadAllAsync())
{
    //Pattern magic to get Good results only
    if(msg is ({} value,null))
    {
        //Work with value
    }
}
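
If failed messages should be handled rather than silently skipped, the same positional patterns can be combined in a switch expression. This is only a sketch: `ResultPatterns.Describe` is a hypothetical helper, and the `Result<T>` record is repeated (without the implicit operators) so that the snippet stands alone:

```csharp
#nullable enable
using System;

public record Result<T>(T? result, Exception? error)
{
    public bool IsOk => error == null;
    public static Result<T> Ok(T result) => new(result, default);
    public static Result<T> Fail(Exception exception) => new(default, exception);
}

public static class ResultPatterns
{
    // The positional patterns rely on the Deconstruct method that the
    // compiler generates for a positional record.
    public static string Describe<T>(Result<T> msg) => msg switch
    {
        (_, { } err)      => $"failed: {err.Message}", // error is set
        ({ } value, null) => $"ok: {value}",           // value is set, no error
        _                 => "ok: (null)"              // neither is set
    };
}
```

For example, `ResultPatterns.Describe(Result<int>.Ok(5))` produces `ok: 5`, and a result created with `Result<int>.Fail(new Exception("boom"))` produces `failed: boom`.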

You can avoid exposing the channel by returning IAsyncEnumerable<> :

public static IAsyncEnumerable<Result<T>> MergeAsync<T>(
        this IEnumerable<IAsyncEnumerable<Result<T>>> inputs,
        CancellationToken token=default)
{
    return inputs.Merge(token).ReadAllAsync(token);
}

You can use System.Linq.Async to work on an IAsyncEnumerable<> using LINQ methods, eg to convert an IAsyncEnumerable<T> to an IAsyncEnumerable<Result<T>> :

source.Select(msg=>Result<T>.Ok(msg))

Or filter failed messages before processing them :

source.Where(msg=>msg.IsOk)

You could create a method that applies a Func<T1,Task<T2>> to an input and propagates results or errors as results :

public static async Task<Result<T2>> ApplyAsync<T1,T2>(this Result<T1> msg,
                                               Func<T1,Task<T2>> func)
{            
    if (msg is (_, { } err))
    {
        return err;
    }
    try
    {
        var result = await func(msg.result).ConfigureAwait(false);
        return result;
    }
    catch(Exception exc)
    {
        return exc;
    }
}
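
A small sketch of how such a method could be chained. `ParseAsync`, `DoubleAsync` and `ProcessAsync` are hypothetical steps made up for this example, and this variant of `ApplyAsync` calls `Ok`/`Fail` explicitly instead of relying on the implicit operators, so that the snippet stays short and compiles on its own:

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

public record Result<T>(T? result, Exception? error)
{
    public bool IsOk => error == null;
    public static Result<T> Ok(T result) => new(result, default);
    public static Result<T> Fail(Exception exception) => new(default, exception);
}

public static class ResultSteps
{
    public static async Task<Result<T2>> ApplyAsync<T1, T2>(
        this Result<T1> msg, Func<T1, Task<T2>> func)
    {
        // A failed input skips the step and just forwards the error.
        if (msg.error is { } err)
            return Result<T2>.Fail(err);
        try
        {
            return Result<T2>.Ok(await func(msg.result!).ConfigureAwait(false));
        }
        catch (Exception exc)
        {
            return Result<T2>.Fail(exc);
        }
    }

    // Hypothetical processing steps.
    static Task<int> ParseAsync(string text) => Task.FromResult(int.Parse(text));
    static Task<int> DoubleAsync(int n) => Task.FromResult(n * 2);

    public static async Task<Result<int>> ProcessAsync(string text)
    {
        var parsed = await Result<string>.Ok(text).ApplyAsync(s => ParseAsync(s));
        // If parsing failed, DoubleAsync is never called.
        return await parsed.ApplyAsync(n => DoubleAsync(n));
    }
}
```

With this sketch, `ProcessAsync("21")` ends in an OK result holding 42, while `ProcessAsync("abc")` ends in a failed result carrying the `FormatException` thrown by `int.Parse`; the second step is skipped on the failure track.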

This is a ... bit ... easier in F#

Panagiotis Kanavos
  • As I use .NetFramework 4.8 it was a little bit tricky to define `Result` record in your manner. But this [article](https://blog.ndepend.com/using-c9-record-and-init-property-in-your-net-framework-4-x-net-standard-and-net-core-projects/) helped me – Alexander Zhyshkevich Jun 30 '22 at 11:48
  • I used C# 10 techniques to reduce the noise. .NET Framework 4.8 doesn't even have IAsyncEnumerable without `Microsoft.Bcl.AsyncInterfaces`. It's sometimes easier to use Channels than IAsyncEnumerable. – Panagiotis Kanavos Jun 30 '22 at 11:49
  • We work with what we have) – Alexander Zhyshkevich Jun 30 '22 at 11:50

Inspired by this answer, I decided to update my own code (see the dependencies in the question):

public record Result<T>(T Data = default, Exception error = null)
{
    public bool IsOk => error == null;
    public static Result<T> Ok(T result) => new(result, default);
    public static Result<T> Fail(Exception exception) =>
        new(default, exception);

    public static implicit operator Result<T>(T value)
        => Ok(value);

    public static implicit operator Result<T>(Exception err)
        => Fail(err);
}

public static async ValueTask AsyncParallelForEach<T>(
    this IEnumerable<T> source,
    Func<T, Task> body,
    int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    TaskScheduler scheduler = null)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };

    if (scheduler != null)
        options.TaskScheduler = scheduler;

    var block = new ActionBlock<T>(body, options);

    foreach (var item in source)
        block.Post(item);

    block.Complete();

    await block.Completion;
}

public static async IAsyncEnumerable<Result<T>> ExecuteParallelAsync<T>(
    this IEnumerable<IAsyncEnumerable<T>> sources,
    int outputQueueCapacity = 1,
    TaskScheduler scheduler = null)
{
    var sourcesCount = sources.Count();

    var channel = outputQueueCapacity > 0
        ? Channel.CreateBounded<Result<T>>(outputQueueCapacity)
        : Channel.CreateUnbounded<Result<T>>();

    sources.AsyncParallelForEach(
            async body =>
            {
                try
                {
                    await foreach (var item in body)
                    {
                        if (await channel.Writer.WaitToWriteAsync().ConfigureAwait(false))
                            await channel.Writer.WriteAsync(item).ConfigureAwait(false);
                    }
                }
                catch (Exception ex)
                {
                    if (await channel.Writer.WaitToWriteAsync().ConfigureAwait(false))
                        await channel.Writer.WriteAsync(Result<T>.Fail(ex)).ConfigureAwait(false);
                }
            },
            maxDegreeOfParallelism: sourcesCount,
            scheduler: scheduler)
        .AsTask()
        .ContinueWith(_ => channel.Writer.Complete())
        .SafeFireAndForget();

    while (await channel.Reader.WaitToReadAsync().ConfigureAwait(false))
        yield return await channel.Reader.ReadAsync().ConfigureAwait(false);
}

This code may be easier for some readers to understand than the code in the original answer.

N.B. You need C# 9 or later to be able to use records. Also, if you are targeting .NET Framework 4.x (as I have to), you need the workaround described in this article. In short, you have to add the code below somewhere in your project:

namespace System.Runtime.CompilerServices
{
    using System.ComponentModel;
    /// <summary>
    /// Reserved to be used by the compiler for tracking metadata.
    /// This class should not be used by developers in source code.
    /// </summary>
    [EditorBrowsable(EditorBrowsableState.Never)]
    internal static class IsExternalInit { }
}
  • The definition of the `SafeFireAndForget` is missing. Also the `ContinueWith` usage violates [this](https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/ca2008 "CA2008: Do not create tasks without passing a TaskScheduler") guideline. – Theodor Zoulias Jul 01 '22 at 00:36
  • The `AsyncParallelForEach` reinvents the [`Parallel.ForEachAsync`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreachasync) for no particular reason. Returning a `ValueTask` reduces the usability, without offering any performance improvement whatsoever. And you don't need this method. The `DataflowBlockOptions.Unbounded` behavior can be implemented simply by starting all the tasks and awaiting them with `Task.WhenAll`. – Theodor Zoulias Jul 01 '22 at 00:37
  • @TheodorZoulias, don't forget that some people have to write code for .NET Framework 4.8, for various reasons, where there is no `Parallel.ForEachAsync` (out of the box at least). – Alexander Zhyshkevich Jul 01 '22 at 05:26
  • And it is tricky to limit the degree of parallelism using `Task.WhenAll` (at least in .NET Framework 4.8). – Alexander Zhyshkevich Jul 01 '22 at 05:43
  • The `Parallel.ForEachAsync` method was introduced in .NET 6. The question is not tagged with the `.net-4.8` tag, nor does it ask for a .NET Framework-compatible solution. In general when the target platform is not specified, we assume that we can use all the APIs available in the latest .NET release. – Theodor Zoulias Jul 01 '22 at 05:50
  • Sure, but I was replying to your comment that there is no reason to define an `AsyncParallelForEach` implementation – Alexander Zhyshkevich Jul 01 '22 at 05:56
  • Enforcing the maximum degree of parallelism with `Task.WhenAll` is indeed tricky, but in your answer you are configuring the `maxDegreeOfParallelism` with the value `sourcesCount`, which is equal to the total number of the tasks. This configuration has no effect in this case. – Theodor Zoulias Jul 01 '22 at 06:20
  • @TheodorZoulias Totally agree) But I use `AsyncParallelForEach` method in other places of my code where it seems to work as it should. – Alexander Zhyshkevich Jul 01 '22 at 06:38
  • Alexander good for you. But when it comes to answering questions in StackOverflow, we are supposed to focus on answering the question at hand, using the most readily available tools. Btw instead of investing your time in answering your question, it would be better IMHO to redirect this effort to improving the question itself. Currently it's not a good question, and it's unlikely to induce good answers as a result. – Theodor Zoulias Jul 01 '22 at 07:34
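
The suggestion made in the comments above, starting one task per source and awaiting them all with `Task.WhenAll` instead of going through a Dataflow block, could look roughly like the sketch below. It is an illustration under assumptions, not a drop-in replacement: it uses the `Result<T>` shape from the first answer (without the implicit operators), `WhenAllMerge`, `MergeDemo` and `Numbers` are hypothetical names, and the pumps are not cancelled if the consumer stops enumerating early.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public record Result<T>(T? result, Exception? error)
{
    public bool IsOk => error == null;
    public static Result<T> Ok(T result) => new(result, default);
    public static Result<T> Fail(Exception exception) => new(default, exception);
}

public static class WhenAllMerge
{
    public static async IAsyncEnumerable<Result<T>> ExecuteParallelAsync<T>(
        this IEnumerable<IAsyncEnumerable<T>> sources,
        int outputQueueCapacity = 1)
    {
        var channel = outputQueueCapacity > 0
            ? Channel.CreateBounded<Result<T>>(outputQueueCapacity)
            : Channel.CreateUnbounded<Result<T>>();

        // Copies one source into the channel; a failure becomes a failed Result.
        async Task PumpAsync(IAsyncEnumerable<T> source)
        {
            try
            {
                await foreach (var item in source.ConfigureAwait(false))
                    await channel.Writer.WriteAsync(Result<T>.Ok(item)).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                await channel.Writer.WriteAsync(Result<T>.Fail(ex)).ConfigureAwait(false);
            }
        }

        // Closes the channel once every pump has finished.
        async Task CompleteWhenDoneAsync(List<Task> pumps)
        {
            await Task.WhenAll(pumps).ConfigureAwait(false);
            channel.Writer.Complete();
        }

        // ToList forces all pumps to start now, one per source.
        var completion = CompleteWhenDoneAsync(sources.Select(s => PumpAsync(s)).ToList());

        await foreach (var msg in channel.Reader.ReadAllAsync().ConfigureAwait(false))
            yield return msg;

        await completion.ConfigureAwait(false);
    }
}

public static class MergeDemo
{
    // Hypothetical source: yields `count` numbers, then throws.
    static async IAsyncEnumerable<int> Numbers(int start, int count)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return start + i;
        }
        throw new InvalidOperationException("source failed");
    }

    public static async Task<(int[] Values, int Failures)> RunAsync()
    {
        var sources = new[] { Numbers(0, 3), Numbers(10, 3) };
        var values = new List<int>();
        var failures = 0;
        await foreach (var msg in sources.ExecuteParallelAsync())
        {
            if (msg.IsOk) values.Add(msg.result);
            else failures++;
        }
        values.Sort(); // arrival order across sources is not deterministic
        return (values.ToArray(), failures);
    }
}
```

In `MergeDemo.RunAsync` both sources throw after their last element, which is the scenario from the question; the merged sequence should still deliver all six values plus two failed results instead of an unhandled exception.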