The threads of Parallel.ForEach are "real" background threads: they are created and the application continues execution. The point is that Parallel.ForEach is not awaitable, therefore execution continues while the threads of the Parallel.ForEach are suspended using await.
private async Task DoSomething()
{
    List<int> numbers = new List<int>(Enumerable.Range(1, 5));

    // Create the threads of Parallel.ForEach
    await Task.Run(() =>
    {
        // Create the threads of Parallel.ForEach and continue
        Parallel.ForEach(numbers, async i =>
        {
            // await suspends the thread and forces it to return.
            // Because Parallel.ForEach is not awaitable,
            // execution leaves the scope of the Parallel.ForEach and continues.
            await Task.Delay(i * 300);
            await Dispatcher.BeginInvoke(() => tbResults.Text += i.ToString() + Environment.NewLine);
        });

        // After the threads are created, the internal await of the Parallel.ForEach
        // suspends the background threads and forces execution to return from the Parallel.ForEach.
        // The Task.Run thread continues.
        Dispatcher.InvokeAsync(() => tbResults.Text += "Text while Parallel.ForEach threads are suspended");

        // Since the background threads of the Parallel.ForEach are not attached
        // to the parent Task.Run, the Task.Run completes now and returns,
        // i.e. Task.Run does not wait for child background threads to complete.
        // ==> Leave Task.Run as there is no more work.
    });

    // Leave DoSomething() and continue to execute the remaining code in Button_Click().
    // The Parallel.ForEach threads are still suspended until the await chain,
    // in this case Button_Click(), has completed.
}
The solution is to implement the pattern suggested by Clemens' comment, or an async implementation of the Producer/Consumer pattern using e.g. BlockingCollection or Channel to gain more control over a fixed number of threads while distributing an "unlimited" number of jobs (a Channel-based sketch follows the example below).
private async Task DoSomething(int number)
{
    await Task.Delay(number * 300);
    Dispatcher.Invoke(() => tbResults.Text += number + Environment.NewLine);
}

private async void ButtonBase_OnClick(object sender, RoutedEventArgs e)
{
    List<int> numbers = new List<int>(Enumerable.Range(1, 5));
    List<Task> tasks = new List<Task>();

    // Alternatively use LINQ Select
    foreach (int number in numbers)
    {
        Task task = DoSomething(number);
        tasks.Add(task);
    }

    await Task.WhenAll(tasks);
    tbResults.Text += "This is end text" + Environment.NewLine;
}
Discussing the comments
"my intention was to run tasks in parallel and "report" once they are
completed i.e. the taks which takes the shortest would "report" first
and so on."
This is exactly what is happening in the above solution. The Task with the shortest delay appends text to the TextBox first.
"But implementing your suggested await Task.WhenAll(tasks)
causes
that we need to wait for all tasks to complete and then report all at
once."
To process Task objects in their order of completion, you would replace Task.WhenAll with Task.WhenAny. In case you are not only interested in the first completed Task, you would have to use Task.WhenAny in an iterative manner until all Task instances have completed:
Process all Task objects in their order of completion
private async Task<int> DoSomething(int number)
{
    await Task.Delay(number * 300);
    Dispatcher.Invoke(() => tbResults.Text += number + Environment.NewLine);

    // Return the number so that the caller can identify the completed Task
    return number;
}

private async void ButtonBase_OnClick(object sender, RoutedEventArgs e)
{
    List<int> numbers = new List<int>(Enumerable.Range(1, 5));
    List<Task<int>> tasks = new List<Task<int>>();

    // Alternatively use LINQ Select
    foreach (int number in numbers)
    {
        Task<int> task = DoSomething(number);
        tasks.Add(task);
    }

    // Until all Tasks have completed
    while (tasks.Any())
    {
        Task<int> nextCompletedTask = await Task.WhenAny(tasks);

        // Remove the completed Task so that
        // we can await the next uncompleted Task that completes first
        tasks.Remove(nextCompletedTask);

        // Get the result of the completed Task
        int taskId = await nextCompletedTask;
        tbResults.Text += $"Task {taskId} has completed." + Environment.NewLine;
    }

    tbResults.Text += "This is end text" + Environment.NewLine;
}
"Parallel.ForEach
is not awaitable so I thought that wrapping it up in
Task.Run
allows me to await it but this is because as you said "Since
the background threads of the Parallel.Foreach
are not attached to the
parent Task.Run
""
No, that's not exactly what I have said. The key point is the third sentence of my answer: "The point is that Parallel.ForEach is not awaitable, therefore execution continues while the threads of the Parallel.ForEach are suspended using await."
This means: normally Parallel.ForEach executes synchronously: the calling context continues execution once all threads of Parallel.ForEach have completed. But since you call await inside those threads, you suspend them in an async/await manner. Since Parallel.ForEach is not awaitable, it can't handle the await calls and acts as if the suspended threads had completed naturally. Parallel.ForEach does not understand that the threads are just suspended by await and will continue later. In other words, the await chain is broken because Parallel.ForEach is not able to return a Task to the parent awaited Task.Run context to signal its suspension.
That's what I meant when saying that the threads of Parallel.ForEach are not attached to the Task.Run. They run in total isolation from the async/await infrastructure.
"async lambdas should be "only use with events""
No, that's not correct. When you pass an async lambda to a void delegate like Action<T>, you are correct: the async lambda can't be awaited in this case. But when you pass an async lambda to a Func<TResult> delegate where TResult is of type Task, your lambda can be awaited:
private void NoAsyncDelegateSupportedMethod(Action lambda)
{
    // Since Action does not return a Task (the return type is always void),
    // the async lambda can't be awaited
    lambda.Invoke();
}

private async Task AsyncDelegateSupportedMethod(Func<Task> asyncLambda)
{
    // Since Func<Task> returns a Task, the async lambda can be awaited
    await asyncLambda.Invoke();
}

public async Task DoSomething()
{
    // Not a good idea as NoAsyncDelegateSupportedMethod can't handle async lambdas: it defines a void delegate
    NoAsyncDelegateSupportedMethod(async () => await Task.Delay(1));

    // A good idea as AsyncDelegateSupportedMethod can handle async lambdas: it defines a Func<Task> delegate
    await AsyncDelegateSupportedMethod(async () => await Task.Delay(1));
}
As you can see, your statement is not correct. You must always check the signature of the called method and its overloads. If it accepts a Func<Task> type delegate, you are good to go.
That's how async support is added to Parallel.ForEachAsync: its body parameter is a Func<TSource, CancellationToken, ValueTask> delegate. For example, Task.Run accepts a Func<Task> and therefore the following call is perfectly fine:
Task.Run(async () => await Task.Delay(1));
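Similarly, here is a minimal sketch (assuming .NET 6 or later) of passing an async lambda to Parallel.ForEachAsync, which awaits it through its ValueTask-returning delegate:
await Parallel.ForEachAsync(Enumerable.Range(1, 5), async (number, cancellationToken) =>
{
    // The async lambda is awaited by Parallel.ForEachAsync itself
    await Task.Delay(number * 300, cancellationToken);
});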
" I guess that you admit that .Net 6.0 brought the best solution :
which is Parallel.ForEachASYNC! [...] We can spawn a couple of threads
which deal with our tasks in parallel and we can await the whole loop
and we do not need to wait for all tasks to complte- they "report" as
they finish "
That's wrong. Parallel.ForEachAsync supports threads that use async/await, that's true. Indeed, your original example would no longer break the intended flow: because Parallel.ForEachAsync supports await in its threads, it can handle suspended threads and propagate the Task object properly from its threads to the caller context, e.g. to the wrapping await Task.Run. It now knows how to wait for and resume suspended threads.
Important: Parallel.ForEachAsync still completes AFTER ALL its threads have completed. Your assumption "they "report" as they finish" is wrong. That's the most intuitive concurrent implementation of a foreach: foreach also completes only after all items are enumerated.
The solution to process Task objects as they complete is still the Task.WhenAny pattern from above.
In general, if you don't need the extra features like partitioning etc. of Parallel.ForEach and Parallel.ForEachAsync, you can always use Task.WhenAll instead. Task.WhenAll and Parallel.ForEachAsync are essentially equivalent, except that Parallel.ForEachAsync provides greater customization by default: it supports techniques like throttling and partitioning without the extra code.