14

In C#, I am interested in stopping a Parallel.ForEachAsync loop (considering the differences between Stop and Break); for Parallel.ForEach I can do the following:

Parallel.ForEach(items, (item, state) =>
{
    if (cancellationToken.IsCancellationRequested)
    {
        state.Stop();
        return;
    }

    // some process on the item
    Process(item);
});

However, since my process needs to be executed asynchronously, I switched to Parallel.ForEachAsync. ForEachAsync does not have a Stop() method. I'm able to break the loop as follows, but I'm wondering whether this is the most effective way of breaking it (in other words, the loop needs to stop ASAP when it receives the cancellation request).

await Parallel.ForEachAsync(items, async (item, state) =>
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    // some async process on the item
    await ProcessAsync(item);
});
Dr. Strangelove
  • This [question](https://stackoverflow.com/questions/71072305/how-to-break-the-parallel-foreachasync-loop-not-cancel-it) is relevant as well. It uses a flag instead of cancellationToken – Martin Wickman May 19 '22 at 08:49

2 Answers

15

The Parallel.ForEachAsync body delegate has a CancellationToken as its second parameter. This token is supplied by the API; it is not the same token that you passed in the ParallelOptions. You can forward this token to any asynchronous method that you invoke inside the lambda. If you invoke non-cancelable methods, the best you can do is call ThrowIfCancellationRequested at strategic places inside the lambda:

CancellationTokenSource cts = new();
ParallelOptions options = new() { CancellationToken = cts.Token };

try
{
    await Parallel.ForEachAsync(items, options, async (item, ct) =>
    {
        //...
        ct.ThrowIfCancellationRequested();
        //...
        await ProcessAsync(item, ct);
        //...
        ct.ThrowIfCancellationRequested();
        //...
    });
}
catch (OperationCanceledException ex)
{
    // ...
}

The token provided as an argument to the lambda (the ct in the above example) is canceled not only when the ParallelOptions.CancellationToken is canceled, but also when a ProcessAsync operation has failed. This mechanism allows faster propagation of exceptions. The parallel loop does not complete immediately when an error occurs, because it follows the principle of disallowing fire-and-forget operations: all operations that are started internally by the loop must complete before the whole loop completes, either successfully or with failure. The token in the lambda makes it possible to reduce this latency to a minimum.
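To see the failure-triggered cancellation in isolation, here is a minimal sketch. It assumes .NET 6 or later; the item values, the delays, and the `observedCancellation` counter are made up for the illustration. One iteration throws after a short delay, while the others wait indefinitely on the lambda's token. No external token is passed, so the only thing that can cancel `ct` is the failure of another iteration.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        int observedCancellation = 0; // hypothetical counter, for illustration only
        ParallelOptions options = new() { MaxDegreeOfParallelism = 4 };

        try
        {
            await Parallel.ForEachAsync(Enumerable.Range(1, 4), options, async (item, ct) =>
            {
                if (item == 1)
                {
                    // Give the other iterations a moment to start, then fail.
                    await Task.Delay(100);
                    throw new InvalidOperationException("Item 1 failed");
                }

                try
                {
                    // Waits until ct is canceled. No external token exists here,
                    // so cancellation can only come from the loop itself.
                    await Task.Delay(Timeout.Infinite, ct);
                }
                catch (OperationCanceledException)
                {
                    Interlocked.Increment(ref observedCancellation);
                    throw;
                }
            });
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Loop failed with {ex.GetType().Name}: {ex.Message}");
        }

        Console.WriteLine($"Iterations that observed cancellation: {observedCancellation}");
    }
}
```

If the token were not forwarded to `Task.Delay`, the waiting iterations would never finish and the loop would never complete, which is the latency problem described above taken to its extreme. The exact count printed depends on how many iterations had started before the failure.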

Theodor Zoulias
  • What's the difference between `ParallelOptions.CancellationToken` and the one you get in your callback (`ct` in your example)? If the one in options gets cancelled, will the one in your callback be cancelled as well? – Martin Wickman May 10 '22 at 12:33
  • Here is a relevant question about this: [The need for two cancellation tokens in .NET 6 Parallel.ForEachAsync?](https://stackoverflow.com/questions/70191295/the-need-for-two-cancellation-tokens-in-net-6-parallel-foreachasync) – Theodor Zoulias Mar 08 '23 at 21:57
-3

You're going to need something like this:

await Parallel.ForEachAsync(items, async (item, state) =>
{
    await ProcessAsync(item, cancellationToken);
});


async Task ProcessAsync(string item, CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        //Process
    }
}
Enigmativity
  • The drawback of this approach is how fast it breaks the loop. For instance, if there are `1e9` items and the cancellation is requested when it has just started the first item, this approach will iterate through all the remaining `1e9 - 1` items before it returns. – Dr. Strangelove Jan 24 '22 at 00:27
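For comparison, here is a rough sketch of what happens when the token is handed to the loop through ParallelOptions instead of only being checked inside the processing method. It assumes .NET 6 or later; the item count, the degree of parallelism, the delay, and the `started` counter are arbitrary choices for the illustration. Cancellation is requested from inside the tenth iteration to simulate an external request arriving early.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var items = Enumerable.Range(0, 1_000_000);
        using CancellationTokenSource cts = new();
        ParallelOptions options = new()
        {
            CancellationToken = cts.Token,
            MaxDegreeOfParallelism = 2
        };
        int started = 0; // hypothetical counter of iterations that began

        try
        {
            await Parallel.ForEachAsync(items, options, async (item, ct) =>
            {
                // Simulate an external cancellation request early in the run.
                if (Interlocked.Increment(ref started) == 10)
                {
                    cts.Cancel();
                }

                // Stand-in for the real asynchronous work; it honors the token.
                await Task.Delay(10, ct);
            });
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"Canceled after starting {started} of 1,000,000 items.");
        }
    }
}
```

With the token in the options, the loop should stop handing out new items once cancellation is requested, so the number of started iterations is expected to stay close to the point where `Cancel` was called rather than reaching the full item count.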