3

The thread Nesting await in Parallel.ForEach has an answer suggested to use Task.WhenAll to run multiple (MaxDegreeOfParallelism) asynchronous tasks in parallel, not waiting until previous task is completed.

public static Task ForEachAsync<T>(
      this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current).ContinueWith(t => 
                          {
                              //observe exceptions
                          });
})); 
}

And call it like

ids.ForEachAsync(10,  async id =>
{
    ICustomerRepo repo = new CustomerRepo();  
    var cust = await repo.GetCustomer(id);  
    customers.Add(cust);  
});

If body has a parameter, I want to know parameter value when handling exceptions.e.g. If task body failed for id , I need to log exception, specifying, that it happened for particular id.

I’ve looked at Accessing values in Task.ContinueWith, but wasn’t able to access parameters when t.IsFaulted.

Finally I’ve added try/catch inside lambda body and it seems to work

ids.ForEachAsync(10,  async id =>
{
    try
    {
        ICustomerRepo repo = new CustomerRepo();
        var cust = await repo.GetCustomer(id);
        customers.Add(cust);
    }
    catch(Exception e)
    {
        _logger.LogError(e,” id=“+ id);
    }
});

However I am not sure, does it work correctly(i.e asynchronously, without blocking).

Later the author of the original answer suggested to use var current = partition.Current before await body and then use current in the continuation (ContinueWith(t => { ... }). –

Can anyone confirm, which approach is better? Any disadvantages of each of approaches?

Ohad Schneider
  • 36,600
  • 15
  • 168
  • 198
Michael Freidgeim
  • 26,542
  • 16
  • 152
  • 170

1 Answers1

3

Wrapping await in try/catch is fine, see: Catch an exception thrown by an async method. No big difference from my suggestion (capturing partition.Current and injecting into the ContinueWith continuation), except maybe it's a bit more efficient since no capturing is involved. Also it's a bit more readable and elegant I think, ContinueWith is kind of the "old" way of doing things (pre async/await).

Note that your example passes the burden of exception handling to the caller (who in this case calls _logger.LogError). You need to make sure that's what you want, as opposed to a catch-all embedded in the ForEachAsync code itself to handle the case where he caller does let an exception slip through. Something like:

while (partition.MoveNext()) 
{
    try
    {
        await body(partition.Current)
    }
    catch (Exception e)
    {
        // of course here you don't know the type of T (partition.Current)
        // or anything else about the operation for that matter
        LogError("error processing: " + partition.Current + ": " + e); 
    }
}
Ohad Schneider
  • 36,600
  • 15
  • 168
  • 198