I'm trying to change Stephen Toub's ForEachAsync<T> extension method into an extension which returns a result...

Stephen's extension:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current); 
        })); 
}

My approach (not working; the tasks get executed but the result is wrong):

public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source,
    int degreeOfParallelism, Func<T, Task<TResult>> body)
{
    return Task.WhenAll<TResult>(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run<TResult>(async () =>
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current); // When I "return await",
                        // I get good results but only one per partition 
            return default(TResult);
        }));
}

I know I somehow have to return (WhenAll?) the results from the last part, but I haven't yet figured out how to do it...

Update: The result I get is just degreeOfParallelism times null (I guess because of default(TResult)) even though all the tasks get executed. I also tried to return await body(...) and then the result was fine, but only degreeOfParallelism number of tasks got executed.

Theodor Zoulias
Dunken
  • "Result is wrong" really doesn't describe what you're seeing at all. The fact that you're returning `default(TResult)` doesn't seem like a good start. It would help if you'd provide a short but complete program demonstrating the problem, including sample input, expected output and actual output. (I strongly suspect you want `SelectMany` instead of `Select` here, basically...) – Jon Skeet Jun 18 '15 at 06:09
  • @JonSkeet: added an update – Dunken Jun 18 '15 at 06:13
  • Couldn’t you make a list where you add your results to that you then return at the very end after everything finished? – poke Jun 18 '15 at 06:15
  • @poke: thought of that too but I believe this is not really async-like?!? – Dunken Jun 18 '15 at 06:17
  • It doesn't help that the code you've posted currently wouldn't compile... – Jon Skeet Jun 18 '15 at 06:17
  • @JonSkeet: I'm sorry, somehow the brackets were not copied and pasted... It should work now... – Dunken Jun 18 '15 at 06:19
  • Nope, you appear to have a parameter of type `Func` with no name. Again, if you'd posted a short but complete example to start with, you could be confident that it would compile because it would be your exact code. – Jon Skeet Jun 18 '15 at 06:22
  • @JonSkeet: My bad, I apologize... updated it again. – Dunken Jun 18 '15 at 06:24
  • @Dunken: Are you absolutely sure you need a parallel *and* asynchronous foreach? Parallel implies CPU-heavy code, and asynchronous implies I/O-heavy code. Yours is both? If you are in this (very rare) situation, consider using TPL Dataflow. – Stephen Cleary Jun 18 '15 at 11:40
  • @StephenCleary: good question... I believe yes. Actually I want to stress-test an API (I also want to measure the throughput). I first went with tasks only but this overwhelmed my system (too many HTTP connections?). Because of this I have now switched to the current solution, which allows me to call my API a few thousand times with a fixed number of concurrent calls... Do you agree? Thanks for the TPL Dataflow hint. I have to admit I don't know it very well, but I think it's worth digging into... – Dunken Jun 18 '15 at 12:02
  • You can achieve throttling for asynchronous code via `SemaphoreSlim`. I just tend to avoid parallel code unless there's a need for it (i.e., CPU-heavy code, which is not the case here). – Stephen Cleary Jun 18 '15 at 18:30
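The `SemaphoreSlim` throttling suggested in the last comment could look roughly like the sketch below. The class and method names (`Throttling`, `SelectThrottledAsync`) are hypothetical and not from the thread. The sketch assumes the source is small enough that creating one pending task per item up front is acceptable (a few thousand items should be fine), and it relies on `Task.WhenAll` returning results in the order of the supplied tasks.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Throttling
{
    // Hypothetical helper: runs body for every item, with at most
    // maxConcurrency invocations in flight at any moment.
    public static async Task<TResult[]> SelectThrottledAsync<T, TResult>(
        this IEnumerable<T> source, int maxConcurrency, Func<T, Task<TResult>> body)
    {
        using var semaphore = new SemaphoreSlim(maxConcurrency);

        // ToList() starts every task immediately; each one waits
        // asynchronously at the semaphore until a slot is free.
        var tasks = source.Select(async item =>
        {
            await semaphore.WaitAsync().ConfigureAwait(false);
            try
            {
                return await body(item).ConfigureAwait(false);
            }
            finally
            {
                semaphore.Release();
            }
        }).ToList();

        // Results come back in the same order as the source items.
        return await Task.WhenAll(tasks).ConfigureAwait(false);
    }
}
```

No `Task.Run` or partitioning is involved here, which matches the point that I/O-bound work needs throttling rather than parallelism.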

2 Answers

Now that the Parallel.ForEachAsync API has become part of the standard libraries (.NET 6), it makes sense to implement a variant that returns a Task<TResult[]>, based on this API. Here is an implementation:

/// <summary>
/// Executes a foreach loop on an enumerable sequence, in which iterations may run
/// in parallel, and returns the results of all iterations in the original order.
/// </summary>
public static Task<TResult[]> ForEachAsync<TSource, TResult>(
    IEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TSource, CancellationToken, ValueTask<TResult>> body)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(parallelOptions);
    ArgumentNullException.ThrowIfNull(body);
    List<TResult> results = new();
    if (source.TryGetNonEnumeratedCount(out int count)) results.Capacity = count;
    IEnumerable<(TSource, int)> withIndexes = source.Select((x, i) => (x, i));
    return Parallel.ForEachAsync(withIndexes, parallelOptions, async (entry, ct) =>
    {
        (TSource item, int index) = entry;
        TResult result = await body(item, ct).ConfigureAwait(false);
        lock (results)
        {
            while (results.Count <= index) results.Add(default);
            results[index] = result;
        }
    }).ContinueWith(t =>
    {
        TaskCompletionSource<TResult[]> tcs = new();
        switch (t.Status)
        {
            case TaskStatus.RanToCompletion:
                lock (results) tcs.SetResult(results.ToArray()); break;
            case TaskStatus.Faulted:
                tcs.SetException(t.Exception.InnerExceptions); break;
            case TaskStatus.Canceled:
                tcs.SetCanceled(new TaskCanceledException(t).CancellationToken); break;
            default: throw new UnreachableException();
        }
        Debug.Assert(tcs.Task.IsCompleted);
        return tcs.Task;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}

This implementation supports all the options and the functionality of the Parallel.ForEachAsync overload that has an IEnumerable<T> as source. Its behavior in case of errors and cancellation is identical. The results are arranged in the same order as the associated elements in the source sequence.
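When the source can be materialized up front, the same ordering can be had with a preallocated array and no lock. The following is a minimal sketch under that assumption, not a replacement for the implementation above. The names `ParallelResults` and `ForEachToArrayAsync` are hypothetical. Unlike the version above, it enumerates the whole source eagerly, and because it uses a plain `await`, only the first exception is rethrown to the caller if several iterations fail.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelResults
{
    // Hypothetical simplified variant: copies the source into an array,
    // then lets each iteration write its result into its own slot.
    public static async Task<TResult[]> ForEachToArrayAsync<TSource, TResult>(
        IEnumerable<TSource> source,
        ParallelOptions parallelOptions,
        Func<TSource, CancellationToken, ValueTask<TResult>> body)
    {
        ArgumentNullException.ThrowIfNull(source);
        ArgumentNullException.ThrowIfNull(parallelOptions);
        ArgumentNullException.ThrowIfNull(body);

        TSource[] items = source.ToArray();          // eager enumeration (assumption)
        TResult[] results = new TResult[items.Length];

        // Iterate over the indexes; every index is written exactly once,
        // so no synchronization is needed around the array.
        await Parallel.ForEachAsync(Enumerable.Range(0, items.Length), parallelOptions,
            async (i, ct) =>
            {
                results[i] = await body(items[i], ct).ConfigureAwait(false);
            }).ConfigureAwait(false);

        return results;
    }
}
```

A call might look like `await ParallelResults.ForEachToArrayAsync(urls, new ParallelOptions { MaxDegreeOfParallelism = 10 }, (url, ct) => FetchAsync(url, ct))`, where `urls` and `FetchAsync` are placeholders.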

Theodor Zoulias
  • Nice! There should be an overload built-in with an index parameter to the delegate. The only thing I would change at first glance: I would use a `ConcurrentDictionary` instead of a `List` and I would remove the `lock`. – Asotos Apr 11 '23 at 10:59
  • @Asotos the `Parallel.ForEachAsync` overload with indices sounds like a legit idea. You could consider submitting an [API suggestion](https://github.com/dotnet/runtime/issues/new/choose) to the dotnet/runtime repository. I would give it a ~10% chance to be accepted (after a few years of deliberations). As for switching to a `ConcurrentDictionary` for storing the results, it is also a valid idea. I avoided it because it will immediately increase the memory consumption, while the performance benefits will be most likely negligible. – Theodor Zoulias Apr 11 '23 at 11:49
  • The `Parallel.ForEach` already has such an overload, so it might be added eventually. – Asotos Apr 11 '23 at 12:02
  • @Asotos it might, but don't hold your breath. The evolution of .NET APIs is a slow and hesitant process. :-) – Theodor Zoulias Apr 11 '23 at 12:10
  • @TheodorZoulias Followed a thread of SO questions, answers and comments from you to get here. In a different attempt to implement a ParallelForEachAsync you commented about CancellationToken usage here: https://stackoverflow.com/questions/15136542/parallel-foreach-with-asynchronous-lambda/68901782#comment127207976_66635760. I'm surprised to not see any 'check for is cancelled' in the code above. Could you explain how that works? – Terry Jun 18 '23 at 14:29
  • @TheodorZoulias On the same ParallelForEachAsync solution, you commented about `WhenAll` being suitable only for a small number of tasks or for tasks that never throw, here: https://stackoverflow.com/questions/15136542/parallel-foreach-with-asynchronous-lambda/68901782#comment117797299_66635760. I have approximately <= 25 REST calls I need to make. I just want to either await all results or cancel as soon as the first one fails (cancelling all other requests). Is the best practice to just use this method and set degreeOfParallelism to the size of my 'task list'? – Terry Jun 18 '23 at 14:47
  • @Terry the cancellation here is handled by the native `Parallel.ForEachAsync` itself. If you want to cancel the parallel operation, you can configure the `CancellationToken` property of the `ParallelOptions`. Regarding how to configure the `MaxDegreeOfParallelism` property, there is no one-size-fits-all rule, because it is highly dependent on the capabilities of the hardware, network, etc. that do the actual work. I have posted some suggestions about this [here](https://stackoverflow.com/questions/34359509/factors-for-determining-the-degree-of-parallelism-for-the-foreachasync/75432732#75432732). – Theodor Zoulias Jun 18 '23 at 15:11
  • @TheodorZoulias Thanks for the info. I've posted a new question https://stackoverflow.com/questions/76501926/best-way-to-handle-async-await-and-cancellation to try and explain what I want. – Terry Jun 18 '23 at 18:06
  • @TheodorZoulias Why not open a PR to dotnet/runtime for this? – John Zabroski Jul 24 '23 at 15:39
  • @JohnZabroski at least one proposal already exists: [API Proposal: Provide a result returning Parallel.ForEachAsync overload](https://github.com/dotnet/runtime/issues/72489). – Theodor Zoulias Jul 24 '23 at 15:43
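As a rough illustration of the cancellation point raised in the comments above, the sketch below configures `ParallelOptions.CancellationToken` and forwards the per-iteration token to the awaited work. `CancellationDemo` and `RunAsync` are hypothetical names, and the `Task.Delay` call merely stands in for a real asynchronous operation such as an HTTP request.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Hypothetical helper: returns true if all iterations completed,
    // false if the operation was canceled through the supplied token.
    public static async Task<bool> RunAsync(IEnumerable<int> items, CancellationToken token)
    {
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = 4,
            CancellationToken = token   // cancels the whole parallel operation
        };
        try
        {
            await Parallel.ForEachAsync(items, options, async (item, ct) =>
            {
                // Pass ct on so in-flight work can stop promptly.
                await Task.Delay(10, ct);
            });
            return true;
        }
        catch (OperationCanceledException)
        {
            return false;
        }
    }
}
```

The body does not poll `IsCancellationRequested` itself; it only hands `ct` to the operations it awaits, which is why no explicit check appears in the answer's code either.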
Your LINQ query can only ever have the same number of results as the number of partitions - you're just projecting each partition into a single result.

If you don't care about the order, you just need to assemble the results of each partition into a list, then flatten them afterwards.

public static async Task<TResult[]> ExecuteInParallel<T, TResult>(
    this IList<T> source, int degreeOfParallelism, Func<T, Task<TResult>> body)
{
    // Each partition collects its own results in a private list.
    var lists = await Task.WhenAll<List<TResult>>(
        Partitioner.Create(source).GetPartitions(degreeOfParallelism)
            .Select(partition => Task.Run<List<TResult>>(async () =>
            {
                var list = new List<TResult>();
                using (partition)
                {
                    while (partition.MoveNext())
                    {
                        list.Add(await body(partition.Current));
                    }
                }
                return list;
            })));
    // Flatten the per-partition lists; the source order is not preserved.
    return lists.SelectMany(list => list).ToArray();
}

(I've renamed this from ForEachAsync, as ForEach sounds imperative (suitable for the Func<T, Task> in the original) whereas this is fetching results. A foreach loop doesn't have a result - this does.)
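If the order does matter, one possible variation on the method above is to partition the indexes instead of the items and write each result into a preallocated array. This is only a sketch: `OrderedParallel` and `ExecuteInParallelOrdered` are hypothetical names, and it assumes the `IList<T>` is not modified while the operation runs, so that concurrent reads through the indexer are safe.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class OrderedParallel
{
    // Hypothetical order-preserving variant of ExecuteInParallel.
    public static async Task<TResult[]> ExecuteInParallelOrdered<T, TResult>(
        this IList<T> source, int degreeOfParallelism, Func<T, Task<TResult>> body)
    {
        var results = new TResult[source.Count];

        await Task.WhenAll(
            Partitioner.Create(Enumerable.Range(0, source.Count))
                .GetPartitions(degreeOfParallelism)
                .Select(partition => Task.Run(async () =>
                {
                    using (partition)
                    {
                        while (partition.MoveNext())
                        {
                            // Each index is handed out once, so each
                            // array slot has exactly one writer.
                            int i = partition.Current;
                            results[i] = await body(source[i]);
                        }
                    }
                })));

        return results;
    }
}
```

The result at position `i` then corresponds to `source[i]`, regardless of which partition processed it.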

Jon Skeet
  • Oh right, keep the lists in the synchronous part, and return lists. That’s good! Guess I wasn’t too far off after all. – poke Jun 18 '15 at 06:28
  • @JonSkeet: doesn't compile (list.Add instead of results.Add) but it's still not working – Dunken Jun 18 '15 at 06:38
  • @Dunken: I've fixed the `list.Add`, but please give more information than "it's still not working" - that really doesn't provide much for me to go on. (Again, if you'd provided a short but complete program demonstrating the problem, it would have been easier for me to test it for myself...) – Jon Skeet Jun 18 '15 at 07:41
  • @JonSkeet: I meant it still doesn't compile... I'm working on an example... For now: The await operator in the 3rd line can only be used within an async method... – Dunken Jun 18 '15 at 08:26
  • @Dunken: Okay, if you'd said that before I'd have fixed it immediately - the method just needs to be an async method. *Always* say what the error is, rather than just "it doesn't work". – Jon Skeet Jun 18 '15 at 08:40
  • @Dunken: Doh, typo. Should be async instead of await. Try now... will try compiling it myself, too... – Jon Skeet Jun 18 '15 at 09:34