283

I would like to handle a collection in parallel, but I'm having trouble implementing it and I'm therefore hoping for some help.

The trouble arises if I want to call a method marked async in C#, within the lambda of the parallel loop. For example:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
var count = bag.Count;

The problem is that count is 0, because the async lambdas effectively run as fire-and-forget background work and the Parallel.ForEach call doesn't wait for them to complete. If I remove the async keyword, the method looks like this:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = await GetData(item);
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
});
var count = bag.Count;

It works, but it completely disables the await cleverness and I have to do some manual exception handling (removed for brevity).

How can I implement a Parallel.ForEach loop that uses the await keyword within the lambda? Is it possible?

The prototype of the Parallel.ForEach method takes an Action<T> as parameter, but I want it to wait for my asynchronous lambda.

abatishchev
clausndk
  • 3
    I assume you meant to remove `await` from `await GetData(item)` in your second code block as it would produce a compilation error as-is. – Josh M. Nov 30 '17 at 12:18
  • 3
    Possible duplicate of [Nesting await in Parallel.ForEach](https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach) – Vitaliy Ulantikov Dec 02 '17 at 20:57
  • 1
    As a side note, the `ConcurrentBag` is a [very specialized](https://stackoverflow.com/questions/15400133/when-to-use-blockingcollection-and-when-concurrentbag-instead-of-listt/64823123#64823123) collection. A `ConcurrentQueue` would serve you better in this case. – Theodor Zoulias Jan 18 '21 at 11:16

10 Answers

324

If you just want simple parallelism, you can do this:

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;

If you need something more complex, check out Stephen Toub's ForEachAsync post.
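For reference, the partition-based idea from that post looks roughly like the sketch below. It is reconstructed from the article's approach rather than copied from it, and the class name `ForEachAsyncExtensions` is my own: the source is split into `dop` partitions, and one task per partition awaits its items one after another.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ForEachAsyncExtensions
{
    // Runs `body` for every item, with at most `dop` items in flight at once.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            Partitioner.Create(source)
                .GetPartitions(dop)
                .Select(partition => Task.Run(async () =>
                {
                    // Each partition is drained sequentially by its own task.
                    using (partition)
                    {
                        while (partition.MoveNext())
                        {
                            await body(partition.Current);
                        }
                    }
                })));
    }
}
```

With the code from the question, usage would be something like `await myCollection.ForEachAsync(10, async item => bag.Add(await GetData(item)));`.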

Rocklan
Stephen Cleary
  • 107
    Probably a throttling mechanism is needed. This will immediately create as many tasks as there are items which might end up in 10k network requests and such. – usr Feb 28 '13 at 13:32
  • 14
    @usr The last example in Stephen Toub's article addresses that. – svick Feb 28 '13 at 23:09
  • @svick I was puzzling over that last sample. It looks to me like it just batches a load of tasks to create more tasks, but they all get started en masse. – Luke Puplett Jun 12 '17 at 16:33
  • 2
    @LukePuplett It creates `dop` tasks and each of them then processes some subset of the input collection in series. – svick Jun 12 '17 at 17:12
  • Throttling: https://stackoverflow.com/questions/10806951/how-to-limit-the-amount-of-concurrent-async-i-o-operations – Akira Yamamoto Jun 20 '17 at 00:59
  • @stephen I was testing your code by passing a list of numbers(1-10) and writing each number into console. All printed numbers were in sequence, however when I changed await GetData to Task.Run(async () => await GetData (x)) the printed numbers were shuffled. Any idea why it didn't run parallel at the first try? – AfshinZavvar May 09 '18 at 08:28
  • 4
    @Afshin_Zavvar: If you call `Task.Run` without `await`ing the result, then that's just throwing fire-and-forget work onto the thread pool. That is almost always a mistake. – Stephen Cleary May 10 '18 at 22:57
  • Works perfectly. I used this for sending emails from AWS Lambda, and speed increased 10x – Zafar May 08 '20 at 03:32
  • 4
    A simple throttling mechanism for this approach is to split your list into small lists of N entries, and perform this task select + Task.WhenAll for each smaller batch. This way you don't spawn thousands of tasks for large data sets. – Bjorn De Rijcke Aug 20 '20 at 11:35
  • This seems to be the current state of the art review: https://medium.com/@alex.puiu/parallel-foreach-async-in-c-36756f8ebe62 – satnhak Apr 26 '21 at 18:20
  • [Ohad Schneider Answer in Nesting await in Parallel.ForEach](https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach/25877042#25877042) adds error handling to Stephen Toub’s ForEachAsync extension – Michael Freidgeim Nov 06 '21 at 10:42
  • With regard to throttling, you could make a NetworkConnectionResource and then use a resource pool to provide it (https://stackoverflow.com/questions/29501285/correct-way-to-implement-a-resource-pool). That way you start 10K tasks, but initially most would be waiting to obtain one of your (many) NetworkConnectionResource instances. – andrew pate Dec 12 '22 at 09:41
147

You can use the ParallelForEachAsync extension method from the AsyncEnumerator NuGet package:

using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;

Disclaimer: I'm the author of the AsyncEnumerator library, which is open source and licensed under MIT, and I'm posting this message just to help the community.

Ian Kemp
Serge Semenov
  • 3
    This is your package? I have seen you post this in a few places now? :D Oh wait.. your name is on the package :D +1 – Piotr Kula Mar 14 '17 at 14:47
  • 38
    @ppumkin, yes, it's mine. I've seen this problem over and over again, so decided to solve it in simplest way possible and free others from struggling as well :) – Serge Semenov Mar 14 '17 at 15:51
  • Thanks.. it definitely makes sense and helped me out big time! – Piotr Kula Mar 14 '17 at 16:03
  • 2
    you have a typo: `maxDegreeOfParallelism` > `maxDegreeOfParalellism` – Shiran Dror Aug 12 '17 at 13:00
  • @ShiranDror, I'm pretty sure that it's spelled correctly – Serge Semenov Aug 21 '17 at 15:51
  • 3
    The correct spelling is indeed maxDegreeOfParallelism, however there's something in @ShiranDror's comment - in your package you called the variable maxDegreeOfParalellism by mistake (and therefore your quoted code won't compile until you change it..) – BornToCode Sep 18 '17 at 12:57
  • I've re edited the code above so it actually compiles, even if the spelling is wrong from an English point of view. – StuartQ Oct 26 '19 at 10:08
  • @StuartQ, you probably use an older version of the library. The typo in the spelling has been fixed so I changed the example back. – Serge Semenov Oct 26 '19 at 21:15
  • 1
    @SergeSemenov In that case I think you might want to update the link in this answer, since it points at V1.10. Since you're active on this question, I will leave that to you. – StuartQ Oct 27 '19 at 10:16
  • Hello. Is there a way to simulate "break"? Use case: there is a 30-minute limit on execution. After 25 minutes I would like to stop execution and requeue the remaining items for a new task. – Jurion Dec 01 '21 at 19:21
  • @SergeSemenov, I tried this approach with no success, see: https://stackoverflow.com/questions/73323842/asyncenumerator-library-not-processing-collection – Christian Phillips Aug 12 '22 at 10:45
126

One of the new .NET 6 APIs is Parallel.ForEachAsync, a way to schedule asynchronous work that allows you to control the degree of parallelism:

var urls = new [] 
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://stackoverflow.com"
};

var client = new HttpClient();

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
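    // Escape the URL so it is usable as a file name; assumes the "http_cache" directory already exists.
    var targetPath = Path.Combine(Path.GetTempPath(), "http_cache", Uri.EscapeDataString(url));

    // Pass the token along so the loop can cancel in-flight requests.
    var response = await client.GetAsync(url, token);

    if (response.IsSuccessStatusCode)
    {
        using var target = File.OpenWrite(targetPath);

        await response.Content.CopyToAsync(target, token);
    }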
    }
});

Another example in Scott Hanselman's blog.

The source, for reference.
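Applied to the pattern from the question, a minimal sketch could look like the following. The `GetData` method here is a hypothetical stand-in for the question's method, and the class name `ForEachAsyncDemo` is only for illustration; the point is that the task returned by `Parallel.ForEachAsync` completes only after every async body has finished, so the count is reliable afterwards.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // Hypothetical stand-in for the question's GetData method.
    private static async Task<string> GetData(int item, CancellationToken token)
    {
        await Task.Delay(10, token);
        return $"item {item}";
    }

    public static async Task<int> RunAsync(IEnumerable<int> myCollection)
    {
        var bag = new ConcurrentBag<string>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

        // Requires .NET 6 or later. Awaiting here waits for all bodies to complete.
        await Parallel.ForEachAsync(myCollection, options, async (item, token) =>
        {
            var response = await GetData(item, token);
            bag.Add(response);
        });

        return bag.Count;
    }
}
```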

Ian Kemp
Majid Shahabfar
  • For handling exceptions see https://stackoverflow.com/questions/40149119/how-to-handle-exceptions-in-parallel-foreach and MS docs https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-handle-exceptions-in-parallel-loops – Michael Freidgeim Jun 17 '22 at 17:28
37

With SemaphoreSlim you can achieve parallelism control.

var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
  await throttler.WaitAsync();
  try
  {
     var response = await GetData(item);
     bag.Add(response);
  }
  finally
  {
     throttler.Release();
  }
});
await Task.WhenAll(tasks);
var count = bag.Count;
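
Since SemaphoreSlim implements IDisposable, a variant that disposes the throttler might look like the sketch below. The wrapper class `ThrottledDemo` and the `getData` delegate parameter are assumptions made to keep the example self-contained; the throttling logic is the same as above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledDemo
{
    public static async Task<int> RunAsync<T>(
        IEnumerable<T> myCollection, Func<T, Task<object>> getData, int maxParallel)
    {
        var bag = new ConcurrentBag<object>();
        using var throttler = new SemaphoreSlim(initialCount: maxParallel);

        var tasks = myCollection.Select(async item =>
        {
            await throttler.WaitAsync();
            try
            {
                bag.Add(await getData(item));
            }
            finally
            {
                throttler.Release();
            }
        });

        // Task.WhenAll finishes only once every task has finished, so the
        // semaphore is not disposed while a task could still use it.
        await Task.WhenAll(tasks);
        return bag.Count;
    }
}
```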
Ganso Doido
  • 2
    SemaphoreSlim should be wrapped in a `using` statement because it implements IDisposable – Sal Aug 14 '20 at 20:27
16

Simplest possible extension method compiled from other answers and the article referenced by the accepted answer:

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync();
        try
        {
            await asyncAction(item).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}

UPDATE: here's a simple modification that also supports a cancellation token, as requested in the comments (untested):

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync(cancellationToken);
        if (cancellationToken.IsCancellationRequested) return;

        try
        {
            await asyncAction(item, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}
Alex from Jitbit
  • I would upvote it, but I don't like the arbitrary `int maxDegreeOfParallelism = 10`. Better let the user specify the level of concurrency explicitly. You could also consider adding a `.ConfigureAwait(false)` after the `asyncAction(item)` (but not after the `throttler.WaitAsync()`). – Theodor Zoulias Mar 15 '21 at 10:10
  • 2
    @TheodorZoulias both good points, edited. Also working on an variant that supports a `cancellationToken` will post it later. – Alex from Jitbit Mar 15 '21 at 10:24
  • 1
    It is worth noting that most solutions based on `Task.WhenAll` are only suitable for a relatively small number of tasks, or for `asyncAction`s that are guaranteed not to throw. Otherwise it can be very frustrating to wait half an hour for the completion of 10,000 tasks, and just receive an exception as a result (that could have been thrown by the very first task). – Theodor Zoulias Mar 15 '21 at 11:47
  • 1
    This is NOT a robust solution for 2 reasons. First, if exception thrown it will not terminate the loop. Second, `throttler` is not disposed. – zmechanic Jan 07 '22 at 11:42
  • @zmechanic I think it's up to the developer whether or not to abort the loop on exception. – Alex from Jitbit Jan 07 '22 at 17:40
  • @Alex from Jitbit agree, but it's not stated in your answer, and LINQ behavior (as you do it) is different to `foreach` in this aspect. In LINQ, the exception will not terminate enumeration. – zmechanic Jan 11 '22 at 18:05
  • could you show how to add cancelationToken... @AlexfromJitbit I did ` foreach (var item in enumerable) { ct.ThrowIfCancellationRequested();` but feel like it could maybe also be passed to the Task.Run and when 0 maxDegreeOfParallelism it should also be factored.... could you assist in this regard – Seabizkit Jan 11 '22 at 20:52
  • @zmechanic regarding the `throttler` not being disposed, I agree that it should (by adding a `using` before the `var throttler`), but it's not a deal breaker: [Do I need to Dispose a SemaphoreSlim](https://stackoverflow.com/questions/32033416/do-i-need-to-dispose-a-semaphoreslim). – Theodor Zoulias Mar 17 '22 at 16:41
  • The `if (cancellationToken.IsCancellationRequested) return;` is not needed. Theoretically it could result in the `Task` completing successfully instead of being canceled, without doing all the work. – Theodor Zoulias Apr 24 '22 at 17:38
  • Async does not equals parallel. There is no parallelism in this code, unless I am missing something. It seems you assume `asyncAction` will somehow run on a different thread but that is not guaranteed. Javascript is a good example of asyncronous but single threaded execution. You never know what kind of scheduler the async job will run on. There is a deadlock hidden here. – Thanasis Ioannidis Oct 07 '22 at 12:49
5

My lightweight implementation of ParallelForEach async.

Features:

  1. Throttling (max degree of parallelism).
  2. Exception handling (an AggregateException is thrown at completion).
  3. Memory efficient (no need to store the list of tasks).

public static class AsyncEx
{
    public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
    {
        var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
        var tcs = new TaskCompletionSource<object>();
        var exceptions = new ConcurrentBag<Exception>();
        bool addingCompleted = false;

        foreach (T item in source)
        {
            await semaphoreSlim.WaitAsync();
            asyncAction(item).ContinueWith(t =>
            {
                semaphoreSlim.Release();

                if (t.Exception != null)
                {
                    exceptions.Add(t.Exception);
                }

                if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
                {
                    tcs.TrySetResult(null);
                }
            });
        }

        Volatile.Write(ref addingCompleted, true);
        await tcs.Task;
        if (exceptions.Count > 0)
        {
            throw new AggregateException(exceptions);
        }
    }
}

Usage example:

await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
    var data = await GetData(i);
}, maxDegreeOfParallelism: 100);
nicolas2008
  • `tcs.SetResult(null)` need replace to `tcs.TrySetResult(null)` – Hocas Oct 29 '20 at 14:03
  • @Hocas, why do you think TrySetResult is needed? – nicolas2008 Oct 30 '20 at 22:14
  • I had a problem with multiple call `SetResult` when last time I used this code) [When to use SetResult() vs TrySetResult()](https://stackoverflow.com/questions/12100022/taskcompletionsource-when-to-use-setresult-versus-trysetresult-etc) – Hocas Nov 02 '20 at 16:54
  • @Hocas, that's interesting. tcs.SetResult(null) is not expected to be executed twice. – nicolas2008 Nov 05 '20 at 15:12
  • Using the `CurrentCount` property of the `SemaphoreSlim` for controlling the execution flow is not a good idea. In most cases it creates race conditions. Using the `Volatile.Read` is also shaky (another possible race condition). I wouldn't trust this solution in a production environment. – Theodor Zoulias Nov 06 '20 at 14:09
  • @Theodor Zoulias, thanks for your feedback. It would be more constructive if you provide evidence or at least links to official documentation proving your doubts. – nicolas2008 Nov 07 '20 at 16:11
  • Sure. I would like to be able to point to the documentation, or some other source of reliable information. But I can't find any. The only thing I know is that I am not able to prove the correctness of your solution by simply studying it. It may be correct, or may not be, and I am not confident that I would reach to a definite conclusion by experimentation. And I don't see the point of even trying, because I already know plenty of mechanisms that achieve the same thing correctly, reliably and efficiently, so why bother? I am sorry for not being able to provide a more satisfying answer. :-) – Theodor Zoulias Nov 07 '20 at 16:41
  • @Theodor Zoulias, I would be glad to see your solution how to achieve the same thing in easier way. Promise to remove my answer once you provide better one :-) – nicolas2008 Nov 11 '20 at 16:51
  • Nicolay my preferred solution to this problem for production code would be to use an [`ActionBlock`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.actionblock-1) from the TPL Dataflow library. Easy, efficient, rock solid, lots of configuration options, natively available in .NET Core, what else could I ask for? If this was not an option for some reason, there are some provably correct implementations [here](https://stackoverflow.com/questions/10806951/how-to-limit-the-amount-of-concurrent-async-i-o-operations/) (including two of my own). – Theodor Zoulias Nov 11 '20 at 18:09
  • @Theodor Zoulias, ActionBlock missing lazy source enumeration feature. – nicolas2008 Nov 11 '20 at 20:58
  • @Theodor Zoulias, I also checked your solutions. One with worker tasks looks theoretically correct but IMO it's overcomplicated and not very efficient due to complex task hierarchy and usage of lock. – nicolas2008 Nov 11 '20 at 21:21
  • You can set the `BoundedCapacity` of the `ActionBlock` equal to the `MaxDegreeOfParallelism`, which creates a situation known as "backpressure" in producer-consumer systems, and is quite similar to the familiar LINQ concept of lazy evaluation. The looping code that feeds the block with `await block.SendAsync(item)` is forced to wait until an empty slot becomes available in the internal buffer of the block, so the enumeration of the source becomes lazy by necessity. – Theodor Zoulias Nov 11 '20 at 22:21
  • @Theodor Zoulias I used this solution and it works in a production system with big data. The system was released in 2021. But I had done some fixes... – Hocas Dec 08 '22 at 19:55
1

I've created an extension method for this which makes use of SemaphoreSlim and also allows setting the maximum degree of parallelism:

    /// <summary>
    /// Concurrently executes an async action for each item of an <see cref="IEnumerable{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of the items in the IEnumerable</typeparam>
    /// <param name="enumerable">Instance of <see cref="IEnumerable{T}"/></param>
    /// <param name="action">An async delegate to execute for each item</param>
    /// <param name="maxDegreeOfParallelism">Optional. An integer that represents the maximum degree of parallelism;
    /// must be greater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If maxDegreeOfParallelism is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Sample Usage:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
Jay Shah
  • 'using' will not help. The foreach loop will wait for the semaphore indefinitely. Just try this simple code that reproduces the issue: await Enumerable.Range(1, 4).ForEachAsyncConcurrent(async (i) => { Console.WriteLine(i); throw new Exception("test exception"); }, maxDegreeOfParallelism: 2); – nicolas2008 Oct 18 '18 at 19:42
  • @nicolay.anykienko you are right about #2. That memory problem can be solved by adding tasksWithThrottler.RemoveAll(x => x.IsCompleted); – askids Nov 14 '18 at 10:50
  • 1
    I've tried it in my code, and if maxDegreeOfParallelism is not null the code deadlocks. Here you can see all the code to reproduce: https://stackoverflow.com/questions/58793118/uwp-perform-long-view-model-creation-and-keep-ui-responsive/58824347#58824347 – Massimo Savazzi Nov 12 '19 at 18:11
  • My concern with this approach when I looked at implementing it for my use, was the 1.7 million rows I was processing would result in each having a job in the tasksWithThrottler List, and that didn't seem ideal or really scalable. Posting the solution my teammate and I came up with using ActionBlock as a separate solution. – Caleb Holt Feb 09 '21 at 03:07
  • Please add cancellation token code... aswell plz – Seabizkit Jan 11 '22 at 20:55
  • @nicolas2008 what do you mean it will wait indefinitely? If that were true it would never exit the foreach loop, which it only stays in while it has more to iterate over. So it definitely gets out, which means 'using' will help as it exits the method, no? – Seabizkit Jun 11 '22 at 08:32
1

In the accepted answer the ConcurrentBag is not required. Here's an implementation without it:

var tasks = myCollection.Select(GetData).ToList();
await Task.WhenAll(tasks);
var results = tasks.Select(t => t.Result);

Any of the "// some pre stuff" and "// some post stuff" can go into the GetData implementation (or another method that calls GetData)

Aside from being shorter, there's no use of an "async void" lambda, which is an anti-pattern.
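
As a further simplification, Task.WhenAll itself returns the results as an array when given tasks of the same result type, in the same order as the input tasks. A minimal sketch, where `GetData` is a hypothetical stand-in for the question's method and `WhenAllResults` is an illustrative class name:

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllResults
{
    // Hypothetical stand-in for the question's GetData method.
    private static async Task<string> GetData(int item)
    {
        await Task.Delay(10);
        return $"item {item}";
    }

    public static async Task<string[]> RunAsync(IEnumerable<int> myCollection)
    {
        // results[i] belongs to the i-th item of myCollection,
        // regardless of the order in which the tasks completed.
        string[] results = await Task.WhenAll(myCollection.Select(item => GetData(item)));
        return results;
    }
}
```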

Tom
0

The following is set to work with IAsyncEnumerable but can be modified to use IEnumerable by just changing the type and removing the "await" on the foreach. It's far more appropriate for large sets of data than creating countless parallel tasks and then awaiting them all.

    public static async Task ForEachAsyncConcurrent<T>(this IAsyncEnumerable<T> enumerable, Func<T, Task> action, int maxDegreeOfParallelism, int? boundedCapacity = null)
    {
        ActionBlock<T> block = new ActionBlock<T>(
           action, 
           new ExecutionDataflowBlockOptions 
           { 
             MaxDegreeOfParallelism = maxDegreeOfParallelism, 
             BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3 
           });

        await foreach (T item in enumerable)
        {
           await block.SendAsync(item).ConfigureAwait(false);
        }

        block.Complete();
        await block.Completion;
    }
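
For a plain IEnumerable, the modification described above might look like this sketch. It assumes the System.Threading.Tasks.Dataflow NuGet package is referenced, and the class name `DataflowExtensions` is illustrative. The early exit when `SendAsync` returns false is my own addition: `SendAsync` reports false once the block no longer accepts items (for example after the action has thrown), so there is no point in feeding it further.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // from the System.Threading.Tasks.Dataflow package

public static class DataflowExtensions
{
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int maxDegreeOfParallelism,
        int? boundedCapacity = null)
    {
        var block = new ActionBlock<T>(
            action,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxDegreeOfParallelism,
                BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3
            });

        foreach (T item in enumerable)
        {
            // Waits while the block's buffer is full, so the source is enumerated lazily.
            bool accepted = await block.SendAsync(item).ConfigureAwait(false);
            if (!accepted)
            {
                break; // the block has faulted or completed
            }
        }

        block.Complete();
        // Propagates any exception thrown by the action.
        await block.Completion.ConfigureAwait(false);
    }
}
```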
Alexei Levenkov
Caleb Holt
  • You should probably replace the `semaphore.Wait()` with `await semaphore.WaitAsync()`, to avoid blocking the caller. Also be aware that the functionality of the `SemaphoreSlim` in your solution can be replaced by the `BoundedCapacity` configuration of the `ActionBlock`, in combination with the awaitable `SendAsync` method. Comparatively it is more efficient (memory-wise). – Theodor Zoulias Feb 09 '21 at 04:05
  • @TheodorZoulias Thanks so much for the feedback! It's something I'm actively working on for a project so I'll look at those changes and update my solution. – Caleb Holt Feb 09 '21 at 16:35
  • 1
    https://stackoverflow.com/a/65251949/477420 answer by @TheodorZoulias shows very similar approach... presumably [SendAsync](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblock.sendasync?view=net-5.0) does not wait for operation to finish (which is not clear to me from the docs) – Alexei Levenkov Feb 09 '21 at 18:13
  • Caleb Holt another gotcha that you may want to be aware of is that enumerating the user-supplied `enumerable` could potentially result to an exception, and in this case your implementation will just propagate immediately this exception, without awaiting the completion of the `ActionBlock`. This is not an optimal behavior, because it may leave tasks running in the background unobserved (in fire-and-forget fashion). Implementing correctly a `ForEachAsync` method can be quite tricky. I became aware of the gotcha myself very recently. – Theodor Zoulias Feb 10 '21 at 00:01
  • 1
    @AlexeiLevenkov the documentation of the `SendAsync` method is quite confusing. I doubt that a person smart enough has ever existed on this planet, that could understand what this method does by just reading the docs. One should delve deep into the source code and understand that both `Post` and `SendAsync` methods are based on the hidden (explicitly implemented) [`OfferMessage`](https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.itargetblock-1.offermessage) API, that has 5 possible return values. The `SendAsync` handles asynchronously the `Postponed ` return value. – Theodor Zoulias Feb 10 '21 at 00:08
  • @TheodorZoulias So, try around the loop and put complete/completion calls in a finally, then allow the exception to indicate it wasn't run to completion? I considered catching ALL exceptions and returning in aggregate, but depending on the cause that might result in an enormous number of exceptions. I suppose I could go down a rabbit hole with some options class and the let the caller decide if exceptions should be aggregated or terminated on first occurrence. That's the most flexible for a generic approach. (I've also wired through cancelationToken at this point.) – Caleb Holt Feb 16 '21 at 00:39
  • Caleb ideally you should watch out for exceptions coming from the `GetAsyncEnumerator`, `MoveNextAsync` and `DisposeAsync` methods, and if anyone of these fails, propagate that exception to the `ActionBlock` by invoking its `Fault` method. That's a lot of work to do though, so you could take a shortcut and just wrap the loop in a try/catch, risking that a bug in the method's implementation would also be surfaced as a normal operational error. That's what I've done in [this](https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach/65251949#65251949) implementation. – Theodor Zoulias Feb 16 '21 at 01:30
-2

For a simpler solution (I'm not sure it's the most optimal), you can simply nest Parallel.ForEach inside a Task, like so:

var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };
Task.Run(() =>
{
    Parallel.ForEach(myCollection, options, item =>
    {
        DoWork(item);
    });
});

The ParallelOptions will do the throttling for you, out of the box.

I am using it in a real-world scenario to run very long operations in the background. These operations are called via HTTP, and the design avoids blocking the HTTP call while the long operation is running.

  1. An HTTP call requests the long background operation.
  2. The operation starts in the background.
  3. The user gets a status ID, which can be used to check the status with another HTTP call.
  4. The background operation updates its status.

That way, the CI/CD call does not time out because of a long HTTP operation; instead it polls the status every x seconds without blocking the process.
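
The status-tracking flow described in the steps above could be sketched roughly as follows. Everything here (the `BackgroundJobs` class, the in-memory dictionary, the status strings) is hypothetical and only illustrates the idea; it is not the code I run in production, and a real service would also need to expire old entries.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BackgroundJobs
{
    // Hypothetical in-memory status store, keyed by the status ID handed to the caller.
    private static readonly ConcurrentDictionary<Guid, string> Statuses =
        new ConcurrentDictionary<Guid, string>();

    // Starts the work in the background and returns immediately with a status ID.
    public static Guid Start<T>(IEnumerable<T> items, Action<T> doWork, int maxDegreeOfParallelism)
    {
        var id = Guid.NewGuid();
        Statuses[id] = "Running";
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        _ = Task.Run(() =>
        {
            try
            {
                // Synchronous work only: Parallel.ForEach does not await async lambdas.
                Parallel.ForEach(items, options, doWork);
                Statuses[id] = "Completed";
            }
            catch (Exception ex)
            {
                // Record failures so they are not lost in the background task.
                Statuses[id] = "Failed: " + ex.Message;
            }
        });

        return id;
    }

    // Called by the status endpoint that the client polls.
    public static string GetStatus(Guid id) =>
        Statuses.TryGetValue(id, out var status) ? status : "Unknown";
}
```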

Gravity API
  • *"You can use async lambda as well"* <== could you explain this more? Or even better could you give an example of using an async lambda with a `Parallel.ForEach` loop, that does not result to buggy behavior? – Theodor Zoulias Feb 22 '21 at 19:53
  • 2
    Gravity I am sorry that I have to downvote your answer, but passing an async delegate to the `Parallel.ForEach` method is more than "not a best practice". It is deeply and irrecoverably flawed. The `Parallel.ForEach` does not understand async delegates, so the lambda is `async void`. It's not fire-and-forget, it's fire-and-crash. In such a case the `Parallel.ForEach` will not wait for the completion of the launched operations, it will not enforce a maximum degree of parallelism, and it will not propagate exceptions. Any exception will be unhandled and will crash the process. – Theodor Zoulias Feb 23 '21 at 18:01
  • If you follow the example here, yes, but not if you implement it in a way to cover all the – Gravity API Feb 23 '21 at 18:22
  • I don't think down vote is justified in this case. This is only an answer to a specific question, NOT the recommended answer (the recommended answer is the one I have showed in the first example). Using async inside Parallel.ForEach can work and will not crash if handled, but again IT IS NOT the recommended approach and it is not related the original answer, which works flawlessly. – Gravity API Feb 23 '21 at 18:28
  • Please run the following code, var options = new ParallelOptions { MaxDegreeOfParallelism = 5 }; var items = new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; Task.Run(() => Parallel.ForEach(items, options, async item => await Task.Run(() => Console.WriteLine($"{item}")))); Thread.Sleep(15000); it is an async Parallel.ForEach, that works. It does not crash - but, naturally enforce the max-parallel in a different way - which also can be handled, but you get the point :) – Gravity API Feb 23 '21 at 18:39
  • 1
    Gravity this is a bad example. Parallelizing the `Console.WriteLine` method makes no sense, because this method is synchronized. Only one thread can write to the `Console` at a time. Also notice the ugliness of the `Thread.Sleep(15000);`. You added this line because otherwise the program would end before the completion of the `async void` operations launched uncontrollably by the misused `Parallel.ForEach` loop. This is not the correct way to write software. – Theodor Zoulias Feb 23 '21 at 18:46
  • This is just an example - THIS IS NOT THE ANSWER for the original question, please read my original answer about it which says if you use async parallel you MUST have process to hold it - and that I DO NOT recommend it - THIS IS AN EXAMPLE of bad approach :). You can parallel inside what ever you want - console write line it is just and example. Please read the first answer - BEFORE the update - THIS IS my answer and this solve the parallel issue asked in the first place. – Gravity API Feb 23 '21 at 19:37
  • I have commented out the bad practice example to make the answer more clear – Gravity API Feb 23 '21 at 19:44
  • 2
    Well, you can't expect good votes by presenting bad examples and indirectly promoting bad practices, whether you recommend them or not. How about removing all the bad stuff from your answer, and keeping the good stuff? – Theodor Zoulias Feb 23 '21 at 19:46
  • Agreed :) That is why I removed it – Gravity API Feb 23 '21 at 19:48
  • 2
    Gravity the phrase *"You can use async lambda as well"*, connected with the `Parallel.ForEach` method, is an indisputable downvote by me. No amount of warnings before or after, or deletion indications like strikethrough, can make the presence of this phrase tolerable. I am talking exclusively about my own voting criteria. Anyone else can vote however they see fit. – Theodor Zoulias Feb 23 '21 at 19:58
  • 1
    Accepted and make sense. Didn't think about it that way and I agree with your criteria and change my post accordingly. – Gravity API Feb 23 '21 at 20:45