I have two methods, connected to two different sources of Foos, that each return an IAsyncEnumerable<Foo>. I need to fetch all Foos from both sources before I can process them.

Problem: I would like to query both sources simultaneously (asynchronously), i.e. without waiting for Source1 to complete its enumeration before starting to enumerate Source2. From my understanding, the SequentialSourcesQuery method in the example below does wait for Source1 to finish first. Am I right?

With regular tasks, I would just start the first Task, then the second one, and await Task.WhenAll. But I am a bit confused about how to handle IAsyncEnumerable.

public class FoosAsync
{
    public async IAsyncEnumerable<Foo> Source1() { }

    public async IAsyncEnumerable<Foo> Source2() { }

    public async Task<List<Foo>> SequentialSourcesQuery()
    {
        List<Foo> foos = new List<Foo>();

        await foreach (Foo foo1 in Source1())
        {
            foos.Add(foo1);
        }

        await foreach (Foo foo2 in Source2())
        { //doesn't start until Source1 completed the enumeration? 
            foos.Add(foo2);
        }

        return foos;
    }
}
Theodor Zoulias
XavierAM
  • A workaround would be to simply start two new tasks and then run the loops within those tasks. – smoksnes Feb 11 '21 at 10:26
  • Also, be careful when adding to the list from several threads, as discussed [here](https://stackoverflow.com/q/5589315/4949005). – smoksnes Feb 11 '21 at 10:28
  • @smoksnes ok thanks, I will have a look. Actually the two lists are different; I just oversimplified the example. – XavierAM Feb 11 '21 at 10:32

3 Answers

5

You could take advantage of the System.Linq.Async and System.Interactive.Async libraries (owned by the RxTeam, who are part of the .NET Foundation). They contain operators such as Merge (in System.Interactive.Async) and ToListAsync (in System.Linq.Async) that could solve your problem easily.

// Merges elements from all of the specified async-enumerable sequences
// into a single async-enumerable sequence.
public static IAsyncEnumerable<TSource> Merge<TSource>(
    params IAsyncEnumerable<TSource>[] sources);

// Creates a list from an async-enumerable sequence.
public static ValueTask<List<TSource>> ToListAsync<TSource>(
    this IAsyncEnumerable<TSource> source,
    CancellationToken cancellationToken = default);

Putting everything together:

public Task<List<Foo>> SequentialSourcesQuery()
{
    return AsyncEnumerableEx.Merge(Source1(), Source2()).ToListAsync().AsTask();
}

Be aware that these libraries focus on providing a rich set of features rather than on performance or efficiency. So if top-notch performance is important for your use case, niki.kante's solution will most probably outperform the above operator-based approach.
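
As a rough end-to-end illustration of the Merge and ToListAsync combination above, here is a minimal sketch. It assumes the System.Linq.Async and System.Interactive.Async NuGet packages are referenced; MergeDemo, Source and FetchAllAsync are hypothetical names invented for the example, and the delays merely simulate slow sources.

```csharp
using System.Collections.Generic;
using System.Linq;            // AsyncEnumerableEx.Merge and ToListAsync (assumed packages)
using System.Threading.Tasks;

public static class MergeDemo
{
    // Hypothetical source: yields `count` labelled items, pausing before each one.
    public static async IAsyncEnumerable<string> Source(string name, int count, int delayMs)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Delay(delayMs);
            yield return $"{name}-{i}";
        }
    }

    // Both sources are enumerated concurrently; the arrival order of the
    // merged items is not guaranteed.
    public static Task<List<string>> FetchAllAsync()
    {
        return AsyncEnumerableEx
            .Merge(Source("A", 3, 30), Source("B", 3, 20))
            .ToListAsync()
            .AsTask();
    }
}
```

Awaiting MergeDemo.FetchAllAsync() should return all six items, with the A and B items interleaved according to when each one became available.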

Ian Kemp
Theodor Zoulias
  • The Rx team is effectively part of Microsoft at this point (heck, one of the main contributors is the leader of the .NET Foundation!) which means that if you raise performance or efficiency concerns with them, those are likely to be addressed. Plus the Rx repo is incredibly well-tested - if you're looking for a solution that's dependable, this is the right one. – Ian Kemp Feb 19 '21 at 12:34
  • @IanKemp in general I am satisfied with the performance of the libraries owned by the RxTeam. By that I mean that I expect performance characteristics good enough for practical applications (which usually incorporate all sorts of inefficiencies anyway). I don't expect abysmally bad performance, but neither do I expect the raw performance of a targeted, carefully hand-crafted implementation. You could take a look at [this](https://stackoverflow.com/questions/60987491/why-is-ienumerable-toobservable-so-slow) question to see what I mean. – Theodor Zoulias Feb 19 '21 at 12:53
3

If you have two IAsyncEnumerable<T> as a source and don't care about the order of incoming data, you could use a method like the following to interleave your data.

public static class AsyncEnumerableExt
{
    public static async IAsyncEnumerable<T> Interleave<T>(this IAsyncEnumerable<T> first, IAsyncEnumerable<T> second)
    {
        var enum1 = first.GetAsyncEnumerator();
        var enum2 = second.GetAsyncEnumerator();

        var nextWait1 = enum1.MoveNextAsync().AsTask();
        var nextWait2 = enum2.MoveNextAsync().AsTask();

        do
        {
            var task = await Task.WhenAny(nextWait1, nextWait2).ConfigureAwait(false);

            if (task == nextWait1)
            {
                // Read Current only when MoveNextAsync reported an element.
                bool hasItem = await task.ConfigureAwait(false);
                if (hasItem) yield return enum1.Current;
                nextWait1 = hasItem ? enum1.MoveNextAsync().AsTask() : null;
            }
            else if (task == nextWait2)
            {
                bool hasItem = await task.ConfigureAwait(false);
                if (hasItem) yield return enum2.Current;
                nextWait2 = hasItem ? enum2.MoveNextAsync().AsTask() : null;
            }
        } while (nextWait1 != null && nextWait2 != null);

        while (nextWait1 != null)
        {
            if (!await nextWait1.ConfigureAwait(false))
            {
                nextWait1 = null;
            }
            else
            {
                yield return enum1.Current;
                nextWait1 = enum1.MoveNextAsync().AsTask();
            }
        }

        while (nextWait2 != null)
        {
            if (!await nextWait2.ConfigureAwait(false))
            {
                nextWait2 = null;
            }
            else
            {
                yield return enum2.Current;
                nextWait2 = enum2.MoveNextAsync().AsTask();
            }
        }
    }
}

Then you can consume the data with a single await foreach and store it in a list.
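
Consuming the interleaved sequence could look roughly like the sketch below. DrainAsync and its containing class are hypothetical names, not part of any library; the usage comment assumes the Interleave extension and the Source1/Source2 methods from this thread are in scope.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncEnumerableCollectExt
{
    // Hypothetical helper: drains an async sequence into a list with one loop.
    public static async Task<List<T>> DrainAsync<T>(this IAsyncEnumerable<T> source)
    {
        var items = new List<T>();
        await foreach (T item in source.ConfigureAwait(false))
        {
            items.Add(item); // single consumer, so a plain List<T> is safe here
        }
        return items;
    }
}

// Usage, assuming Interleave, Source1 and Source2 from above are in scope:
// List<Foo> foos = await Source1().Interleave(Source2()).DrainAsync();
```

Because only one loop ever adds to the list, no concurrent collection or locking is needed on the consuming side.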

Ackdari
  • Wow, this is really interesting, and quite complicated! I'd never have managed this one by myself. The main difference from the other answer, which wraps each IAsyncEnumerable in its own task, is that you can act on the data as it arrives, right? – XavierAM Feb 11 '21 at 17:33
  • @XavierAM That and it doesn't throw exceptions randomly as a result of improper synchronization. – Servy Feb 11 '21 at 18:19
  • This implementation is quite similar to how the `Merge` operator from the [System.Interactive.Async](https://www.nuget.org/packages/System.Interactive.Async) library [is implemented](https://github.com/dotnet/reactive/blob/main/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Merge.cs). – Theodor Zoulias Feb 11 '21 at 18:28
  • @XavierAM please prefer the solution of Theodor over mine. – Ackdari Feb 12 '21 at 07:48
2

You could write an async helper (here a lambda; a local function works too) that reads one sequence into its own list and returns a Task<List<Foo>>.

Func<IAsyncEnumerable<Foo>, Task<List<Foo>>> readValues = async (values) =>
{
    List<Foo> foos = new List<Foo>();
    await foreach (Foo foo1 in values)
    {
        foos.Add(foo1);
    }
    return foos;
};

and call it like this:

Task<List<Foo>> task1 = readValues(Source1());
Task<List<Foo>> task2 = readValues(Source2());

await Task.WhenAll(task1, task2);

The whole code would be:

public class FoosAsync
{
    public async IAsyncEnumerable<Foo> Source1() { }

    public async IAsyncEnumerable<Foo> Source2() { }

    public async Task<List<Foo>> SequentialSourcesQuery()
    {
        var asyncEnumerator = Source1().GetAsyncEnumerator();
        Func<IAsyncEnumerable<Foo>, Task<List<Foo>>> readValues = async (values) => {
            List<Foo> foos2 = new List<Foo>();
            await foreach (Foo foo in values)
            {
                foos2.Add(foo);
            }        
            return foos2;
        };
        
        Task<List<Foo>> task1 = readValues(Source1());
        Task<List<Foo>> task2 = readValues(Source2());
        
        await Task.WhenAll(task1, task2);
        
        List<Foo> foos = new List<Foo>(task1.Result.Count + task2.Result.Count);
        foos.AddRange(task1.Result);
        foos.AddRange(task2.Result);

        return foos;
    }
}
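
The same idea can be generalized to any number of sources. The following is only a sketch under assumptions: ConcurrentCollect, ReadAllAsync and CollectAllAsync are hypothetical names, and ConfigureAwait(false) is applied to the loop as suggested in the comments. Each source gets its own private list, so no list is written to by more than one task.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ConcurrentCollect
{
    // Reads one sequence into its own private list; nothing is shared between tasks.
    private static async Task<List<T>> ReadAllAsync<T>(IAsyncEnumerable<T> source)
    {
        var items = new List<T>();
        await foreach (T item in source.ConfigureAwait(false))
        {
            items.Add(item);
        }
        return items;
    }

    // Starts enumerating every source, then concatenates the results in source order.
    public static async Task<List<T>> CollectAllAsync<T>(params IAsyncEnumerable<T>[] sources)
    {
        // ToArray forces every ReadAllAsync call to start now rather than lazily.
        Task<List<T>>[] tasks = sources.Select(s => ReadAllAsync(s)).ToArray();
        List<T>[] results = await Task.WhenAll(tasks).ConfigureAwait(false);
        return results.SelectMany(list => list).ToList();
    }
}
```

With the sources from the question this could be called as `await ConcurrentCollect.CollectAllAsync(Source1(), Source2())`. Unlike a merge, the result keeps all items of the first source ahead of those of the second.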
niki.kante
  • It could *possibly* be better as a local method, depending on if there's other call sites – pinkfloydx33 Feb 11 '21 at 11:09
  • Thanks, it worked perfectly. Actually, as stated by smoksnes in the comments, a List is not thread-safe. It doesn't matter for my use case because I use two different lists that I didn't show in my example. But I think you could replace the List with a ConcurrentBag and return ConcurrentBag.ToList(), so people won't be fooled :-) – XavierAM Feb 11 '21 at 12:46
  • For better performance you could configure the `foreach` loop to not capture and restore the `SynchronizationContext` on each internal `await`, like this: `await foreach (Foo foo1 in values.ConfigureAwait(false))`. – Theodor Zoulias Feb 11 '21 at 18:16
  • This code is broken in that you're manipulating a list from multiple threads at the same time. – Servy Feb 11 '21 at 18:17
  • @XavierAM the `ConcurrentBag` is a [very specialized](https://stackoverflow.com/questions/15400133/when-to-use-blockingcollection-and-when-concurrentbag-instead-of-listt/64823123#64823123) collection. Unless you are dealing with a mixed producer-consumer scenario, you should prefer the `ConcurrentQueue` instead. – Theodor Zoulias Feb 11 '21 at 18:18
  • @Servy I agree with that. There is a warning about that in the second statement. I didn't include a solution for it because there is no ConcurrentList implementation in the .NET Framework, and ConcurrentBag/ConcurrentQueue do not produce the same result as List. – niki.kante Feb 11 '21 at 19:21
  • @niki.kante You including in your own answer the statement that your own answer doesn't work doesn't make it any less of a bad answer. That you know it doesn't work and suggested it anyway is, frankly, *more* worrying. – Servy Feb 11 '21 at 19:31
  • @Servy Thanks for the criticism. The solution was meant for the OP's base problem. I'll try to write a more complete answer next time. I updated the answer to fix the concurrency issue and add a local method as suggested by pinkfloydx33. – niki.kante Feb 11 '21 at 20:25
  • @TheodorZoulias ok thanks, I was not aware of the subtle differences between ConcurrentBag and ConcurrentQueue. – XavierAM Feb 12 '21 at 08:14
  • @XavierAM now you are a member of the close circle of people who know this little mystery. – Theodor Zoulias Feb 12 '21 at 08:21
  • Why the extraneous `asyncEnumerator` that doesn't appear to be used? – Ian Kemp Feb 19 '21 at 12:22