0

Suppose a have a variable that is an IEnumerable<IAsyncEnumerable<T>>.

How to convert to an IAsyncEnumerable<T> ??

I tried some link methods, but could not do it!

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • What kind of conversion are you looking for? Do you want to subscribe to the inner `IAsyncEnumerable`s one at a time, or all at once and merge their contents? – Theodor Zoulias Mar 25 '23 at 00:59
  • 1
    If you have a collection of something you can't convert it to a single, you have to select one of them. In this case you have a collections of colecctions of any type (T) and you want to convert it to only one collection, so you have to select it by some rule, but if you don't specify the code, what is your rule? The first or default one? Select by what? – Leandro Bardelli Mar 25 '23 at 01:04
  • 1
    @LeandroBardelli I am pretty sure that the OP wants to convert the nested sequence to a flat sequence, that contains all the elements of the inner sequences. – Theodor Zoulias Mar 25 '23 at 01:11

2 Answers2

5

I think I get it.

async IAsyncEnumerable<T> SelectManyAsync(IEnumerable<IAsyncEnumerable<T>> @this)
{
    foreach (var entry in @this)
        await foreach (value in entry)
            yield return value;
}

It's a variant of SelectMany() that unwraps the inner Async.

With cancellation:

async IAsyncEnumerable<T> SelectManyAsync(IEnumerable<IAsyncEnumerable<T>> @this, [EnumeratorCancellation] CancellationToken token = default)
{
    foreach (var entry in @this)
        await foreach (value in entry.WithCancellation(token).ConfigureAwait(false))
            yield return value;
}

A curious language feature how enum cancellation works. I never actually needed it.

Joshua
  • 40,822
  • 8
  • 72
  • 132
  • I would never think in that way, good answer. There is my doubt if the op really want to mix the elements or why they are not mixed +1 – Leandro Bardelli Mar 25 '23 at 01:00
  • 1
    @LeandroBardelli: I've seen this kind of thing happen a lot in code at work where somebody needs to convert nested enumerables into enumerables; if you're getting the type error this is usually it. – Joshua Mar 25 '23 at 01:02
0

The other answer, while simple, does have the big downside that each enumerable is enumerated sequentially. Each must be fully enumerated before enumerating the next.

A more efficient method (from a time perspective) than the existing answer in many cases is to interleave all the enumerables with whichever returns first. in other words, await all at the same time and just pass whatever result you get.

This will obviously return results unordered, but that may not be an issue.

public static async IAsyncEnumerable<T> SelectInterleavedAsync<T>(this IEnumerable<IAsyncEnumerable<T>> sources, [EnumeratorCancellation] CancellationToken token = default)
{
    var enumers = new List<IAsyncEnumerator<T>>(sources is ICollection<T> coll ? coll.Count : 0);
    // this contains the list of all enumerators in order received, presized if poss

    try
    {
        foreach (var source in sources)
            enumers.Add(source.GetAsyncEnumerator(token));

        if (enumers.Count == 0)
            yield break;    // early bailout

        var moveNexts = new List<Task<bool>>(enumers.Count);
        // this will contain the tasks for MoveNextAsync calls

        foreach (var enumer in enumers)
            moveNexts.Add(enumer.MoveNextAsync().AsTask());
    
        do
        {
            // get any completed task
            var completed = await Task.WhenAny(moveNexts).ConfigureAwait(false);
            // which index is it?
            var index = moveNexts.IndexOf(completed);
            if (await completed)    // throws if the task is exception'ed
            {
                yield return enumers[index].Current;
                moveNexts[index] = enumers[index].MoveNextAsync().AsTask();
            }
            else    // this enumerator is finished, let's remove
            {
                moveNexts.RemoveAt(index);
                await enumers[index].DisposeAsync().ConfigureAwait(false);
                enumers.RemoveAt(index);
            }
        } while (enumers.Count > 0);
    }
    finally    // must dispose any remaining enumerators if excepted
    {
        foreach (var enumer in enumers)
            await enumer.DisposeAsync().ConfigureAwait(false);
    }
}

The downside of this approach is that Task.WhenAny causes quite a bit of heap allocation. You may be able to write a version using only ValueTask, however that will be even more complex.

Charlieface
  • 52,284
  • 6
  • 19
  • 43
  • I think that you have posted a similar answer [here](https://stackoverflow.com/questions/70710153/how-to-merge-multiple-asynchronous-sequences-without-left-side-bias/70711683#70711683). :-) – Theodor Zoulias Mar 26 '23 at 05:55
  • Ooof that was a while back, completely forgot. Any thoughts on either? – Charlieface Mar 26 '23 at 06:01
  • My only thought here is that the OP hasn't bothered to clarify if they want a concurrent or non-concurrent merging, so we could put our efforts on other questions instead. Better find an old question and answer it, than putting effort on a question that the OP puts zero effort to clarify! – Theodor Zoulias Mar 26 '23 at 06:09