
The first function is designed to let LINQ execute lambda functions safely in parallel, including async lambdas that would otherwise run as async void.

So you can write `collection.AsParallel().ForAllAsync(async x => await x.Action)`.

The second function is designed to combine multiple IAsyncEnumerables, execute them in parallel, and return their results as quickly as possible.

I have the following code:

    public static async Task ForAllAsync<TSource>(
        this ParallelQuery<TSource> source, 
        Func<TSource, Task> selector,
        int? maxDegreeOfParallelism = null)
    {
        int maxAsyncThreadCount = maxDegreeOfParallelism ?? Math.Min(System.Environment.ProcessorCount, 128);
        using SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

        IEnumerable<Task> tasks = source.Select(async input =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            
            try
            {
                await selector(input).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        });

        await Task.WhenAll(tasks).ConfigureAwait(true);
    }

    public static async IAsyncEnumerable<T> ForAllAsync<TSource, T>(
        this ParallelQuery<TSource> source,
        Func<TSource, IAsyncEnumerable<T>> selector,
        int? maxDegreeOfParallelism = null,
        [EnumeratorCancellation]CancellationToken cancellationToken = default) 
        where T : new()
    {
        IEnumerable<(IAsyncEnumerator<T>, bool)> enumerators = 
            source.Select(x => (selector.Invoke(x).GetAsyncEnumerator(cancellationToken), true)).ToList();

        while (enumerators.Any())
        {
            await enumerators.AsParallel()
                .ForAllAsync(async e => e.Item2 = (await e.Item1.MoveNextAsync()), maxDegreeOfParallelism)
                .ConfigureAwait(false);
            foreach (var enumerator in enumerators)
            {
                yield return enumerator.Item1.Current;
            }
            enumerators = enumerators.Where(e => e.Item2);
        }
    }

The code somehow continues to return results after the iterators have reached the end.

I am using these functions to combine several threads of IAsyncEnumerable functions that call API endpoints and expect results of the same type.

Why?

Eduard G
  • This seems very similar to your post [an hour ago](https://stackoverflow.com/questions/67198591/iasyncenumerator-current-returns-null-when-enumerators-collection-is-not-casted). What changed? – gunr2171 Apr 21 '21 at 16:53
  • Different question. After fixing the `ToList()`, the code now won't stop returning results. From what I understand from the accepted answer, the rest of the code should be correct. – Eduard G Apr 21 '21 at 16:56
  • I find it weird that there are no native implementations that can combine IAsyncEnumerables of the same type into one and execute them asynchronously, nor any available on the internet that I could find. – Eduard G Apr 21 '21 at 17:00
  • As a side note, why the `ForAllAsync` method operates on `ParallelQuery`s instead of `IEnumerable`s? It seems that no functionality specific to parallel sequences is used inside the method. The `source` sequence is just enumerated. In which case a more popular name for this method would be [`ForEachAsync`](https://github.com/dotnet/runtime/issues/1946 "Async parallel foreach") instead of `ForAllAsync`. – Theodor Zoulias Apr 21 '21 at 17:03
  • Regarding the second `ForAllAsync` method that returns an `IAsyncEnumerable`, you may want to check out the implementation of the [`AsyncEnumerableEx.Merge`](https://github.com/dotnet/reactive/blob/main/Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Merge.cs) operator from the [System.Interactive.Async](https://www.nuget.org/packages/System.Interactive.Async/) package. This operator has the following signature: `public static IAsyncEnumerable<TSource> Merge<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources);` – Theodor Zoulias Apr 21 '21 at 17:09
  • Designed as an improvement for https://learn.microsoft.com/en-us/dotnet/api/system.linq.parallelenumerable.forall?view=net-5.0 , which does not work for async void lambdas (passing an `async => await` lambda means it won't wait), so I made this to execute functions more safely. – Eduard G Apr 21 '21 at 17:09
  • That looks a lot like exactly what I need. Thank you! Nevertheless I am curious as to what I got wrong. – Eduard G Apr 21 '21 at 17:14
  • The PLINQ library is by design not async-friendly, and a single `ForAllAsync` operator will not make it async-friendly. So for your asynchronous problems you'd better forget about the `AsParallel` as a solution. Your `ForAllAsync` implementation is very similar, if not identical, to `ForEachAsync` implementations ([1](https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach), [2](https://stackoverflow.com/questions/15136542/parallel-foreach-with-asynchronous-lambda)) that operate on the more prevalent `IEnumerable` interface. – Theodor Zoulias Apr 21 '21 at 17:23
  • Not entirely sure why, but the Merge function gave me some very slow results. – Eduard G Apr 21 '21 at 17:38
  • You can't really tell that it's slow before having compared it with an alternative working implementation. And this kind of functionality is not trivial to implement. – Theodor Zoulias Apr 21 '21 at 19:32
  • I do have an alternative working implementation, in which I spin up several threads using ForAllAsync, doing an await foreach on several queries. It takes about 60 seconds, while Merge takes 300+ seconds for the same results. Maybe I am not implementing the merge correctly; I will do some digging when I have the time. – Eduard G Apr 22 '21 at 07:36
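Regarding the point that PLINQ's `ForAll` does not wait for async lambdas: `ParallelEnumerable.ForAll` accepts an `Action<TSource>`, so an async lambda passed to it compiles to an `async void` delegate, and `ForAll` returns as soon as each call reaches its first incomplete `await`. The minimal sketch below (class and method names are illustrative) shows this with a gate that holds every call open, next to `Parallel.ForEachAsync`, which assumes .NET 6 or later and whose returned `Task` completes only after every body has finished.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ForAllDemo
{
    // ParallelEnumerable.ForAll takes an Action<TSource>, so the async lambda
    // below compiles to an "async void" delegate that ForAll cannot await.
    public static int CompletedWhenForAllReturns()
    {
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        int completed = 0;

        Enumerable.Range(0, 4).AsParallel().ForAll(async i =>
        {
            await gate.Task;                      // every call suspends here...
            Interlocked.Increment(ref completed);
        });

        int seen = Volatile.Read(ref completed);  // ...yet ForAll has already returned
        gate.SetResult(true);                     // let the detached calls finish
        return seen;
    }

    // Parallel.ForEachAsync (.NET 6+) takes a ValueTask-returning body, and its
    // returned Task completes only after every body has completed.
    public static async Task<int> CompletedWhenForEachAsyncReturns()
    {
        int completed = 0;
        await Parallel.ForEachAsync(
            Enumerable.Range(0, 4),
            new ParallelOptions { MaxDegreeOfParallelism = 2 },
            async (i, ct) =>
            {
                await Task.Delay(10, ct);
                Interlocked.Increment(ref completed);
            });
        return completed;
    }

    public static async Task Main()
    {
        Console.WriteLine(CompletedWhenForAllReturns());             // 0
        Console.WriteLine(await CompletedWhenForEachAsyncReturns()); // 4
    }
}
```

The gate makes the `ForAll` result deterministic: no call can complete before `SetResult`, so the count read right after `ForAll` returns is always 0.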

1 Answer

The type `(IAsyncEnumerator<T>, bool)` is shorthand for the `ValueTuple<IAsyncEnumerator<T>, bool>` type, which is a value type. This means that on assignment it's not passed by reference; instead, it's copied. So this lambda does not work as intended:

    async e => e.Item2 = (await e.Item1.MoveNextAsync())

Instead of changing the bool part of the entry stored in the list, it changes the value of a temporary copy, so the change is not preserved.
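To see the copy in isolation, here is a small self-contained sketch (class and method names are illustrative) that runs an async lambda of the same shape over a `List` of value tuples, and then writes whole entries back instead. The direct form `list[i].Flag = false` would not even compile (error CS1612) for the same reason: the list indexer returns a copy.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CopySemanticsDemo
{
    // Same shape as the question's lambda: the parameter e receives a copy
    // of the list element, so assigning e.Flag changes only that copy.
    public static async Task<bool> AllFlagsUnchangedAfterLambda()
    {
        var list = new List<(string Name, bool Flag)> { ("a", true), ("b", true) };
        Func<(string Name, bool Flag), Task> update =
            async e => e.Flag = await Task.FromResult(false);
        await Task.WhenAll(list.Select(update));
        return list.All(x => x.Flag);   // true: the stored entries are untouched
    }

    // Writing the whole tuple back into the list does persist the change.
    public static bool AnyFlagLeftAfterReplacing()
    {
        var list = new List<(string Name, bool Flag)> { ("a", true), ("b", true) };
        for (int i = 0; i < list.Count; i++)
        {
            var entry = list[i];
            list[i] = (entry.Name, false);
        }
        return list.Any(x => x.Flag);   // false: every entry was replaced
    }

    public static async Task Main()
    {
        Console.WriteLine(await AllFlagsUnchangedAfterLambda()); // True
        Console.WriteLine(AnyFlagLeftAfterReplacing());          // False
    }
}
```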

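To make it work as intended, you must either switch to a mutable reference type that holds both values (note that `Tuple<IAsyncEnumerator<T>, bool>` is a reference type, but its `Item2` property is read-only, so it can't be assigned either), or replace the whole entry in the list: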

    List<(IAsyncEnumerator<T>, bool)> enumerators = source./*...*/.ToList();
    //...
    var entry = enumerators[index];
    enumerators[index] = (entry.Item1, await entry.Item1.MoveNextAsync());

Be aware that the List<T> class is not thread-safe, so in order to update it safely from multiple threads concurrently you must protect it with a lock.
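Applying the reference-type option (which is also what the asker ended up doing in the comments), a round-by-round merge could be sketched as follows. `EnumeratorState<T>`, `RoundMerge`, `MergeInRounds`, `CountFrom` and `CollectAsync` are hypothetical names. Besides the mutable holder class, the sketch yields `Current` only when `MoveNextAsync` returned `true`, materializes the remaining enumerators with `List.RemoveAll` instead of a lazy `Where`, and disposes the enumerators at the end. Because each concurrent task writes only to its own holder object, no lock around the list is needed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical holder: a reference type, so the flag written inside the
// lambda lands in the same object that the list holds.
public sealed class EnumeratorState<T>
{
    public EnumeratorState(IAsyncEnumerator<T> enumerator) => Enumerator = enumerator;
    public IAsyncEnumerator<T> Enumerator { get; }
    public bool HasCurrent { get; set; }
}

public static class RoundMerge
{
    // Advances all live enumerators one step per round, yields what that round
    // produced, then drops the enumerators that have reached their end.
    public static async IAsyncEnumerable<T> MergeInRounds<T>(
        IEnumerable<IAsyncEnumerable<T>> sources,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        List<EnumeratorState<T>> all = sources
            .Select(s => new EnumeratorState<T>(s.GetAsyncEnumerator(cancellationToken)))
            .ToList();
        var active = new List<EnumeratorState<T>>(all);
        try
        {
            while (active.Count > 0)
            {
                // Each task writes only to its own EnumeratorState object.
                await Task.WhenAll(active.Select(async s =>
                    s.HasCurrent = await s.Enumerator.MoveNextAsync()));

                foreach (var s in active)
                {
                    if (s.HasCurrent) yield return s.Enumerator.Current; // skip finished ones
                }
                active.RemoveAll(s => !s.HasCurrent); // eager, unlike a lazy Where
            }
        }
        finally
        {
            foreach (var s in all) await s.Enumerator.DisposeAsync();
        }
    }

    // Demo source (hypothetical): yields count consecutive integers from start.
    public static async IAsyncEnumerable<int> CountFrom(int start, int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return start + i;
        }
    }

    public static async Task<List<T>> CollectAsync<T>(IAsyncEnumerable<T> source)
    {
        var result = new List<T>();
        await foreach (T item in source) result.Add(item);
        return result;
    }

    public static async Task Main()
    {
        var merged = await CollectAsync(MergeInRounds(new[] { CountFrom(0, 3), CountFrom(10, 1) }));
        Console.WriteLine(string.Join(" ", merged)); // 0 10 1 2
    }
}
```

Every round still waits for its slowest `MoveNextAsync` before yielding anything, which matches the slowdown described in the comments below.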

Theodor Zoulias
  • Good catch. Tuples being read-only won't allow the bool to change, so in these cases the whole entry has to be replaced. Still testing it out. – Eduard G Apr 22 '21 at 07:45
  • I instead created a class that holds both values. Works beautifully, thank you! Sadly it's not very fast, because the way I've written the code, the threads wait until all of them are finished before spinning up any new ones. I tried implementing Merge again, making sure I was doing it correctly, but it was still slow. In the end, Merge takes 300s, the implementation you helped me fix takes about 120s, and my alternative but dirtier implementation, as noted in the comments on the initial post, takes about 60s, all for the same queries and same results. – Eduard G Apr 22 '21 at 08:06
  • @EduardG the `AsyncEnumerableEx.Merge` being five times slower than a custom implementation is strange and unexpected. My guess is that the `IAsyncEnumerable`s projected by the `selector` interfere with each other, and having too many of them enumerated concurrently results in depletion of the system's resources. This is probably why you had to include the `maxDegreeOfParallelism` parameter in your custom mechanism. I would need to see the actual `selector` that you use to create the sequences before being able to offer any specific explanation or advice. – Theodor Zoulias Apr 22 '21 at 11:02
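Since a round-by-round loop waits for the slowest sequence in every round, one alternative is to let each source push its items into a shared `System.Threading.Channels` channel as soon as they are produced, and yield from the channel. This is a different technique from the question's loop and is not taken from `AsyncEnumerableEx.Merge`. It is a minimal sketch, assuming the sources honor cancellation and that an unbounded buffer is acceptable; it enumerates every source at once, so with many sources a throttle (for example the `SemaphoreSlim` from the question's first method) may be needed. `ChannelMerge`, `MergeAsProduced`, `CountFrom` and `CollectAsync` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelMerge
{
    // Yields each item as soon as any source produces it, instead of waiting
    // for a whole round of MoveNextAsync calls to complete.
    public static async IAsyncEnumerable<T> MergeAsProduced<T>(
        IEnumerable<IAsyncEnumerable<T>> sources,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var channel = Channel.CreateUnbounded<T>();
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        // One pump per source copies its items into the shared channel.
        async Task Pump(IAsyncEnumerable<T> source)
        {
            await foreach (T item in source.WithCancellation(cts.Token).ConfigureAwait(false))
                await channel.Writer.WriteAsync(item, cts.Token).ConfigureAwait(false);
        }

        // Completes the channel when all pumps are done, passing on a failure.
        async Task CompleteWhenDone(Task[] pumps)
        {
            try
            {
                await Task.WhenAll(pumps).ConfigureAwait(false);
                channel.Writer.TryComplete();
            }
            catch (Exception ex)
            {
                channel.Writer.TryComplete(ex);
            }
        }

        Task completion = CompleteWhenDone(sources.Select(Pump).ToArray());
        try
        {
            await foreach (T item in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
                yield return item;
        }
        finally
        {
            cts.Cancel();                           // stop the pumps if the consumer quits early
            await completion.ConfigureAwait(false); // never throws: failures go to the channel
        }
    }

    // Demo source (hypothetical): yields count consecutive integers from start.
    public static async IAsyncEnumerable<int> CountFrom(int start, int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Delay(1);
            yield return start + i;
        }
    }

    public static async Task<List<T>> CollectAsync<T>(IAsyncEnumerable<T> source)
    {
        var result = new List<T>();
        await foreach (T item in source) result.Add(item);
        return result;
    }

    public static async Task Main()
    {
        var items = await CollectAsync(MergeAsProduced(new[] { CountFrom(0, 3), CountFrom(10, 2) }));
        Console.WriteLine(string.Join(" ", items)); // some interleaving of 0 1 2 and 10 11
    }
}
```

The interleaving between sources depends on timing, but each source's own items keep their order, because each pump writes them sequentially.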