227

In a metro app, I need to execute a number of WCF calls. There are a significant number of calls to be made, so I need to do them in a parallel loop. The problem is that the parallel loop exits before the WCF calls are all complete.

How would you refactor this to work as expected?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();
Paul Brinkley
Darthg8r
  • 1
    I've voted this question as a duplicate of the [Parallel foreach with asynchronous lambda](https://stackoverflow.com/questions/15136542/parallel-foreach-with-asynchronous-lambda), although that question is newer by a few months than this question, because the other question contains an already heavily upvoted [answer](https://stackoverflow.com/a/68901782/11178549) that recommends what is probably the best current solution to this problem, which is the new `Parallel.ForEachAsync` API. – Theodor Zoulias Feb 15 '22 at 12:59

11 Answers

197

The whole idea behind Parallel.ForEach() is that you have a set of threads and each thread processes part of the collection. As you noticed, this doesn't work with async-await, where you want to release the thread for the duration of the async call.

You could “fix” that by blocking the ForEach() threads, but that defeats the whole point of async-await.

What you could do is to use TPL Dataflow instead of Parallel.ForEach(), which supports asynchronous Tasks well.

Specifically, your code could be written using a TransformBlock that transforms each id into a Customer using the async lambda. This block can be configured to execute in parallel. You would link that block to an ActionBlock that writes each Customer to the console. After you set up the block network, you can Post() each id to the TransformBlock.

In code:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

You will probably want to limit the parallelism of the TransformBlock to some small constant, though. You could also limit the capacity of the TransformBlock and add the items to it asynchronously using SendAsync(), for example if the collection is too big.

An added benefit compared to your code (if it worked) is that the writing starts as soon as a single item is finished, rather than waiting until all of the processing is done.
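The bounded variant mentioned above could look roughly like the following. This is only a sketch under stated assumptions: `GetCustomerAsync` is a hypothetical stand-in for `repo.GetCustomer(i)`, customers are represented as plain strings, and the limits of 4 and 8 are arbitrary illustration values.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedDataflowSketch
{
    // Hypothetical stand-in for repo.GetCustomer(id); replace with the real call.
    private static async Task<string> GetCustomerAsync(string id)
    {
        await Task.Delay(10); // simulates network latency
        return "customer-" + id;
    }

    public static async Task<List<string>> RunAsync(IEnumerable<string> ids)
    {
        var results = new List<string>();

        var getCustomerBlock = new TransformBlock<string, string>(
            id => GetCustomerAsync(id),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4, // at most 4 calls in flight (arbitrary)
                BoundedCapacity = 8         // at most 8 items held by the block (arbitrary)
            });

        // The ActionBlock keeps its default MaxDegreeOfParallelism of 1,
        // so adding to a plain List<T> is safe here.
        var collectBlock = new ActionBlock<string>(c => results.Add(c));

        getCustomerBlock.LinkTo(
            collectBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var id in ids)
        {
            // SendAsync waits asynchronously while the block is full;
            // Post would return false and the item would not be accepted.
            await getCustomerBlock.SendAsync(id);
        }

        getCustomerBlock.Complete();
        await collectBlock.Completion;
        return results;
    }
}
```

Because the producer awaits `SendAsync`, a very large id collection is fed in only as fast as the block drains, instead of being queued all at once.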

Ian Kemp
svick
  • 2
    A very brief overview of async, reactive extensions, TPL and TPL DataFlow - http://vantsuyoshi.wordpress.com/2012/01/05/when-to-use-tpl-async-reactive-extension-tpl-dataflow/ for those like myself who might need some clarity. – Norman H Sep 13 '13 at 11:04
  • 1
    I'm pretty sure this answer does NOT parallelize the processing. I believe you need to do a Parallel.ForEach over the ids and post those to the getCustomerBlock. At least that's what I found when I tested this suggestion. – JasonLind Dec 16 '15 at 22:23
  • 4
    @JasonLind It really does. Using `Parallel.ForEach()` to `Post()` items in parallel shouldn't have any real effect. – svick Dec 16 '15 at 22:26
  • 1
    @svick Ok I found it, The ActionBlock also needs to be in Parallel. I was doing it slightly differently, I didn't need a transform so I just used a bufferblock and did my work in the ActionBlock. I got confused from another answer on the interwebs. – JasonLind Dec 16 '15 at 22:35
  • 2
    By which I mean specifying MaxDegreeOfParallelism on the ActionBlock like you do on the TransformBlock in your example – JasonLind Dec 16 '15 at 22:49
  • 1
    You need to include the [`System.Threading.Tasks.Dataflow`](https://www.nuget.org/packages/System.Threading.Tasks.Dataflow) NuGet package to run this – Liam Jul 13 '18 at 08:25
  • 1
    [Parallel.ForEachAsync](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreachasync?view=net-6.0) has been added in .NET6. – Kim Oct 18 '21 at 06:30
148

svick's answer is (as usual) excellent.

However, I find Dataflow to be more useful when you actually have large amounts of data to transfer. Or when you need an async-compatible queue.

In your case, a simpler solution is to just use the async-style parallelism:

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var customerTasks = ids.Select(i =>
  {
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  });
var customers = await Task.WhenAll(customerTasks);

foreach (var customer in customers)
{
  Console.WriteLine(customer.ID);
}

Console.ReadKey();
Community
Stephen Cleary
  • 18
    If you wanted to manually limit parallelism (which you most likely do in this case), doing it this way would be more complicated. – svick Jul 19 '12 at 16:50
  • 2
    But you're right that Dataflow can be quite complicated (for example when compared with `Parallel.ForEach()`). But I think it's currently the best option to do almost any `async` work with collections. – svick Jul 19 '12 at 16:53
  • @svick - IMHO it's a function of whether you're trying to set up 'streams' of processing (use Dataflow) or just parallelizing the processing of a collection (Parallel.ForEach, or create collections of Tasks and WhenAll on them as Stephen does here). If you're not setting up blocks to run for an extended period of time, Dataflow feels like overkill IMHO. :) – James Manning Jul 19 '12 at 18:36
  • ParallelOptions lets you limit parallelism, FWIW, in case others run across this thread and don't already know about it – James Manning Jul 19 '12 at 23:29
  • 2
    @JamesManning how is `ParallelOptions` going to help? It's only applicable to `Parallel.For/ForEach/Invoke`, which as the OP established are of no use here. – Ohad Schneider Sep 15 '14 at 21:17
  • 1
    @StephenCleary If the `GetCustomer` method is returning a `Task`, Should one be using `Select(async i => { await repo.GetCustomer(i);});` ? – Shyju May 04 '16 at 18:02
  • 1
    @Shyju: No, you should use `Select(i => repo.GetCustomer(i))`. – Stephen Cleary May 04 '16 at 20:07
  • 1
    @StephenCleary why shouldnt we use async if it supports async? I thought you should go async all the way. does it not apply in this case? I have an old function just reviewed i was doing it with async and Its been working just fine all the time. – Emil Dec 06 '16 at 13:47
  • 9
    @batmaci: `Parallel.ForEach` doesn't support `async`. – Stephen Cleary Dec 06 '16 at 15:03
  • 1
    if you care about degree of Parallelism then you can do `ids.AsParallel().WithDegreeOfParallelism(15).Select(async i => await Task);` – MikeT Sep 04 '19 at 12:15
  • 4
    @MikeT: That will not work as expected. PLINQ doesn't understand asynchronous tasks, so that code will parallelize only the *starting* of the `async` lambda. – Stephen Cleary Sep 04 '19 at 17:24
  • 1
    @StephenCleary Given "`Parallel.ForEach` doesn't support `async`", I am curious to understand how it ended up in the `System.Threading.Tasks` namespace instead of simply being in `System.Threading`. It seems like that would have cut back on some of the confusion. – Mike Mar 06 '20 at 10:02
  • 3
    @Mike: `Parallel` (and `Task`) were written years before `async`/`await`, as part of the Task Parallel Library (TPL). When `async`/`await` came on the scene, they had the option of making their own `Future` type for use with `async` or re-using the existing `Task` type from the TPL. Neither decision was obviously correct, so they decided to re-use `Task`. – Stephen Cleary Mar 06 '20 at 13:34
  • @StephenCleary How would you handle exceptions and cancel the execution in case of consecutive exceptions? In Parallel.For we use state.Break(); – Souvik Ghosh Jun 15 '20 at 07:08
  • @SouvikGhosh: If you want to tear down a dataflow block, you can let the exception propagate. Exceptions from action method will tear down that dataflow block. If you want to tear down an entire mesh, you'd have to write that teardown logic yourself. – Stephen Cleary Jun 15 '20 at 12:53
  • Why not `ids.Select(async i => { await repo.GetCustomer(i);});` ? – alhazen Jul 01 '21 at 08:00
  • 1
    @alhazen: Most repositories use database connections, and most .NET database connections only allow one outstanding operation. So if you want to do multiple concurrent db operations, you need multiple db connections (and thus multiple repositories). – Stephen Cleary Jul 01 '21 at 11:57
98

Using DataFlow as svick suggested may be overkill, and Stephen's answer does not provide the means to control the concurrency of the operation. However, that can be achieved rather simply:

public static async Task RunWithMaxDegreeOfConcurrency<T>(
     int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    {
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            await Task.WhenAny(activeTasks.ToArray());
            //observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted); 
        }
    }
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    {
        //observe exceptions in a manner consistent with the above   
    });
}

The ToArray() calls can be optimized by using an array instead of a list and replacing completed tasks, but I doubt it would make much of a difference in most scenarios. Sample usage per the OP's question:

await RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

EDIT Fellow SO user and TPL wiz Eli Arbel pointed me to a related article from Stephen Toub. As usual, his implementation is both elegant and efficient:

public static Task ForEachAsync<T>(
      this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current).ContinueWith(t => 
                          {
                              //observe exceptions
                          });
                      
        })); 
}
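One way to use the `ForEachAsync` extension while still knowing which item failed, a point raised in the comments, is to handle exceptions inside the body rather than in a `ContinueWith`. The sketch below repeats the extension without the `ContinueWith` so that it is self-contained. `GetCustomerAsync`, the string-based customers, and the deliberate failure for id "3" are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ForEachAsyncSketch
{
    // Same shape as the extension above, minus the ContinueWith.
    public static Task ForEachAsync<T>(
        this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async () =>
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }

    // Hypothetical stand-in for repo.GetCustomer(id); fails for id "3" on purpose.
    private static async Task<string> GetCustomerAsync(string id)
    {
        await Task.Delay(10);
        if (id == "3") throw new InvalidOperationException("lookup failed");
        return "customer-" + id;
    }

    public static async Task<(List<string> Loaded, List<string> FailedIds)> RunAsync(
        IEnumerable<string> ids)
    {
        // Bodies run concurrently, so thread-safe collections are used.
        var loaded = new ConcurrentBag<string>();
        var failedIds = new ConcurrentBag<string>();

        await ids.ForEachAsync(4, async id =>
        {
            try
            {
                loaded.Add(await GetCustomerAsync(id));
            }
            catch (Exception)
            {
                // The lambda parameter is still in scope, so the failing id is known.
                failedIds.Add(id);
            }
        });

        // Completion order is not deterministic, so sort before returning.
        return (loaded.OrderBy(c => c).ToList(), failedIds.OrderBy(i => i).ToList());
    }
}
```

With the `try`/`catch` in the body, the extension itself stays general purpose, and a failing item does not stop its worker from moving on to the next element.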
Jan
Ohad Schneider
  • Love the Eli Arbel option. Have a few follow-up questions: I'd love to keep track of progress. I added a `ref int done` to the method, and then `ContinueWith done++` but "Cannot use ref or out parameter inside an anonymous method, lambda expression, or query expression... any idea how to track progress? – Stefanvds Jul 30 '15 at 07:29
  • nevermind, i can just stick done++ in the foreachasync code – Stefanvds Jul 30 '15 at 07:43
  • I found the `ForEachAsync` code doesn't work as expected. At least, not always, for some reason or other (I cannot currently explain what's going on). With a `dop = 5` I'd get different results when calling the code (I should always get the same - the data isn't changed)! Beware! – MBender Nov 13 '15 at 15:04
  • Eli Arbel's option is nice to read, but the RunWithMaxDegreeOfConcurrency implementation will run faster. This is because Eli's splits the tasks upfront, so if some tasks run for longer in one partition, you still have to wait for the slowest partition to finish running before the whole process is completed. RunWithMaxDegreeOfConcurrency runs in a chain fashion and does not partition the tasks upfront, so it should be the fastest to complete. – xx1xx Sep 23 '16 at 01:16
  • 1
    @RichardPierre actually this overload of `Partitioner.Create` uses chunk partitioning, which provides elements dynamically to the different tasks so the scenario you described will not take place. Also note that static (pre-determined) partitioning may be faster in some cases due to less overhead (specifically synchronization). For more information see: https://msdn.microsoft.com/en-us/library/dd997411(v=vs.110).aspx. – Ohad Schneider Oct 01 '16 at 23:07
  • 1
    @OhadSchneider In the // observe exceptions, if that throws an exception, will it bubble up to the caller? For example, if I wanted the entire enumerable to stop processing/fail if any part of it failed? – Terry Oct 10 '16 at 22:05
  • 3
    @Terry it will bubble up to the caller in the sense that the top-most task (created by `Task.WhenAll`) will contain the exception (inside an `AggregateException`), and consequentially if said caller used `await`, an exception would be thrown in the call site. However, `Task.WhenAll` will still wait for *all* tasks to complete, and `GetPartitions` will dynamically allocate elements when `partition.MoveNext` is called until no more elements are left to process. This means that unless you add your own mechanism for stopping the processing (e.g. `CancellationToken`) it won't happen on its own. – Ohad Schneider Oct 11 '16 at 15:14
  • Following on from what @OhadSchneider said above, it is important to note **this implementation will tend to slow down over time and get closer and closer to running synchronously** because you are going to be limited by the slowest running Task in the partition. – Mark Gibbons Aug 28 '17 at 01:03
  • @OhadSchneider let's say most of the time the tasks take roughly the same amount of time to complete - 1 second. But occasionally it takes 5 seconds. If you choose a degree of parallelism of 4, it will execute 4 of those tasks at the same time, and they should be finished about a second later. But that occasional 5 second task will mean that it won't start another 3 tasks (for the 3 that completed after 1 second) until that 5 second task is finished. So depending on how often a single task runs much longer than the others, you will be blocking execution for the longest running task. – Mark Gibbons Aug 28 '17 at 03:33
  • 1
    @gibbocool I'm still not sure I follow. Suppose you have a total of 7 tasks, with the parameters you specified in your comment. Further suppose that the first batch takes the occasional 5 second task, and three 1 second tasks. After about a second, the 5-second task will still be executing whereas the three 1-second tasks will be finished. At this point the remaining three 1-second tasks will start executing (they would be supplied by the partitioner to the three "free" threads) . – Ohad Schneider Aug 28 '17 at 17:23
  • How can I observe exceptions in ForEachAsync? If body function failed for customer Id , can I log error, specifying, that it happened for particular Id? – Michael Freidgeim Dec 19 '17 at 03:27
  • @MichaelFreidgeim Not sure what you mean. You can add whatever code you want where it says `//observe exceptions`, including exception handling code (check `t.Exception`). I assume your customer ID would be embedded in `partition.Current`. – Ohad Schneider Dec 26 '17 at 15:22
  • But t.Exception doesn’t include partition.Current nor customer Id, so I can’t determine, which id caused exception – Michael Freidgeim Dec 26 '17 at 22:20
  • 2
    @MichaelFreidgeim you can do something like `var current = partition.Current` before `await body` and then use `current` in the continuation (`ContinueWith(t => { ... }`). – Ohad Schneider Dec 27 '17 at 11:21
  • 1
    Although I have upvoted this answer a long time ago, I am now realizing that none of the two suggested solutions is perfect. The first one utilizes the `Task.WhenAny`, and so it can be quite inefficient with large numbers of tasks and degree of concurrency (continuations are attached to the same tasks again and again internally). It is also incomplete regarding the handling of exceptions. The second solution, taken from a Stephen Toub's article, is suboptimal for reasons explained [here](https://stackoverflow.com/questions/38634376/52973907#comment111638079_52973907). – Theodor Zoulias Dec 09 '20 at 13:27
  • @TheodorZoulias while I agree the first approach isn't overly efficient, I don't see the issue in the second one. The link you sent says *"one worker is killed on every exception"* but that is not the case, `ContinueWith` means nothing is killed (because the exception is observed). Regarding "*A less naive implementation should make sure that an exception on any worker will terminate the whole process as fast as possible"* - this should be very easy to achieve using a cancellation token, I can post a small sample if you'd like... – Ohad Schneider Dec 09 '20 at 21:47
  • TBH I didn't notice the `ContinueWith` previously. I thought that it was Stephen Toub's version verbatim. I am not a fan of adding the `ContinueWith`, because it means that the `ForEachAsync` method is not general purpose any more, and should be copy-pasted with a different `ContinueWith` body every time is used. Instead of this, if the user wants to handle each exception individually, they can just do it manually using a `try`/`catch` inside their `body` implementation. Also the `ContinueWith` does not handle synchronous exceptions thrown directly by the `body(partition.Current)` invocation. – Theodor Zoulias Dec 10 '20 at 00:51
  • My opinion about the optimal behavior of a `ForEachAsync` method is that it should imitate the way TPL Dataflow works. In case of any exception no more asynchronous operations should be started, the already started operations should be awaited (resulting potentially to more exceptions), and finally all observed exceptions should be propagated as an `AggregateException`. Honestly just using an `ActionBlock` seems like the best option, now that the TPL Dataflow is embedded in the platform (.NET 5). I wouldn't even use my [own](https://stackoverflow.com/a/64455549/11178549) implementation. – Theodor Zoulias Dec 10 '20 at 01:14
  • @TheodorZoulias Regarding cancellation on exception, as I said it should be easy to achieve what you're describing via a cancelation token (which you check before every `while` iteration). RE sync exceptions, I guess you could wrap `body` in `Task.Run` or just add `try/catch`. General purpose - you can always pass something like an Action to handle exceptions generally... – Ohad Schneider Dec 10 '20 at 11:21
  • 1
    Ohad, yeap the `ForEachAsync` method can be certainly improved and perfected. My point is that in its current state it's not perfect, and perfecting it may be a low return investment now that .NET has powerful built-in tools that can do the job (without us having to worry about how these tools are implemented). – Theodor Zoulias Dec 10 '20 at 11:57
  • This is the most simple and useful way of ForEachAsync which I've found so far. For modern C# you can just use `Task.WhenAny(activeTasks)` and `Task.WhenAll(activeTasks)`, no need to convert to array. – Alek Depler May 15 '21 at 16:47
55

You can save effort with the new AsyncEnumerator NuGet Package, which didn't exist 4 years ago when the question was originally posted. It allows you to control the degree of parallelism:

using System.Collections.Async;
...

await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);

Disclaimer: I'm the author of the AsyncEnumerator library, which is open source and licensed under MIT, and I'm posting this message just to help the community.

Serge Semenov
  • Your library isn't compatible with .NET Core. – Corniel Nobel Jun 30 '18 at 10:02
  • 2
    @CornielNobel, it is compatible with .NET Core - the source code on GitHub has a test coverage for both .NET Framework and .NET Core. – Serge Semenov Jun 30 '18 at 15:34
  • I bet that was `AsyncEnumerable` instead of `AsyncEnumerator` :) – Serge Semenov Jun 30 '18 at 16:54
  • 2
    @SergeSemenov I've used your library a lot for its `AsyncStreams` and I've got to say it's excellent. Can't recommend this library enough. – WBuck Oct 09 '19 at 11:49
  • @SergeSemenov Does ParallelForEachAsync preserve ordering, and is it possible to control it somehow? – Tomas Feb 27 '20 at 12:01
  • @Tomas, no it does not preserve the order due to the nature of parallel processing. However, there is an overloaded method that takes in an item index if you need to know the order of items inside the processing function. – Serge Semenov Feb 27 '20 at 17:26
  • @SergeSemenov If any error occurred for any of the method, will loop terminate, will it throw aggregate exceptions or only first exception? – Vikram Singh Saini May 10 '20 at 12:04
  • @SergeSemenov How do you break from Parallel ForEach? In normal Parallel>ForEach we use state.Break(); – Souvik Ghosh Jun 15 '20 at 07:11
  • @SouvikGhosh, the `ParallelForEachAsync` takes in a `CancellationToken`. If you want to break from inside the loop, you should create a `CancellationTokenSource` first. – Serge Semenov Jun 15 '20 at 21:26
18

Wrap the Parallel.ForEach in a Task.Run() and, instead of the await keyword, use [yourasyncmethod].Result

(you need to do the Task.Run thing to not block the UI thread)

Something like this:

var yourForeachTask = Task.Run(() =>
        {
            Parallel.ForEach(ids, i =>
            {
                ICustomerRepo repo = new CustomerRepo();
                var cust = repo.GetCustomer(i).Result;
                customers.Add(cust);
            });
        });
await yourForeachTask;
Liam
ofcoursedude
  • 4
    What's the problem with this? I'd have done it exactly like this. Let `Parallel.ForEach` do the parallel work, which blocks until all are done, and then push the whole thing to a background thread to have a responsive UI. Any issues with that? Maybe that's one sleeping thread too much, but it's short, readable code. – ygoe Jun 17 '15 at 18:22
  • @LonelyPixel My only issue is that it calls `Task.Run` when `TaskCompletionSource` is preferable. – Gusdor Mar 30 '16 at 13:31
  • 1
    @Gusdor Curious - why is `TaskCompletionSource` preferable? – Seafish Jul 13 '16 at 14:34
  • @Seafish A good question that I wish I could answer. Must have been a rough day :D – Gusdor Jul 13 '16 at 15:48
  • 1
    Just a short update. I was looking for exactly this now, scrolled down to find the simplest solution and found my own comment again. I used exactly this code and it works as expected. It only assumes that there is a Sync version of the original Async calls within the loop. `await` can be moved in the front to save the extra variable name. – ygoe Mar 01 '17 at 20:21
  • 1
    I am not sure what your scenario is, but I believe you can remove the Task.Run(). Just appending a .Result or .Wait to the end is enough to make the Parallel execution wait for all threads to complete. – Eduard G Nov 03 '20 at 16:34
  • GetCustomer() is already awaited with .Result(). You don't need the last line. Again since these call an async method in a repository instance, a better design would be to add the tasks to a List and execute a WhenAll() in the end. It is non-blocking and the Parallel.ForEach wrapper is unnecessary. – Denny Jacob Oct 05 '21 at 20:30
8

This should be pretty efficient, and easier than getting the whole TPL Dataflow working:

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});

...

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();

    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }

    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}
John Gietzen
  • Shouldn't the usage example use `await` like: `var customers = await ids.SelectAsync(async i => { ... });`? – Paccc Dec 14 '14 at 04:02
8

An extension method for this that makes use of SemaphoreSlim and also allows setting the maximum degree of parallelism:

/// <summary>Concurrently executes async actions for each item of
/// <see cref="IEnumerable{T}"/>.</summary>
/// <typeparam name="T">Type of the items in the IEnumerable</typeparam>
/// <param name="enumerable">Instance of <see cref="IEnumerable{T}"/></param>
/// <param name="action">An async <see cref="Func{T, TResult}"/> to execute for each item</param>
/// <param name="maxDegreeOfParallelism">Optional. An integer that represents the
/// maximum degree of parallelism. Must be greater than 0.</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If maxDegreeOfParallelism
/// is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
    this IEnumerable<T> enumerable,
    Func<T, Task> action,
    int? maxDegreeOfParallelism = null)
{
    if (maxDegreeOfParallelism.HasValue)
    {
        using (var semaphoreSlim = new SemaphoreSlim(
            maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
        {
            var tasksWithThrottler = new List<Task>();

            foreach (var item in enumerable)
            {
                // Increment the number of currently running tasks and wait if they
                // are more than limit.
                await semaphoreSlim.WaitAsync();

                tasksWithThrottler.Add(Task.Run(async () =>
                {
                    await action(item).ContinueWith(res =>
                    {
                        // action is completed, so decrement the number of
                        // currently running tasks
                        semaphoreSlim.Release();
                    }, TaskScheduler.Default);
                }));
            }

            // Wait for all tasks to complete.
            await Task.WhenAll(tasksWithThrottler.ToArray());
        }
    }
    else
    {
        await Task.WhenAll(enumerable.Select(item => action(item)));
    }
}
    

Sample usage:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
Theodor Zoulias
Jay Shah
7

I am a little late to the party, but you may want to consider using GetAwaiter().GetResult() to run your async code in a synchronous context, but in parallel, as below:

Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Run this in thread which Parallel library occupied.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});
Teoman shipahi
5

After introducing a bunch of helper methods, you will be able to run parallel queries with this simple syntax:

const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
    .Split(DegreeOfParallelism)
    .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
    .ConfigureAwait(false);

What happens here is: we split source collection into 10 chunks (.Split(DegreeOfParallelism)), then run 10 tasks each processing its items one by one (.SelectManyAsync(...)) and merge those back into a single list.

Worth mentioning there is a simpler approach:

double[] result2 = await Enumerable.Range(0, 1000000)
    .Select(async i => await CalculateAsync(i).ConfigureAwait(false))
    .WhenAll()
    .ConfigureAwait(false);

But it needs a precaution: if you have a source collection that is too big, it will schedule a Task for every item right away, which may cause significant performance hits.

Extension methods used in examples above look as follows:

public static class CollectionExtensions
{
    /// <summary>
    /// Splits collection into number of collections of nearly equal size.
    /// </summary>
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
    {
        if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));

        List<T> source = src.ToList();
        var sourceIndex = 0;
        for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
        {
            var list = new List<T>();
            int itemsLeft = source.Count - targetIndex;
            while (slicesCount * list.Count < itemsLeft)
            {
                list.Add(source[sourceIndex++]);
            }

            yield return list;
        }
    }

    /// <summary>
    /// Takes collection of collections, projects those in parallel and merges results.
    /// </summary>
    public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
        this IEnumerable<IEnumerable<T>> source,
        Func<T, Task<TResult>> func)
    {
        List<TResult>[] slices = await source
            .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
            .WhenAll()
            .ConfigureAwait(false);
        return slices.SelectMany(s => s);
    }

    /// <summary>Runs selector and awaits results.</summary>
    public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        List<TResult> result = new List<TResult>();
        foreach (TSource source1 in source)
        {
            TResult result1 = await selector(source1).ConfigureAwait(false);
            result.Add(result1);
        }
        return result;
    }

    /// <summary>Wraps tasks with Task.WhenAll.</summary>
    public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
    {
        return Task.WhenAll<TResult>(source);
    }
}
BoarGules
Vitaliy Ulantikov
2

The problem of parallelizing asynchronous operations has been solved with the introduction of the Parallel.ForEachAsync API in .NET 6, but people who are using older .NET platforms might still need a decent substitute. An easy way to implement one is to use an ActionBlock<T> component from the TPL Dataflow library. This library is included in the standard .NET libraries (.NET Core and .NET 5+), and available as a NuGet package for the .NET Framework. Here is how it can be used:

public static Task Parallel_ForEachAsync<T>(ICollection<T> source,
    int maxDegreeOfParallelism, Func<T, Task> action)
{
    var options = new ExecutionDataflowBlockOptions();
    options.MaxDegreeOfParallelism = maxDegreeOfParallelism;
    var block = new ActionBlock<T>(action, options);
    foreach (var item in source) block.Post(item);
    block.Complete();
    return block.Completion;
}

This solution is only suitable for materialized source sequences, hence the type of the parameter is ICollection<T> instead of the more common IEnumerable<T>. It also has the surprising behavior of ignoring any OperationCanceledExceptions thrown by the action. Addressing these nuances and attempting to replicate precisely the features and behavior of the Parallel.ForEachAsync is doable, but it requires almost as much code as if more primitive tools were used. I've posted such an attempt in the 9th revision of this answer.
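For comparison, on .NET 6 or later the built-in `Parallel.ForEachAsync` can be called roughly as follows. This is a minimal sketch: `GetCustomerAsync` is a hypothetical stand-in for `repo.GetCustomer(i)`, customers are plain strings, and the degree of parallelism of 4 is an arbitrary choice.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelForEachAsyncSketch
{
    // Hypothetical stand-in for repo.GetCustomer(id); replace with the real call.
    private static async Task<string> GetCustomerAsync(string id, CancellationToken ct)
    {
        await Task.Delay(10, ct); // simulates network latency
        return "customer-" + id;
    }

    public static async Task<List<string>> RunAsync(IEnumerable<string> ids)
    {
        // Bodies run concurrently, so a thread-safe collection is used.
        var customers = new ConcurrentBag<string>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

        // Requires .NET 6 or later. The body receives a CancellationToken
        // and returns a ValueTask.
        await Parallel.ForEachAsync(ids, options, async (id, ct) =>
        {
            customers.Add(await GetCustomerAsync(id, ct));
        });

        // Completion order is not deterministic, so sort before returning.
        return customers.OrderBy(c => c).ToList();
    }
}
```

The token passed to the body is signaled when the loop is cancelled or when another iteration throws, so forwarding it to the underlying call lets in-flight work stop early.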


Below is a different attempt to implement the Parallel.ForEachAsync method, offering exactly the same features as the .NET 6 API, and mimicking its behavior as much as possible. It uses only basic TPL tools. The idea is to create a number of worker tasks equal to the desired MaxDegreeOfParallelism, with each task enumerating the same enumerator in a synchronized fashion. This is similar to how the Parallel.ForEachAsync is implemented internally. The difference is that the .NET 6 API starts with a single worker and progressively adds more, while the implementation below creates all the workers from the start:

public static Task Parallel_ForEachAsync<T>(IEnumerable<T> source,
    ParallelOptions parallelOptions,
    Func<T, CancellationToken, Task> body)
{
    if (source == null) throw new ArgumentNullException("source");
    if (parallelOptions == null) throw new ArgumentNullException("parallelOptions");
    if (body == null) throw new ArgumentNullException("body");
    int dop = parallelOptions.MaxDegreeOfParallelism;
    if (dop < 0) dop = Environment.ProcessorCount;
    CancellationToken cancellationToken = parallelOptions.CancellationToken;
    TaskScheduler scheduler = parallelOptions.TaskScheduler ?? TaskScheduler.Current;

    IEnumerator<T> enumerator = source.GetEnumerator();
    CancellationTokenSource cts = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken);
    var semaphore = new SemaphoreSlim(1, 1); // Synchronizes the enumeration
    var workerTasks = new Task[dop];
    for (int i = 0; i < dop; i++)
    {
        workerTasks[i] = Task.Factory.StartNew(async () =>
        {
            try
            {
                while (true)
                {
                    if (cts.IsCancellationRequested)
                    {
                        cancellationToken.ThrowIfCancellationRequested();
                        break;
                    }
                    T item;
                    await semaphore.WaitAsync(); // Continue on captured context.
                    try
                    {
                        if (!enumerator.MoveNext()) break;
                        item = enumerator.Current;
                    }
                    finally { semaphore.Release(); } 
                    await body(item, cts.Token); // Continue on captured context.
                }
            }
            catch { cts.Cancel(); throw; }
        }, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler)
            .Unwrap();
    }
    return Task.WhenAll(workerTasks).ContinueWith(t =>
    {
        // Clean up
        try { semaphore.Dispose(); cts.Dispose(); } finally { enumerator.Dispose(); }
        return t;
    }, CancellationToken.None, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}

There is a difference in the signature. The body parameter is of type Func<T, CancellationToken, Task> instead of Func<TSource, CancellationToken, ValueTask>. This is because value-tasks are a relatively recent feature, and are not built into .NET Framework (there they are available only through the System.Threading.Tasks.Extensions NuGet package).

There is also a difference in the behavior. This implementation reacts to OperationCanceledExceptions thrown by the body by completing as canceled. The correct behavior would be to propagate these exceptions as individual errors, and complete as faulted. Fixing this minor flaw is doable, but I preferred not to complicate this relatively short and readable implementation any further.
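
For comparison, here is a sketch of how the built-in API is called on .NET 6 or later, applied to the scenario from the question. `GetCustomerAsync` is again a hypothetical stand-in for `repo.GetCustomer(id)`, and the limit of 4 is arbitrary. Note that the lambda here is converted to a delegate returning `ValueTask`, whereas the substitute above expects one returning `Task`:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class NativeForEachAsyncUsage
{
    // Hypothetical stand-in for repo.GetCustomer(id) from the question.
    private static async Task<string> GetCustomerAsync(string id, CancellationToken ct)
    {
        await Task.Delay(100, ct); // simulates the WCF round trip
        return "Customer " + id;
    }

    public static async Task<string[]> FetchAllAsync(
        CancellationToken cancellationToken = default)
    {
        var ids = Enumerable.Range(1, 10).Select(i => i.ToString());
        var customers = new ConcurrentBag<string>();
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = 4, // arbitrary limit for the sketch
            CancellationToken = cancellationToken
        };

        // The async lambda is converted to Func<string, CancellationToken, ValueTask>.
        await Parallel.ForEachAsync(ids, options, async (id, ct) =>
        {
            customers.Add(await GetCustomerAsync(id, ct));
        });

        // Completion order is not deterministic, so sort before use.
        return customers.OrderBy(c => c).ToArray();
    }
}
```

The `ct` token passed to the body is signaled both when the caller's token is canceled and when another invocation of the body fails, so forwarding it to the awaited operation lets the remaining work stop early.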

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • How does this compare with the other `ForEachAsync()` implementation you shared [here](https://stackoverflow.com/a/64455549/1700998) ? – alhazen Jul 01 '21 at 08:35
  • @alhazen I have ditched the [other implementation](https://stackoverflow.com/revisions/64455549/4) for being too idiomatic. The `Parallel_ForEachAsync` implementation in this answer replicates the features and behavior of the native `Parallel.ForEachAsync`, which invokes the `body` in parallel. My other implementation invoked it sequentially, for one item at a time. This is their main behavioral difference, [parallelism vs asynchronous concurrency](https://github.com/dotnet/runtime/issues/82359). – Theodor Zoulias Mar 03 '23 at 15:16
-1

Easy native way without TPL:

int totalThreads = 0; int maxThreads = 3;

foreach (var item in YourList)
{
    while (totalThreads >= maxThreads) await Task.Delay(500);
    Interlocked.Increment(ref totalThreads);

    MyAsyncTask(item).ContinueWith((res) => Interlocked.Decrement(ref totalThreads));
}

You can test this solution with the following task:

async static Task MyAsyncTask(string item)
{
    await Task.Delay(2500);
    Console.WriteLine(item);
}
  • Nice try, but there are multiple problems with this approach: Accessing the non-`volatile` variable `totalThreads` without synchronization. Waiting unproductively in a loop for a condition to be met (introduces latency). Using the [primitive](https://blog.stephencleary.com/2013/10/continuewith-is-dangerous-too.html) `ContinueWith` method without specifying the `TaskScheduler`. Possibility of leaking fire-and-forget tasks, in case the `MyAsyncTask` throws synchronously. This functionality is surprisingly tricky, and it's unlikely to get it right with the first try by doing it yourself. – Theodor Zoulias May 09 '21 at 23:18
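
Most of the issues listed in the comment above can be sidestepped by letting a `SemaphoreSlim` do the counting and by awaiting all started tasks at the end. The following is only a sketch of that idea, not a full replacement for `Parallel.ForEachAsync`: the names `ThrottledRunner`, `ForEachThrottledAsync`, and `DemoAsync` are made up, and it keeps starting new items even after an earlier one has failed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledRunner
{
    // Runs `action` for every item, with at most `maxConcurrency` running at once.
    public static async Task ForEachThrottledAsync<T>(IEnumerable<T> source,
        int maxConcurrency, Func<T, Task> action)
    {
        using (var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency))
        {
            var tasks = new List<Task>();
            foreach (var item in source)
            {
                // Waits asynchronously for a free slot; no polling delay.
                await semaphore.WaitAsync();
                tasks.Add(RunOneAsync(item, action, semaphore));
            }
            // Every started task is awaited, so failures surface here.
            await Task.WhenAll(tasks);
        }
    }

    private static async Task RunOneAsync<T>(T item, Func<T, Task> action,
        SemaphoreSlim semaphore)
    {
        // A synchronous throw from `action` is captured in the returned task.
        try { await action(item); }
        finally { semaphore.Release(); }
    }

    // Small experiment: returns the highest concurrency that was observed.
    public static async Task<int> DemoAsync()
    {
        var gate = new object();
        int running = 0, peak = 0;
        await ForEachThrottledAsync(Enumerable.Range(1, 10), 3, async n =>
        {
            lock (gate) { running++; peak = Math.Max(peak, running); }
            await Task.Delay(50);
            lock (gate) { running--; }
        });
        return peak;
    }
}
```

In this sketch `DemoAsync` should never report more than 3, because a new item is started only after a slot has been acquired from the semaphore.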