The following should return "C", but it returns "B":

using System.Data.Entity;
//...
var state = "A";
var qry = (from f in db.myTable select f);
await qry.ForEachAsync(async (myRecord) => {
   await DoStuffAsync(myRecord);
   state = "B";
});
state = "C";
return state;

ForEachAsync doesn't wait for DoStuffAsync to complete: state = "C" runs first, and state = "B" executes later, because the lambda is still awaiting at that point.

i3arnon
Kind Contributor

3 Answers

That's because the implementation of ForEachAsync doesn't await the delegate it is given:

moveNextTask = enumerator.MoveNextAsync(cancellationToken);
action(current);

see https://github.com/mono/entityframework/blob/master/src/EntityFramework/Infrastructure/IDbAsyncEnumerableExtensions.cs#L19

The reason is that you can't await an Action. To be awaitable, the delegate needs to be a Func that returns a Task. See "How do you implement an async action delegate method?"
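
To make the difference concrete, here is a minimal sketch that does not touch Entity Framework at all. The helper names (`ForEachWithAction`, `ForEachWithFuncAsync`, `RunAsync`) are made up for this illustration, and the two `TaskCompletionSource` objects exist only to make the ordering deterministic. An async lambda passed where an `Action<T>` is expected becomes `async void`, so the loop has no Task to wait on.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncDelegateDemo
{
    // Hypothetical helper with the same shape as an Action-based ForEach:
    // an async lambda passed here is compiled as async void (fire and forget).
    public static void ForEachWithAction<T>(IEnumerable<T> source, Action<T> action)
    {
        foreach (var item in source)
        {
            action(item); // returns at the lambda's first incomplete await
        }
    }

    // Hypothetical helper taking Func<T, Task>: the returned Task can be awaited.
    public static async Task ForEachWithFuncAsync<T>(IEnumerable<T> source, Func<T, Task> action)
    {
        foreach (var item in source)
        {
            await action(item);
        }
    }

    public static async Task<string[]> RunAsync()
    {
        var log = new ConcurrentQueue<string>();
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var finished = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        ForEachWithAction(new[] { 1 }, async _ =>
        {
            await gate.Task;                      // still pending when the loop returns
            log.Enqueue("Action body finished");
            finished.SetResult(true);
        });
        log.Enqueue("Action loop returned");      // logged before the body finishes

        gate.SetResult(true);                     // now let the async void body complete
        await finished.Task;

        await ForEachWithFuncAsync(new[] { 1 }, async _ =>
        {
            await Task.Yield();
            log.Enqueue("Func body finished");
        });
        log.Enqueue("Func loop returned");        // logged after the body finishes

        return log.ToArray();
    }
}
```

With the `Action<T>` version the loop returns before the body has finished; with the `Func<T, Task>` version the body always finishes first.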

Therefore, until Microsoft provides a signature which includes a Func delegate and calls it with await, you'll have to roll your own extension method. I'm using the following at the moment.

public static async Task ForEachAsync<T>(
    this IQueryable<T> enumerable, Func<T, Task> action, CancellationToken cancellationToken) //Now with Func returning Task
{
    var asyncEnumerable = (IDbAsyncEnumerable<T>)enumerable;
    using (var enumerator = asyncEnumerable.GetAsyncEnumerator())
    {

        if (await enumerator.MoveNextAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false))
        {
            Task<bool> moveNextTask;
            do
            {
                var current = enumerator.Current;
                moveNextTask = enumerator.MoveNextAsync(cancellationToken);
                await action(current); //now with await
            }
            while (await moveNextTask.ConfigureAwait(continueOnCapturedContext: false));
        }
    }
}

With this, the original test code in the question works as expected.

Community
Kind Contributor
  • I'm not sure how well your own ForEachAsync will coexist with the Action version. I simply removed the using System.Data.Entity; directive and put mine in my own namespace. – Kind Contributor Apr 22 '15 at 03:21

.NET 6 introduced Parallel.ForEachAsync(...):

var state = "A";
var qry = (from f in db.myTable select f);
await Parallel.ForEachAsync(qry, async (myRecord, token) =>
{
    await DoStuffAsync(myRecord);
    state = "B";
});
state = "C";
return state;

You can set the MaxDegreeOfParallelism property by passing a ParallelOptions argument, but usually you don't need to.

From Microsoft Docs: https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.paralleloptions.maxdegreeofparallelism?view=net-6.0

By default, For and ForEach will utilize however many threads the underlying scheduler provides, so changing MaxDegreeOfParallelism from the default only limits how many concurrent tasks will be used.

Generally, you do not need to modify this setting....
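
If the actions must not overlap, one option is to cap the degree of parallelism. The following is only a sketch under assumptions: `ParallelLimitDemo` and `MaxObservedConcurrencyAsync` are names invented for this example, and an in-memory range stands in for the query. It passes a `ParallelOptions` instance to `Parallel.ForEachAsync` and reports how many bodies were ever running at once.

```csharp
using System.Linq;
using System.Threading.Tasks;

public static class ParallelLimitDemo
{
    // Returns the highest number of loop bodies observed running at the same time.
    public static async Task<int> MaxObservedConcurrencyAsync(int maxDegreeOfParallelism)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };
        var gate = new object();
        int running = 0, peak = 0;

        await Parallel.ForEachAsync(Enumerable.Range(0, 8), options, async (item, token) =>
        {
            lock (gate)
            {
                running++;
                if (running > peak) peak = running;
            }

            await Task.Delay(20, token); // stands in for DoStuffAsync

            lock (gate)
            {
                running--;
            }
        });

        return peak;
    }
}
```

With `MaxDegreeOfParallelism = 1` the bodies run one at a time, which is closer to what the original question needs than the default setting.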

A. Jesus Flores
  • Regarding the *"Generally, you do not need to modify this setting"* advice, you might find [this](https://github.com/dotnet/runtime/issues/72981 "ParallelOptions.MaxDegreeOfParallelism documentation, is the given advice correct?") GitHub issue interesting. I am not sure that it applies to `Parallel.ForEachAsync`. If you don't specify the `MaxDegreeOfParallelism`, the default is `Environment.ProcessorCount`, which is quite arbitrary for an asynchronous operation. – Theodor Zoulias Aug 05 '22 at 22:21
  • Async isn't about Parallelism. It's about decoupling Tasks from Threads. Most commonly, I use Async for more efficient IO, so that the thread isn't blocked until an IO result returns. So in this case, your answer is valid, but it doesn't minimally solve the problem, it also distributes the tasks to multiple CPU cores. If the Async Tasks MUST be run sequentially, this answer won't work. (Also I have a feeling this Parallel ForEach API has been around for a while) – Kind Contributor Aug 09 '22 at 01:37

Since DbSet implements IAsyncEnumerable, consider using the following extension method:

public static async Task ForEachAsync<T>(this IAsyncEnumerable<T> source, Func<T, Task> action, CancellationToken cancellationToken = default)
{
    if (source == null) return;
    await foreach (T item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
    {
        await action(item);
    }
}

Usage:

var qry = (from f in db.myTable select f);
await qry
     .AsAsyncEnumerable()
     .ForEachAsync(async arg =>
     {
         await DoStuffAsync(arg);
     });
Rafi Henig
  • Your implementation is similar to the [`ForEachAwaitAsync`](https://github.com/dotnet/reactive/blob/main/Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ForEach.cs) operator of the [System.Linq.Async](https://www.nuget.org/packages/System.Linq.Async) library. Todd's [solution](https://stackoverflow.com/a/29787099/11178549) is more interesting IMHO, because it allows for concurrency in a way that it's unlikely to create problems, while improving the performance at the same time. – Theodor Zoulias Feb 14 '21 at 00:11
  • Also the `ConfigureAwait(false)` means that the `action` will not be invoked on the current `SynchronizationContext`. So for example if the project's type is WinForms and the `action` contains UI-related code, the `ForEachAsync` method will fail. – Theodor Zoulias Feb 14 '21 at 18:25
  • `@Theodor Zoulias` thanks for your point. Indeed, consider using `ConfigureAwait(true)` in UI applications (when you need a synchronization context); otherwise you should always use `ConfigureAwait(false)`. (Looking at `@Todd's` answer, I can see that he used `ConfigureAwait(continueOnCapturedContext: false)` too.) – Rafi Henig Feb 16 '21 at 03:50
  • `@Theodor Zoulias` *"allows for concurrency in a way that it's unlikely to create problems"* it might be helpful (for me and others) to explain it. – Rafi Henig Feb 16 '21 at 03:50
  • Todd's solution allows each `action` on an item to occur concurrently with fetching the sequence's next item. These two concurrent operations are unlikely to depend on each other (by sharing state that needs to be synchronized), because they are quite distinct operations. Compare this to executing two actions for two different elements concurrently. This has a higher chance of creating problems, because the concurrent operations are homogeneous, and may depend on some non-thread-safe shared state (a `DbConnection` for example). – Theodor Zoulias Feb 16 '21 at 04:04
  • Regarding the `ConfigureAwait(false)` dilemma, you can see [here](https://github.com/App-vNext/Polly/wiki/Asynchronous-action-execution#synchronizationcontext) how the Polly library deals with it. They offer overloads with a `bool continueOnCapturedContext` parameter, where `false` is the default. But at least it is configurable. – Theodor Zoulias Feb 16 '21 at 05:12
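
The configurable approach described in the last comment could look roughly like the sketch below. It is not Polly's code and not part of any library: the class name `AsyncEnumerableExtensions`, the method name `ForEachSequentialAsync` (chosen to avoid clashing with existing `ForEachAsync` overloads), and the parameter order are assumptions made for illustration. The `continueOnCapturedContext` flag defaults to `false` and is applied both to the enumeration and to each awaited action.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncEnumerableExtensions
{
    // Hypothetical extension: awaits each action before moving to the next item.
    public static async Task ForEachSequentialAsync<T>(
        this IAsyncEnumerable<T> source,
        Func<T, Task> action,
        bool continueOnCapturedContext = false,
        CancellationToken cancellationToken = default)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (action == null) throw new ArgumentNullException(nameof(action));

        await foreach (var item in source
            .WithCancellation(cancellationToken)
            .ConfigureAwait(continueOnCapturedContext))
        {
            // Pass true from UI code so the next iteration resumes on the captured context.
            await action(item).ConfigureAwait(continueOnCapturedContext);
        }
    }
}
```

A WinForms or WPF caller would pass `continueOnCapturedContext: true` so that each action runs on the UI thread; server-side callers can keep the default.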