197

Is it possible to use Async when using ForEach? Below is the code I am trying:

using (DataContext db = new DataLayer.DataContext())
{
    db.Groups.ToList().ForEach(i => async {
        await GetAdminsFromGroup(i.Gid);
    });
}

I am getting the error:

The name 'Async' does not exist in the current context

The method the using statement is enclosed in is set to async.

Johann
  • 4,107
  • 3
  • 40
  • 39
James Jeffery
  • 12,093
  • 19
  • 74
  • 108

11 Answers11

273

List<T>.ForEach doesn't play particularly well with async (neither does LINQ-to-objects, for the same reasons).

In this case, I recommend projecting each element into an asynchronous operation, and you can then (asynchronously) wait for them all to complete.

using (DataContext db = new DataLayer.DataContext())
{
    var tasks = db.Groups.ToList().Select(i => GetAdminsFromGroupAsync(i.Gid));
    var results = await Task.WhenAll(tasks);
}

The benefits of this approach over giving an async delegate to ForEach are:

  1. Error handling is more proper. Exceptions from async void cannot be caught with catch; this approach will propagate exceptions at the await Task.WhenAll line, allowing natural exception handling.
  2. You know that the tasks are complete at the end of this method, since it does an await Task.WhenAll. If you use async void, you cannot easily tell when the operations have completed.
  3. This approach has a natural syntax for retrieving the results. GetAdminsFromGroupAsync sounds like it's an operation that produces a result (the admins), and such code is more natural if such operations can return their results rather than setting a value as a side effect.
Stephen Cleary
  • 437,863
  • 77
  • 675
  • 810
  • 10
    Not that it changes anything, but `List.ForEach()` is not part of LINQ. – svick Sep 07 '13 at 02:10
  • 1
    Great suggestion @StephenCleary and thank you for all the answers you've given about `async`. They have been very helpful! – Justin Helgerson Apr 03 '14 at 20:06
  • Will this kick off the tasks in parallel? If so, what if you want them executed one at a time? Is there an extension for that? – Stewart Anderson Aug 01 '15 at 23:55
  • 6
    @StewartAnderson: The tasks will execute concurrently. There's no extension for serial execution; just do a `foreach` with an `await` in your loop body. – Stephen Cleary Aug 02 '15 at 01:50
  • @StephenCleary "List.ForEach doesn't play particularly well with async (neither does LINQ-to-objects, for the same reasons)" Why is it so? I just ran into a bug over this and thank god I spotted it and rewrote the code to use foreach loop but would like to know why List.ForEach doesn't just work with async lambda. Thanks – mare Oct 17 '19 at 15:22
  • 6
    @mare: `ForEach` only takes a synchronous delegate type, and there's no overload taking an asynchronous delegate type. So the short answer is "no one wrote an asynchronous `ForEach`". The longer answer is that you'd have to assume some semantics; e.g., should the items be processed one at a time (like `foreach`), or simultaneously (like `Select`)? If one at a time, wouldn't asynchronous streams be a better solution? If simultaneously, should the results be in original item order or in order of completion? Should it fail on the first failure or wait until all have completed? Etc. – Stephen Cleary Oct 17 '19 at 18:01
  • @StephenCleary, is there a way to specify MAXDOP with this approach? I'm trying to execute multiple queries against a database, and I don't want to stretch its resources too thin. – Roger Wolf Apr 05 '20 at 06:32
  • 2
    @RogerWolf: Yes; use `SemaphoreSlim` to throttle asynchronous tasks. – Stephen Cleary Apr 05 '20 at 10:53
  • I would like to add that doing pagination on this using .Skip() and .Take() is a good addition to the logic in order to implement some batching. – GeorgiG May 07 '21 at 16:28
86

This little extension method should give you exception-safe async iteration:

public static async Task ForEachAsync<T>(this List<T> list, Func<T, Task> func)
{
    foreach (var value in list)
    {
        await func(value);
    }
}

Since we're changing the return type of the lambda from void to Task, exceptions will propagate up correctly. This will allow you to write something like this in practice:

await db.Groups.ToList().ForEachAsync(async i => {
    await GetAdminsFromGroup(i.Gid);
});
JD Courtoy
  • 2,855
  • 25
  • 27
  • Instead of awaiting the ForEachAsyn(), one could also call a Wait(). – Jonas Feb 03 '17 at 21:08
  • Lambda do not need to be awaited here. – hazzik Apr 03 '17 at 01:25
  • I would add support for CancellationToken into that as in Todd's Answer here https://stackoverflow.com/questions/29787098/doesnt-await-when-using-foreachasync-with-await-inside-action – Zorkind Dec 18 '17 at 17:45
  • Out of curiosity: What is the difference between you doing the foreach in the example and this: `someList.ForeEach(async item => { await SomeFunctionAsync(); }) ` When changing my code to your approach my data gets loaded correctly from the database. When using my approach here the data that is in the list is not shwon in my result. Do you know why? – Florent Mar 09 '22 at 13:38
53

Starting with C# 8.0, you can create and consume streams asynchronously.

    private async void button1_Click(object sender, EventArgs e)
    {
        IAsyncEnumerable<int> enumerable = GenerateSequence();

        await foreach (var i in enumerable)
        {
            Debug.WriteLine(i);
        }
    }

    public static async IAsyncEnumerable<int> GenerateSequence()
    {
        for (int i = 0; i < 20; i++)
        {
            await Task.Delay(100);
            yield return i;
        }
    }

More

Andrei Krasutski
  • 4,913
  • 1
  • 29
  • 35
  • 3
    This has the advantage that on top of awaiting each element, you are now also awaiting the `MoveNext` of the enumerator. This is important in cases where the enumerator cannot fetch the next element instantly, and must wait for one to become available. – Theodor Zoulias Sep 26 '19 at 09:10
  • 10
    This lacks an explanation/example how this would be applied to OPs problem. – Good Night Nerd Pride Apr 12 '21 at 12:31
37

The simple answer is to use the foreach keyword instead of the ForEach() method of List().

using (DataContext db = new DataLayer.DataContext())
{
    foreach(var i in db.Groups)
    {
        await GetAdminsFromGroup(i.Gid);
    }
}
RubberDuck
  • 11,933
  • 4
  • 50
  • 95
  • 2
    Why does this work instead of using `ForEach()` method of `List()`? – taylorswiftfan Apr 26 '21 at 23:20
  • 1
    The signature is `public void ForEach (Action action)` @taylorswiftfan. In order for it to work, it would need to be `ForEach(Task action)`. Since we're not passing a delegate with the plain ol' for loop, we can await each call inside the loop. There are better ways these days though. https://stackoverflow.com/a/58112039/3198973 – RubberDuck Apr 27 '21 at 12:36
  • @RubberDuck Whenever you say this *the answer detailing the modern way of handling this.* then are you interested about Entity Framework Core or something else? – Peter Csala Aug 23 '21 at 12:32
  • @PeterCsala I don't know what you're talking about. I'm rewarding an existing answer with that bounty. This one. https://stackoverflow.com/a/58112039/3198973 – RubberDuck Aug 25 '21 at 19:07
  • 2
    I actually think this is the simplest way – Maccurt Dec 10 '21 at 13:35
14

Here is an actual working version of the above async foreach variants with sequential processing:

public static async Task ForEachAsync<T>(this List<T> enumerable, Action<T> action)
{
    foreach (var item in enumerable)
        await Task.Run(() => { action(item); }).ConfigureAwait(false);
}

Here is the implementation:

public async void SequentialAsync()
{
    var list = new List<Action>();

    Action action1 = () => {
        //do stuff 1
    };

    Action action2 = () => {
        //do stuff 2
    };

    list.Add(action1);
    list.Add(action2);

    await list.ForEachAsync();
}

What's the key difference? .ConfigureAwait(false); which keeps the context of main thread while async sequential processing of each task.

mrogunlana
  • 827
  • 9
  • 11
10

This is not an old question, but .Net 6 introduced Parallel.ForeachAsync:

var collectionToIterate = db.Groups.ToList();
await Parallel.ForEachAsync(collectionToIterate, async (i, token) =>
{
    await GetAdminsFromGroup(i);
});

ForeachAsync also accepts a ParallelOptions object, but usually you don't want to mess with the MaxDegreeOfParallelism property:

ParallelOptions parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 4 };
var collectionToIterate = db.Groups.ToList();

await Parallel.ForEachAsync(collectionToIterate, parallelOptions , async (i, token) =>
{
    await GetAdminsFromGroup(i);
});

From Microsoft Docs: https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.paralleloptions.maxdegreeofparallelism?view=net-6.0

By default, For and ForEach will utilize however many threads the underlying scheduler provides, so changing MaxDegreeOfParallelism from the default only limits how many concurrent tasks will be used.

Generally, you do not need to modify this setting....

A. Jesus Flores
  • 141
  • 1
  • 6
2

Add this extension method

public static class ForEachAsyncExtension
{
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(from partition in Partitioner.Create(source).GetPartitions(dop) 
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current).ConfigureAwait(false);
            }));
    }
}

And then use like so:

Task.Run(async () =>
{
    var s3 = new AmazonS3Client(Config.Instance.Aws.Credentials, Config.Instance.Aws.RegionEndpoint);
    var buckets = await s3.ListBucketsAsync();

    foreach (var s3Bucket in buckets.Buckets)
    {
        if (s3Bucket.BucketName.StartsWith("mybucket-"))
        {
            log.Information("Bucket => {BucketName}", s3Bucket.BucketName);

            ListObjectsResponse objects;
            try
            {
                objects = await s3.ListObjectsAsync(s3Bucket.BucketName);
            }
            catch
            {
                log.Error("Error getting objects. Bucket => {BucketName}", s3Bucket.BucketName);
                continue;
            }

            // ForEachAsync (4 is how many tasks you want to run in parallel)
            await objects.S3Objects.ForEachAsync(4, async s3Object =>
            {
                try
                {
                    log.Information("Bucket => {BucketName} => {Key}", s3Bucket.BucketName, s3Object.Key);
                    await s3.DeleteObjectAsync(s3Bucket.BucketName, s3Object.Key);
                }
                catch
                {
                    log.Error("Error deleting bucket {BucketName} object {Key}", s3Bucket.BucketName, s3Object.Key);
                }
            });

            try
            {
                await s3.DeleteBucketAsync(s3Bucket.BucketName);
            }
            catch
            {
                log.Error("Error deleting bucket {BucketName}", s3Bucket.BucketName);
            }
        }
    }
}).Wait();
superlogical
  • 14,332
  • 9
  • 66
  • 76
1

If you are using EntityFramework.Core there is an extension method ForEachAsync.

The example usage looks like this:

using Microsoft.EntityFrameworkCore;
using System.Threading.Tasks;

public class Example
{
    private readonly DbContext _dbContext;
    public Example(DbContext dbContext)
    {
        _dbContext = dbContext;
    }
    public async void LogicMethod()
    {
        
        await _dbContext.Set<dbTable>().ForEachAsync(async x =>
        {
            //logic
            await AsyncTask(x);
        });
    }

    public async Task<bool> AsyncTask(object x)
    {
        //other logic
        return await Task.FromResult<bool>(true);
    }
}
ElConrado
  • 1,477
  • 4
  • 20
  • 46
-1

I would like to add that there is a Parallel class with ForEach function built in that can be used for this purpose.

Luk164
  • 657
  • 8
  • 22
  • 2
    The `Parallel` class [is not async-friendly](https://stackoverflow.com/questions/15136542/parallel-foreach-with-asynchronous-lambda). – Theodor Zoulias Jan 02 '21 at 15:47
  • @TheodorZoulias I see. I thought it would achieve a simmilar result - crunch the list with multiple threads as fast as possible. I misunderstood the desired outcome. – Luk164 Jan 02 '21 at 17:29
  • The `Parallel` class is great for CPU-bound work, but not for I/O-bound work. For example [you don't really need threads](https://blog.stephencleary.com/2013/11/there-is-no-thread.html) when you are doing web requests. – Theodor Zoulias Jan 02 '21 at 17:55
-2

The problem was that the async keyword needs to appear before the lambda, not before the body:

db.Groups.ToList().ForEach(async (i) => {
    await GetAdminsFromGroup(i.Gid);
});
Kirk Woll
  • 76,112
  • 22
  • 180
  • 195
James Jeffery
  • 12,093
  • 19
  • 74
  • 108
-3

This is method I created to handle async scenarios with ForEach.

  • If one of tasks fails then other tasks will continue their execution.
  • You have ability to add function that will be executed on every exception.
  • Exceptions are being collected as aggregateException at the end and are available for you.
  • Can handle CancellationToken
 public static class ParallelExecutor
    {
        /// <summary>
        /// Executes asynchronously given function on all elements of given enumerable with task count restriction.
        /// Executor will continue starting new tasks even if one of the tasks throws. If at least one of the tasks throwed exception then <see cref="AggregateException"/> is throwed at the end of the method run.
        /// </summary>
        /// <typeparam name="T">Type of elements in enumerable</typeparam>
        /// <param name="maxTaskCount">The maximum task count.</param>
        /// <param name="enumerable">The enumerable.</param>
        /// <param name="asyncFunc">asynchronous function that will be executed on every element of the enumerable. MUST be thread safe.</param>
        /// <param name="onException">Acton that will be executed on every exception that would be thrown by asyncFunc. CAN be thread unsafe.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        public static async Task ForEachAsync<T>(int maxTaskCount, IEnumerable<T> enumerable, Func<T, Task> asyncFunc, Action<Exception> onException = null, CancellationToken cancellationToken = default)
        {
            using var semaphore = new SemaphoreSlim(initialCount: maxTaskCount, maxCount: maxTaskCount);

            // This `lockObject` is used only in `catch { }` block.
            object lockObject = new object();
            var exceptions = new List<Exception>();
            var tasks = new Task[enumerable.Count()];
            int i = 0;

            try
            {
                foreach (var t in enumerable)
                {
                    await semaphore.WaitAsync(cancellationToken);
                    tasks[i++] = Task.Run(
                        async () =>
                        {
                            try
                            {
                                await asyncFunc(t);
                            }
                            catch (Exception e)
                            {
                                if (onException != null)
                                {
                                    lock (lockObject)
                                    {
                                        onException.Invoke(e);
                                    }
                                }

                                // This exception will be swallowed here but it will be collected at the end of ForEachAsync method in order to generate AggregateException.
                                throw;
                            }
                            finally
                            {
                                semaphore.Release();
                            }
                        }, cancellationToken);

                    if (cancellationToken.IsCancellationRequested)
                    {
                        break;
                    }
                }
            }
            catch (OperationCanceledException e)
            {
                exceptions.Add(e);
            }

            foreach (var t in tasks)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    break;
                }

                // Exception handling in this case is actually pretty fast.
                // https://gist.github.com/shoter/d943500eda37c7d99461ce3dace42141
                try
                {
                    await t;
                }
#pragma warning disable CA1031 // Do not catch general exception types - we want to throw that exception later as aggregate exception. Nothing wrong here.
                catch (Exception e)
#pragma warning restore CA1031 // Do not catch general exception types
                {
                    exceptions.Add(e);
                }
            }

            if (exceptions.Any())
            {
                throw new AggregateException(exceptions);
            }
        }
    }
Shoter
  • 976
  • 11
  • 23
  • There are some problems with your implementation. 1. The source `IEnumerable` is enumerated twice because of the `Count()`, which is a no-no. The enumerable may be hitting the database/filesystem/web on each enumeration. 2. The `Task.Factory.StartNew(async` produces a `Task`, and you are awaiting only the outer `Task`. The method may not propagate all exceptions, and may return before all tasks have been completed. And 3. using the `cancellationToken.IsCancellationRequested` may result to inconsistent cancellation behavior. Use the `ThrowIfCancellationRequested` instead. – Theodor Zoulias Dec 15 '20 at 05:49
  • Regarding the `Action onException = null` parameter, what is its use case? Is it for logging the errors as soon as they happen, as opposed to logging them in batch after the completion of the `ForEachAsync` method? – Theodor Zoulias Dec 15 '20 at 07:30
  • @TheodorZoulias 1. You can change type to IList for example. 1b. That's how IEnumerable works 2. Interesting. Will check that. Are you probably talking about using only first exception from AggregateException in case of await and tasks? 3. That was per my design. 4. For me it was used to monitor how many exceptions had been thrown and request cancellation after threshold had been met. – Shoter Dec 15 '20 at 08:59
  • 1
    You were right with exceptions. Now I use `Task.Run` and it does not swallow them. – Shoter Dec 15 '20 at 09:11
  • I suggest to change the parameter type from `IEnumerable` to `IList`, to prevent users from passing deferred enumerables an arguments. Or change the implementation so that the enumerable is enumerated only once. If you study the [source code](https://referencesource.microsoft.com/system.core/system/linq/Enumerable.cs.html) of all LINQ operators, you'll see that no operator is breaking this rule. – Theodor Zoulias Dec 15 '20 at 09:27
  • Good idea. I will change it today. – Shoter Dec 15 '20 at 09:27
  • *"3. That was per my design"* <== I hope that you understand that with your current design a cancellation may result to either an `OperationCanceledException` (thrown by the `semaphore.WaitAsync(cancellationToken)`), or to a normal (not-exceptional) completion. In other words the outcome is not deterministic. This is not a great design if you ask me! – Theodor Zoulias Dec 15 '20 at 10:03
  • 3. Good catch. Now my implementation is catching this exception and attaches it to exceptions at the end and it will throw them. It will be up to user to decide whether ot swallow excpetion or not. – Shoter Dec 15 '20 at 11:09
  • When it comes to `IEnumerable` I am leaving it up to users of stack overflow if they want to change it. For my use case I want to leave it as there is nothing like `ICountableEnumerable` which would guarantee that Count() is `O(1)`. And some enumerables have cost of O(1) to count and cost of O(n) to create `ToList`. An example of such IEnumerable is `ILookup`. – Shoter Dec 15 '20 at 11:13
  • 1
    @TheodorZoulias Thank you for helping me with improving that design. Great work! I am really grateful :) – Shoter Dec 15 '20 at 11:15
  • Honestly the issue 3 has not been fully fixed. There is another `cancellationToken.IsCancellationRequested` that could cause the method to return without exceptions, before all tasks are awaited. Regarding the issue 1, you could use a `List` instead of a `Task[enumerable.Count()]`, to eliminate the need for double enumeration of the source. Another thing to watch out is that an exception thrown by the `onException` handler will be propagated through the final `AggregateException`, replacing (and swallowing) the original exception thrown by the task. – Theodor Zoulias Dec 15 '20 at 11:37
  • Btw I don't want to disappoint you, but what you are trying to do here can be done easily with an `ActionBlock` in 5 lines of code. You can see an implementation [here](https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach/65251949#65251949). The `ForEachAsync` method has already lots of implementations, so if you want to add another one you should make sure that it is robust, because the competition is fierce. :-) – Theodor Zoulias Dec 15 '20 at 11:55
  • 1
    Thank you four insight. Next time I am going to take a look at TPL library. Thanks for direction! @TheodorZoulias – Shoter Dec 15 '20 at 14:19