I want to run calculations with an outer and an inner loop, both of which can be executed in parallel. Furthermore, I want to use the async/await-based programming model. Inside the outer loop there is a place where a resource is needed that can only be used by one thread at a time.
I thought of implementing the loops using Parallel.ForEachAsync and restricting access to the resource using SemaphoreSlim:
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    public class Program {
        // Shared resource: may only be touched by one thread at a time.
        private static Dictionary<int,IReadOnlyList<int>> resource = new();
        // Binary semaphore guarding the shared resource.
        private static SemaphoreSlim semaphore = new(1);

        public static async Task Main() {
            var outerLoopSource = Enumerable.Range(0,10);
            await Parallel.ForEachAsync(outerLoopSource, OuterLoopFunction);
            foreach(var (key, list) in resource)
                Console.WriteLine(key+": "+string.Join(',', list));
        }

        public static async ValueTask OuterLoopFunction(int i, CancellationToken cancel) {
            // some time consuming calculation ...
            var key = i%3;
            const int listSize = 10;
            IReadOnlyList<int> list;
            await semaphore.WaitAsync();
            try {
                if(!resource.TryGetValue(key, out list)) {
                    var newList = new int[listSize];
                    list = newList;
                    resource.Add(key, list);
                    // The inner loop runs while the semaphore is still held.
                    await Parallel.ForEachAsync(Enumerable.Range(0,listSize), InnerLoopFunction);
                    ValueTask InnerLoopFunction(int j, CancellationToken cancel) {
                        // some time consuming calculation ...
                        newList[j] = 42+i;
                        return ValueTask.CompletedTask;
                    }
                }
            } finally {
                semaphore.Release();
            }
            // do something with list
        }
    }
Can Parallel.ForEachAsync be used in nested loops like this, and is the number of operations running in parallel still limited by System.Environment.ProcessorCount?
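To make the second half of the question concrete, here is a minimal sketch of how the effective parallelism of nested loops could be measured. It assumes that the degree-of-parallelism limit applies to each Parallel.ForEachAsync call separately, which is my understanding and not something I have confirmed. The names NestedParallelismProbe, MeasureAsync and RecordMax are hypothetical, and Task.Delay merely stands in for real asynchronous work.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical probe: counts how many inner-loop bodies run at the same time.
public static class NestedParallelismProbe
{
    private static int running;      // inner bodies currently executing
    private static int maxObserved;  // highest value of `running` seen so far

    // Atomically raises maxObserved to `now` if `now` is larger.
    private static void RecordMax(int now)
    {
        int seen;
        do { seen = Volatile.Read(ref maxObserved); }
        while (now > seen && Interlocked.CompareExchange(ref maxObserved, now, seen) != seen);
    }

    // Returns the highest number of inner bodies observed running at once.
    public static async Task<int> MeasureAsync(int outerCount, int innerCount)
    {
        running = 0;
        maxObserved = 0;
        await Parallel.ForEachAsync(Enumerable.Range(0, outerCount), async (i, cancel) =>
        {
            // Each outer iteration starts its own inner parallel loop.
            await Parallel.ForEachAsync(Enumerable.Range(0, innerCount), cancel, async (j, innerCancel) =>
            {
                RecordMax(Interlocked.Increment(ref running));
                try { await Task.Delay(50, innerCancel); } // stand-in for async work
                finally { Interlocked.Decrement(ref running); }
            });
        });
        return maxObserved;
    }

    public static async Task Main()
    {
        var max = await MeasureAsync(outerCount: 16, innerCount: 16);
        Console.WriteLine($"max concurrent inner bodies: {max}, ProcessorCount: {Environment.ProcessorCount}");
    }
}
```

If the printed maximum exceeds ProcessorCount, the limit is evidently not shared between the nested loops. In my code above this cannot be observed directly, because the inner loop only runs while the semaphore is held.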
Update
In the comments, people suggested using the dataflow components from the Task Parallel Library. That might be the better approach if I were writing the code from scratch. However, in my case there is quite a lot of legacy code doing the calculations, and it seems to me that I would have to restructure it significantly to apply that concept, since I would have to lift what is currently the inner loop to the same level as the outer loop. Hence I wonder whether using another SemaphoreSlim to restrict the number of parallel executions, as described here, avoids running too many tasks/threads in parallel without too much of a performance penalty.
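To illustrate what I mean by a second semaphore, here is a minimal sketch under my own assumptions. The names ThrottledNestedLoops, throttle and RunThrottledAsync are hypothetical, the Thread.SpinWait calls only stand in for the legacy calculations, and the counters exist purely to check the limit.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledNestedLoops
{
    // Hypothetical global throttle shared by the outer and the inner loop.
    private static readonly SemaphoreSlim throttle = new(Environment.ProcessorCount);

    private static int running;      // calculations currently executing
    private static int maxObserved;  // highest value of `running` seen so far

    // Atomically raises maxObserved to `now` if `now` is larger.
    private static void RecordMax(int now)
    {
        int seen;
        do { seen = Volatile.Read(ref maxObserved); }
        while (now > seen && Interlocked.CompareExchange(ref maxObserved, now, seen) != seen);
    }

    // Runs one calculation while holding a throttle slot.
    private static async ValueTask RunThrottledAsync(Action calculation, CancellationToken cancel)
    {
        await throttle.WaitAsync(cancel);
        try
        {
            RecordMax(Interlocked.Increment(ref running));
            calculation();
        }
        finally
        {
            Interlocked.Decrement(ref running);
            throttle.Release();
        }
    }

    // Returns the highest number of calculations observed running at once.
    public static async Task<int> RunAsync()
    {
        await Parallel.ForEachAsync(Enumerable.Range(0, 10), async (i, cancel) =>
        {
            // Outer calculation: a slot is held only while computing.
            await RunThrottledAsync(() => Thread.SpinWait(100_000), cancel);

            // The slot is released before the inner loop starts, so an outer
            // iteration never holds a slot while waiting for inner iterations.
            await Parallel.ForEachAsync(Enumerable.Range(0, 10), cancel, (j, innerCancel) =>
                RunThrottledAsync(() => Thread.SpinWait(100_000), innerCancel));
        });
        return maxObserved;
    }

    public static async Task Main()
    {
        var max = await RunAsync();
        Console.WriteLine($"max concurrent calculations: {max} (limit {Environment.ProcessorCount})");
    }
}
```

The throttle is deliberately not held across the await of the inner loop. If each outer iteration kept its slot while the inner iterations waited for one, all slots could end up held by waiting parents and the loops would deadlock. What I cannot judge from this sketch is how much overhead the extra WaitAsync/Release pair adds per iteration.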