
Recently I have seen several SO threads related to Parallel.ForEach mixed with async lambdas, but all the proposed answers were workarounds of some kind.

Is there any way I could write:

List<int> list = new List<int>();

Parallel.ForEach(arrayValues, async (item) =>
{
  var x = await LongRunningIoOperationAsync(item);
  list.Add(x);
});

How can I ensure that list will contain all items from all iterations executed within the lambdas?

How does Parallel.ForEach generally work with async lambdas? If it hits an await, will it hand its thread over to the next iteration?

I assume the ParallelLoopResult.IsCompleted property is not the right one to check, as it will return true when all iterations have executed, regardless of whether their actual lambda jobs have finished?

ldragicevic
  • await would force the task's thread to be released, and it doesn't guarantee that the same thread will continue when the awaited task finishes. But yes, Parallel.ForEach would guarantee that all iterations completed and the x's were added to the list, unless an exception happened. – Prateek Shrivastava Feb 24 '20 at 00:55
  • You'd be better off with `var results = await Task.WhenAll(arrayvalues.Select(x => LongRunningIoOperationAsync(x)))`. Parallel is better suited for CPU bound work, not IO bound. – juharr Feb 24 '20 at 01:13
  • Your implementation is not thread-safe. You can't call `list.Add(x)` inside the `Parallel.ForEach`. – Enigmativity Feb 24 '20 at 01:16
  • @ldragicevic if you want to limit the amount of concurrent asynchronous operations look [here](https://stackoverflow.com/questions/10806951/how-to-limit-the-amount-of-concurrent-async-i-o-operations). – Theodor Zoulias Feb 24 '20 at 06:32

2 Answers


> Recently I have seen several SO threads related to Parallel.ForEach mixed with async lambdas, but all the proposed answers were workarounds of some kind.

Well, that's because Parallel doesn't work with async. And from a different perspective, why would you want to mix them in the first place? They do opposite things. Parallel is all about adding threads and async is all about giving up threads. If you want to do asynchronous work concurrently, then use Task.WhenAll. That's the correct tool for the job; Parallel is not.

That said, it sounds like you want to use the wrong tool, so here's how you do it...

> How can I ensure that list will contain all items from all iterations executed within the lambdas?

You'll need to have some kind of a signal that some code can block on until the processing is done, e.g., CountdownEvent or Monitor. On a side note, you'll need to protect access to the non-thread-safe List<T> as well.
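As a rough illustration of that approach, here is a minimal sketch using CountdownEvent as the signal and a lock around the list. The class name, the `gate` object, and the body of `LongRunningIoOperationAsync` (a simulated delay that doubles its input) are assumptions made for the example, not part of the question. Note that the lambda still compiles to an `async void` method, so an exception thrown inside it would not be observed by Parallel.ForEach.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class CountdownSketch
{
    // Hypothetical stand-in for the question's I/O method.
    public static async Task<int> LongRunningIoOperationAsync(int item)
    {
        await Task.Delay(50); // simulated I/O latency
        return item * 2;
    }

    public static List<int> Run(int[] arrayValues)
    {
        var list = new List<int>();
        var gate = new object(); // guards the non-thread-safe List<int>

        // One signal per item; Wait() blocks until every lambda has signalled.
        using var done = new CountdownEvent(arrayValues.Length);

        Parallel.ForEach(arrayValues, async item =>
        {
            try
            {
                var x = await LongRunningIoOperationAsync(item);
                lock (gate) { list.Add(x); }
            }
            finally
            {
                done.Signal(); // signal even if the operation failed
            }
        });

        // Parallel.ForEach has already returned here; block until the lambdas finish.
        done.Wait();
        return list;
    }

    public static void Main()
    {
        var list = Run(new[] { 1, 2, 3 });
        Console.WriteLine(list.Count); // prints 3 (the order of items is not guaranteed)
    }
}
```

This blocks the calling thread while the I/O is in flight, which is the cost of forcing Parallel onto asynchronous work.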

> How does Parallel.ForEach generally work with async lambdas? If it hits an await, will it hand its thread over to the next iteration?

Since Parallel doesn't understand async lambdas, when the first await yields (returns) to its caller, Parallel will assume that iteration of the loop is complete.

> I assume the ParallelLoopResult.IsCompleted property is not the right one to check, as it will return true when all iterations have executed, regardless of whether their actual lambda jobs have finished?

Correct. As far as Parallel knows, it can only "see" the method to the first await that returns to its caller. So it doesn't know when the async lambda is complete. It also will assume iterations are complete too early, which throws partitioning off.
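A small experiment makes this visible. The sketch below is only an illustration: the class name, the 500 ms delay, and the counter are assumptions chosen for the demo. It reads IsCompleted and the number of finished lambdas right after Parallel.ForEach returns.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class EarlyReturnDemo
{
    public static (bool IsCompleted, int FinishedAtReturn) Run()
    {
        int finished = 0;

        // The async lambda becomes an Action<int> (async void), so each
        // "iteration" ends as soon as the lambda reaches its first await.
        ParallelLoopResult result = Parallel.ForEach(new[] { 1, 2, 3, 4 }, async item =>
        {
            await Task.Delay(500);               // simulated I/O
            Interlocked.Increment(ref finished); // runs long after the loop has returned
        });

        return (result.IsCompleted, Volatile.Read(ref finished));
    }

    public static void Main()
    {
        var (isCompleted, finishedAtReturn) = Run();
        Console.WriteLine($"IsCompleted = {isCompleted}, lambdas finished = {finishedAtReturn}");
    }
}
```

On a typical run the loop reports IsCompleted as true while the finished count is still 0, because all four lambdas are still waiting on their delay.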

Stephen Cleary

You don't need Parallel.For/ForEach here; you just need to await a list of tasks.

Background

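In short, you need to be very careful with async lambdas, and in particular with whether you are passing them to an Action or to a Func<Task>. An async lambda converted to an Action becomes async void, so the caller gets no Task to await.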

Your problem arises because Parallel.For and Parallel.ForEach are not suited to the async/await pattern or to IO-bound tasks; they are designed for CPU-bound workloads. Their body parameters are essentially Action delegates, and they let the task scheduler create the tasks for you.
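To make the Action versus Func<Task> distinction concrete, here is a minimal sketch. The class name, variable names, and delays are assumptions for illustration. The same kind of async lambda is assigned to both delegate types, and only the Func<int, Task> version hands back something the caller can await.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class DelegateShapes
{
    public static async Task<(int AfterAction, int AfterFunc)> RunAsync()
    {
        int completed = 0;

        // Converted to Action<int>: compiled as async void, no Task is returned.
        Action<int> asAction = async _ =>
        {
            await Task.Delay(300);
            Interlocked.Increment(ref completed);
        };

        // Converted to Func<int, Task>: the returned Task can be awaited.
        Func<int, Task> asFunc = async _ =>
        {
            await Task.Delay(10);
            Interlocked.Increment(ref completed);
        };

        asAction(1); // returns at the first await; there is nothing to wait on
        int afterAction = Volatile.Read(ref completed);

        await asFunc(2); // resumes only after the lambda has run to completion
        int afterFunc = Volatile.Read(ref completed);

        await Task.Delay(500); // give the async void lambda time to finish
        return (afterAction, afterFunc);
    }

    public static async Task Main()
    {
        var (afterAction, afterFunc) = await RunAsync();
        Console.WriteLine($"after Action call: {afterAction}, after awaited Func: {afterFunc}");
    }
}
```

Parallel.ForEach only accepts the first shape, which is why it cannot tell when the work inside the lambda has finished.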

If you want to run multiple async operations at the same time, use Task.WhenAll or a TPL Dataflow block (or something similar), which can deal effectively with both CPU-bound and IO-bound workloads. Put more directly, they can deal with tasks, which is what an async method returns.

Unless you need to do more inside your lambda (which you haven't shown), just use a Select and WhenAll:

var tasks = items.Select(LongRunningIoOperationAsync);
var results = await Task.WhenAll(tasks); // results is an int[] holding every value

If you do need to do more, you can still use await:

var tasks = items.Select(async (item) =>
   {
       var x = await LongRunningIoOperationAsync(item);
       // do other stuff
       return x;
   });

var results = await Task.WhenAll(tasks);

Note: If you need the extended functionality of Parallel.ForEach (namely the options to control max concurrency), there are several approaches; however, Rx or TPL Dataflow might be the most succinct.
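One of those approaches needs nothing beyond the base class library: throttling the Select/WhenAll pattern with a SemaphoreSlim. This is a different technique from the Rx or Dataflow options named above, and the sketch below is only one possible shape. The class name, the `maxConcurrency` parameter, and the simulated `LongRunningIoOperationAsync` are assumptions for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Hypothetical stand-in for the question's I/O method.
    public static async Task<int> LongRunningIoOperationAsync(int item)
    {
        await Task.Delay(50); // simulated I/O latency
        return item * 2;
    }

    public static async Task<int[]> RunAsync(IEnumerable<int> items, int maxConcurrency)
    {
        // At most maxConcurrency operations hold the semaphore at any time.
        using var throttle = new SemaphoreSlim(maxConcurrency);

        var tasks = items.Select(async item =>
        {
            await throttle.WaitAsync(); // waits asynchronously, no thread is blocked
            try
            {
                return await LongRunningIoOperationAsync(item);
            }
            finally
            {
                throttle.Release();
            }
        }).ToList(); // materialize so each task is started exactly once

        // Results come back in the same order as the input items.
        return await Task.WhenAll(tasks);
    }

    public static async Task Main()
    {
        int[] results = await RunAsync(new[] { 1, 2, 3, 4, 5 }, maxConcurrency: 2);
        Console.WriteLine(string.Join(", ", results)); // prints 2, 4, 6, 8, 10
    }
}
```

On .NET 6 and later, Parallel.ForEachAsync with ParallelOptions.MaxDegreeOfParallelism covers a similar need, though unlike WhenAll it does not collect return values for you.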

TheGeneral
  • Thanks. I am just a bit curious whether a C# program could handle any number of items. E.g., if I pass 1000 items, would it be smart to schedule a limited number of threads to execute all 1000 tasks in smaller batches? – ldragicevic Feb 24 '20 at 01:22
  • @ldragicevic the default task scheduler will take care of limiting concurrency depending on the workloads, cores and heuristics – TheGeneral Feb 24 '20 at 01:24
  • Thanks for the response. I was only worried that I would need to manage resources for parallelism myself, but according to the previous comment, it's OK. – ldragicevic Feb 24 '20 at 01:29