I have a Parallel.ForEach loop which loops through a collection. Inside the loop, I make multiple network I/O calls. I used Task.ContinueWith and nested the subsequent async-await calls. The order of processing doesn't matter, but the data from each async call should be processed in a synchronized way. That is, for each iteration, the data retrieved from the first async call should be passed to the second async call. After the second async call finishes, the data from both async calls should be processed together.

Parallel.ForEach(someCollection, parallelOptions, async (item, state) =>
{
    Task<Country> countryTask = Task.Run(() => GetCountry(item.ID));

    //this is my first async call
    await countryTask.ContinueWith((countryData) =>
    {
        countries.Add(countryData.Result);

        Task<State> stateTask = Task.Run(() => GetState(countryData.Result.CountryID));

        //based on the data I receive in 'stateTask', I make another async call
        stateTask.ContinueWith((stateData) =>
        {
            states.Add(stateData.Result);

            // use data from both the async calls pass it to below function for some calculation
            // in a synchronized way (for a country, its corresponding state should be passed)

            myCollection.ConcurrentAddRange(SomeCalculation(countryData.Result, stateData.Result));
        });
    });
});

I tried the above without using continue await, but it was not working in a synchronized way. Now the above code executes to completion, but no records get processed.

Any help with this please? Let me know if I can add more details.

Souvik Ghosh
  • it seems *very* odd to mix `await` and `ContinueWith` in the same code, and: odd to use `Task.Run` if you're already on the TP... taking a look at refactoring it, but... this is more than just a little odd! – Marc Gravell Jun 17 '20 at 10:44
  • @MarcGravell: You may be right. My intention is to process the data in parallel, where multiple network I/O calls happen, but in a synchronized way. – Souvik Ghosh Jun 17 '20 at 10:51
  • as a general rule, you shouldn't use `ContinueWith` ... pretty much ever, any more – Marc Gravell Jun 17 '20 at 10:54
  • If you can use the DataFlow library, you have [TransformBlock](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.transformblock-2) (and friends). Or something like this: [Queue of async tasks with throttling which supports muti-threading](https://stackoverflow.com/a/34316994/7444103), adapted to the current use case. – Jimi Jun 17 '20 at 12:03
  • _[**Don't mix `Parallel.ForEach` with `async/await`!**](https://stackoverflow.com/a/23730136/585968)_ –  Jun 17 '20 at 12:15
  • The `Parallel.ForEach` [is not async-friendly](https://stackoverflow.com/questions/15136542/parallel-foreach-with-asynchronous-lambda). The lambda passed is [async void](https://learn.microsoft.com/en-us/archive/msdn-magazine/2013/march/async-await-best-practices-in-asynchronous-programming#avoid-async-void). The method will complete immediately after starting the asynchronous operations, the `MaxDegreeOfParallelism` option will not be respected ... lots of nasty problems. Use this method for CPU-bound work only. – Theodor Zoulias Jun 17 '20 at 13:31

2 Answers

As your methods involve I/O, they should be written to be truly asynchronous, not just run synchronously on the thread pool using Task.Run.

Then you could use Task.WhenAll in combination with Enumerable.Select:

var tasks = someCollection.Select(async item =>
{
    var country = await GetCountryAsync(item.Id);
    var state = await GetStateAsync(country.CountryID);
    var calculation = SomeCalculation(country, state);

    return (country, state, calculation);
});

foreach (var tuple in await Task.WhenAll(tasks))
{
    countries.Add(tuple.country);
    states.Add(tuple.state);
    myCollection.AddRange(tuple.calculation);
}

This would ensure that each country > state > calculation occurs sequentially, but each item is processed concurrently, and asynchronously.
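
To try the pattern above in isolation, here is a minimal, self-contained sketch. The two `Get...Async` local functions are hypothetical stand-ins that simulate network latency with `Task.Delay`, and countries and states are reduced to simple tuples; none of this is the asker's real code.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins for the real network calls; Task.Delay simulates I/O latency.
async Task<(int CountryID, string Name)> GetCountryAsync(int itemId)
{
    await Task.Delay(100);
    return (itemId * 10, $"Country{itemId}");
}

async Task<(int StateID, int CountryID)> GetStateAsync(int countryId)
{
    await Task.Delay(100);
    return (countryId + 1, countryId);
}

var itemIds = new[] { 1, 2, 3 };

// Inside one lambda the two calls run in order; the lambdas themselves overlap.
var tasks = itemIds.Select(async id =>
{
    var country = await GetCountryAsync(id);
    var state = await GetStateAsync(country.CountryID);
    return (country, state);
});

// Task.WhenAll returns the results in the same order as the input tasks.
var results = await Task.WhenAll(tasks);

foreach (var (country, state) in results)
{
    Console.WriteLine($"{country.Name}: state {state.StateID} belongs to country {state.CountryID}");
}
```

Because each country is only ever paired with the state fetched from it inside the same lambda, no extra synchronization is needed until the results are merged after `Task.WhenAll`.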


Update as per comment

using var semaphore = new SemaphoreSlim(2);
using var cts = new CancellationTokenSource();

int failures = 0;

var tasks = someCollection.Select(async item =>
{
    await semaphore.WaitAsync(cts.Token);
    
    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);

        Interlocked.Exchange(ref failures, 0);

        return (country, state, calculation);
    }
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        semaphore.Release();
    }
});

The semaphore ensures a maximum of 2 concurrent async operations, and once 10 consecutive exceptions have occurred, the cancellation token cancels every task that is still waiting on the semaphore.

The Interlocked methods ensure that failures is accessed in a thread-safe manner.
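
The throttling effect of the semaphore can be checked with a small sketch like the one below. `ProcessAsync`, `running`, `peak` and `gate` are hypothetical names introduced only for this illustration, and `Task.Delay` stands in for the real calls.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using var semaphore = new SemaphoreSlim(2); // at most 2 operations at once
var gate = new object();                    // protects the two counters below
int running = 0;                            // operations currently in flight
int peak = 0;                               // highest value of 'running' observed

// Hypothetical stand-in for one country > state round trip.
async Task<int> ProcessAsync(int item)
{
    await semaphore.WaitAsync();
    try
    {
        lock (gate) { running++; peak = Math.Max(peak, running); }
        await Task.Delay(50); // simulated I/O
        return item * 2;
    }
    finally
    {
        lock (gate) { running--; }
        semaphore.Release();
    }
}

var results = await Task.WhenAll(Enumerable.Range(1, 10).Select(i => ProcessAsync(i)));

Console.WriteLine($"Processed {results.Length} items; peak concurrency was {peak}");
```

Note that this caps the number of operations in flight at any moment, not the number of requests per second; a strict per-second rate limit would need a separate mechanism.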


Further Update

It may be even more efficient to use 2 semaphores, which avoids iterating over the results a second time to fill the lists.

Encapsulate all the list-adding into a single method:

void AddToLists(Country country, State state, Calculation calculation)
{
    countries.Add(country);
    states.Add(state);
    myCollection.AddRange(calculation);
}

Then you could allow 2 threads to simultaneously serve the HTTP requests, and 1 to perform the adds, making that operation thread-safe:

using var httpSemaphore = new SemaphoreSlim(2);
using var listAddSemaphore = new SemaphoreSlim(1);
using var cts = new CancellationTokenSource();

int failures = 0;

await Task.WhenAll(someCollection.Select(async item =>
{
    await httpSemaphore.WaitAsync(cts.Token);
    
    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);

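        // Only one task at a time may touch the shared lists.
        await listAddSemaphore.WaitAsync(cts.Token);
        try
        {
            AddToLists(country, state, calculation);
        }
        finally
        {
            // Released here so it is only released when it was actually acquired.
            listAddSemaphore.Release();
        }

        Interlocked.Exchange(ref failures, 0);
    }
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        httpSemaphore.Release();
    }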
}));
Johnathan Barclay
  • How do you handle max degree of parallelism and exceptions? I need to keep the max requests/second to 2 and need to break from the loop if there are 10 consecutive failures. – Souvik Ghosh Jun 17 '20 at 13:28
  • @SouvikGhosh take a look at this: [How to limit the amount of concurrent async I/O operations?](https://stackoverflow.com/questions/10806951/how-to-limit-the-amount-of-concurrent-async-i-o-operations). – Theodor Zoulias Jun 17 '20 at 13:37

I think you're over-complicating this; inside the Parallel.ForEach, you're already on the thread pool, so there is really no benefit creating lots of additional tasks inside. So; how to do this really depends on whether GetState etc are synchronous or asynchronous. If we assume synchronous, then something like:

Parallel.ForEach(someCollection, parallelOptions, (item, _) =>
{
    var country = GetCountry(item.Id);

    countries.Add(country); // warning: may need to synchronize

    var state = GetState(country.CountryID);

    states.Add(state); // warning: may need to synchronize

    // use data from both the async calls pass it to below function for some calculation
    // in a synchronized way (for a country, its corresponding state should be passed)
    myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});

If they are async, it gets more awkward; it would be nice if we could do something like:

// WARNING: DANGEROUS CODE - DO NOT COPY
Parallel.ForEach(someCollection, parallelOptions, async (item, _) =>
{
    var country = await GetCountryAsync(item.Id);

    countries.Add(country); // warning: may need to synchronize

    var state = await GetStateAsync(country.CountryID);

    states.Add(state); // warning: may need to synchronize

    // use data from both the async calls pass it to below function for some calculation
    // in a synchronized way (for a country, its corresponding state should be passed)
    myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});

but the problem here is that none of the callbacks in Parallel.ForEach are "awaitable", meaning: we have silently created an async void callback here, which is very bad. This means that Parallel.ForEach will think it has "finished" as soon as the first incomplete await happens, which means:

  1. we have no clue when all the work has actually finished
  2. you could be doing a lot more concurrently than you intended (max-dop can not be respected)
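
The early return described above can be observed with a short sketch. The `completed` counter and the delays are made up for this illustration; the point is only that the loop returns while every body is still awaiting.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int completed = 0;

// The async lambda is converted to Action<int>, i.e. an async void callback.
Parallel.ForEach(new[] { 1, 2, 3, 4 }, async item =>
{
    await Task.Delay(500);                // control returns to ForEach here
    Interlocked.Increment(ref completed); // runs later, unobserved by ForEach
});

// ForEach has already returned, although no body has got past its await yet.
int afterLoop = Volatile.Read(ref completed);
Console.WriteLine($"Completed when ForEach returned: {afterLoop}");

// Waiting is the only way to see the work finish; there is no task to await.
await Task.Delay(1500);
Console.WriteLine($"Completed after waiting: {Volatile.Read(ref completed)}");
```

The first count is read long before the 500 ms delays elapse, which is why code placed after the loop sees none of the results.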

There doesn't seem to be any good API to avoid this currently.
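
Since this answer was written, .NET 6 added `Parallel.ForEachAsync`, whose body returns a `ValueTask` and is awaited by the loop. The sketch below assumes .NET 6 or later is available; the `Get...Async` local functions and the `pairs` collection are hypothetical stand-ins, not the asker's code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for the real network calls.
async Task<int> GetCountryAsync(int itemId, CancellationToken ct)
{
    await Task.Delay(50, ct);
    return itemId * 10;
}

async Task<int> GetStateAsync(int countryId, CancellationToken ct)
{
    await Task.Delay(50, ct);
    return countryId + 1;
}

var pairs = new ConcurrentBag<(int Country, int State)>();
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

// The loop awaits every body and runs at most 2 bodies at a time.
await Parallel.ForEachAsync(Enumerable.Range(1, 6), options, async (item, ct) =>
{
    var country = await GetCountryAsync(item, ct);
    var state = await GetStateAsync(country, ct);
    pairs.Add((country, state)); // ConcurrentBag is safe for concurrent adds
});

Console.WriteLine($"Processed {pairs.Count} items");
```

On earlier runtimes this method does not exist, so the semaphore-based throttling is still the approach to reach for there.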

Marc Gravell
  • The async approach you explained above was my first attempt, where the Parallel.ForEach didn't wait for the calls to finish. Then I did something like what is explained here: https://stackoverflow.com/a/26993772/5014099. The problem was with the synchronization between the two async calls, where the output of the first gets passed to the second call and, after completion of both, the return values from both should be in sync. – Souvik Ghosh Jun 17 '20 at 11:30
  • @SouvikGhosh oof, indeed - `Parallel.ForEach` has "action" bodies (no `Func` bodies), so : if we use the `async` version, it is going to be using `async void`, which is *really bad*; are `GetCountry` and `GetState` awaitable methods? – Marc Gravell Jun 17 '20 at 11:36
  • Yes they are awaitable – Souvik Ghosh Jun 17 '20 at 11:37
  • @SouvikGhosh I've edited to discourage/explain this - it was bad guidance by me, sorry; I don't think there are any great options here - you could roll your own "max-dop aware parallel async" thing, but... – Marc Gravell Jun 17 '20 at 11:41
  • @SouvikGhosh to be explicit: adding more `Task` (in the question) also doesn't fix the problem – Marc Gravell Jun 17 '20 at 11:42
  • Thanks, I will check if I can get something around this.. – Souvik Ghosh Jun 17 '20 at 11:52
  • What about this: use a normal for loop, put the body of the loop in an async method, collect all the tasks in an ArrayList, and use a Wait All on the tasks, or wait for them in a parallelFor? – Ian Ringrose Jun 17 '20 at 13:02
  • @IanRingrose that won't give you max-dop – Marc Gravell Jun 17 '20 at 13:24
  • Have you considered using `Parallel.ForAll` [instead of Parallel.Foreach](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/potential-pitfalls-with-plinq#prefer-forall-to-foreach-when-it-is-possible)? – Peter Csala Jun 18 '20 at 07:13
  • There are [3rd party libraries](https://github.com/Dasync/AsyncEnumerable#example-3-async-parallel-for-each), [2](https://blogs.msdn.microsoft.com/pfxteam/2012/03/05/implementing-a-simple-foreachasync-part-2/) which do support Parallel foreach in an async manner. – Peter Csala Jun 18 '20 at 07:16