Suppose I have a variable of type IEnumerable<IAsyncEnumerable<T>>.
How can I convert it to an IAsyncEnumerable<T>?
I tried some LINQ methods, but could not get it to work.
I think I get it.
async IAsyncEnumerable<T> SelectManyAsync<T>(IEnumerable<IAsyncEnumerable<T>> @this)
{
    // Enumerate each inner sequence to completion before moving on to the next
    foreach (var entry in @this)
        await foreach (var value in entry)
            yield return value;
}
It's a variant of SelectMany() that flattens the inner IAsyncEnumerable<T> sequences into a single one.
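To make the sequential behaviour concrete, here is a minimal usage sketch. The `Demo` class, the `CountAsync` helper, and the delays are hypothetical and exist only for illustration; the flattening method is the same idea as above, written as a plain static method.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Two hypothetical async sources, wrapped in an ordinary synchronous list.
var sources = new List<IAsyncEnumerable<int>>
{
    Demo.CountAsync(1, 3),
    Demo.CountAsync(10, 2),
};

var results = new List<int>();
await foreach (var value in Demo.SelectManyAsync(sources))
    results.Add(value);

// The first source is drained completely before the second one starts.
Console.WriteLine(string.Join(", ", results)); // 1, 2, 3, 10, 11

static class Demo
{
    // Hypothetical source: yields count consecutive integers with a small delay.
    public static async IAsyncEnumerable<int> CountAsync(int start, int count)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Delay(10);
            yield return start + i;
        }
    }

    // Sequential flattening, as in the answer above.
    public static async IAsyncEnumerable<T> SelectManyAsync<T>(IEnumerable<IAsyncEnumerable<T>> sources)
    {
        foreach (var entry in sources)
            await foreach (var value in entry)
                yield return value;
    }
}
```

Because each inner sequence is awaited to completion in turn, the output order always follows the order of the outer collection.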
With cancellation:
// Requires: using System.Runtime.CompilerServices; using System.Threading;
async IAsyncEnumerable<T> SelectManyAsync<T>(IEnumerable<IAsyncEnumerable<T>> @this, [EnumeratorCancellation] CancellationToken token = default)
{
    foreach (var entry in @this)
        // Pass the token on to each inner enumerator
        await foreach (var value in entry.WithCancellation(token).ConfigureAwait(false))
            yield return value;
}
How enumerator cancellation works is a curious language feature. I have never actually needed it myself.
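For readers who have not seen the feature, the following sketch shows how a token supplied by the consumer through WithCancellation() reaches a parameter marked [EnumeratorCancellation]. The `Demo.TicksAsync` iterator is a hypothetical example, not part of the answer above.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
var received = new List<int>();
try
{
    // The token is attached by the consumer, not passed as an argument.
    await foreach (var n in Demo.TicksAsync().WithCancellation(cts.Token))
    {
        received.Add(n);
        if (n == 2) cts.Cancel(); // request cancellation after the third item
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine($"Cancelled after {received.Count} items"); // Cancelled after 3 items
}

static class Demo
{
    // Hypothetical endless source; the compiler routes the consumer's token
    // into the parameter marked with [EnumeratorCancellation].
    public static async IAsyncEnumerable<int> TicksAsync([EnumeratorCancellation] CancellationToken token = default)
    {
        for (var i = 0; ; i++)
        {
            await Task.Delay(10, token); // throws once the token is cancelled
            yield return i;
        }
    }
}
```

The iterator itself never sees WithCancellation(); it only observes the token parameter, which is why the attribute is needed.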
The other answer, while simple, does have the big downside that each enumerable is enumerated sequentially. Each must be fully enumerated before enumerating the next.
In many cases a faster approach (in terms of elapsed time) than the existing answer is to interleave the enumerables, yielding from whichever one produces a value first. In other words, await all of them at the same time and pass along each result as it arrives.
This will obviously return results unordered, but that may not be an issue.
public static async IAsyncEnumerable<T> SelectInterleavedAsync<T>(this IEnumerable<IAsyncEnumerable<T>> sources, [EnumeratorCancellation] CancellationToken token = default)
{
    // All enumerators in the order received, presized if the source count is known
    var enumers = new List<IAsyncEnumerator<T>>(sources is ICollection<IAsyncEnumerable<T>> coll ? coll.Count : 0);
    try
    {
        foreach (var source in sources)
            enumers.Add(source.GetAsyncEnumerator(token));
        if (enumers.Count == 0)
            yield break; // early bailout
        // Pending MoveNextAsync tasks, kept index-aligned with enumers
        var moveNexts = new List<Task<bool>>(enumers.Count);
        foreach (var enumer in enumers)
            moveNexts.Add(enumer.MoveNextAsync().AsTask());
        do
        {
            // get any completed task
            var completed = await Task.WhenAny(moveNexts).ConfigureAwait(false);
            // which index is it?
            var index = moveNexts.IndexOf(completed);
            if (await completed) // throws if the task faulted
            {
                yield return enumers[index].Current;
                // request the next value from the same enumerator
                moveNexts[index] = enumers[index].MoveNextAsync().AsTask();
            }
            else // this enumerator is finished, so remove it
            {
                moveNexts.RemoveAt(index);
                await enumers[index].DisposeAsync().ConfigureAwait(false);
                enumers.RemoveAt(index);
            }
        } while (enumers.Count > 0);
    }
    finally // must dispose any remaining enumerators if an exception occurred
    {
        foreach (var enumer in enumers)
            await enumer.DisposeAsync().ConfigureAwait(false);
    }
}
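The Task.WhenAny loop at the heart of the method can be looked at in isolation. In this sketch, two TaskCompletionSource instances (hypothetical stand-ins for pending MoveNextAsync calls) are completed by hand so that the ordering is deterministic; it is an illustration of the pattern, not a replacement for the method above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins for two in-flight MoveNextAsync calls.
var slow = new TaskCompletionSource<string>();
var fast = new TaskCompletionSource<string>();
var pending = new List<Task<string>> { slow.Task, fast.Task };
var order = new List<string>();

fast.SetResult("fast"); // the second task in the list finishes first

while (pending.Count > 0)
{
    // Returns whichever pending task has completed, regardless of position.
    var completed = await Task.WhenAny(pending);
    order.Add(await completed); // unwraps the result, or rethrows on failure
    pending.Remove(completed);
    slow.TrySetResult("slow");  // let the other task finish afterwards
}

Console.WriteLine(string.Join(", ", order)); // fast, slow
```

Results come back in completion order rather than list order, which is exactly why the interleaved output is unordered.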
The downside of this approach is that Task.WhenAny causes quite a bit of heap allocation. You may be able to write a version using only ValueTask, but that would be even more complex.