1

I want to do calculations with an outer and an inner loop which I can do in parallel. Furthermore, I want to use the async/await-based programming model. In the outer loop there is a place where a resource is needed which can only be used by one thread.

I thought of implementing the loops using ForEachAsync and restrict the access to the resource using SemaphoreSlim:

using System.Linq;
using System.Threading;
using System.Threading.Tasks;
                    
public class Program {
    private static Dictionary<int,IReadOnlyList<int>> resource = new();
    private static SemaphoreSlim semaphore = new(1);
    
    public static async Task Main() {       
        var outerLoopSource = Enumerable.Range(0,10);
        await Parallel.ForEachAsync(outerLoopSource, OuterLoopFunction);
        foreach(var (key, list) in resource)
            Console.WriteLine(key+": "+string.Join(',', list));
    }
                                    
    public static async ValueTask OuterLoopFunction(int i, CancellationToken cancel) {
        // some time consuming calculation ...      
        var key = i%3;
        const int listSize = 10;
        IReadOnlyList<int> list;
        await semaphore.WaitAsync();
        try {
            if(!resource.TryGetValue(key, out list)) {
                var newList = new int[listSize];
                list = newList;
                resource.Add(key, list);
                await Parallel.ForEachAsync(Enumerable.Range(0,listSize), InnerLoopFunction);
                ValueTask InnerLoopFunction(int j, CancellationToken cancel) {
                    // some time consuming calculation ...
                    newList[j] = 42+i;
                    return ValueTask.CompletedTask;
                }
            }
        } finally {
            semaphore.Release();
        }           
        // do something with list
    }   
}

Example in fiddle

Can ForEachAsync be used in nested loops like this and is the number of operations in parallel still restricted by System.Environment.ProcessorCount?

Update

In the comments people suggested to use dataflow components from the Task Parallel Library. That might be the better approach if I wrote the code from scratch. However, in my case there is quite a lot of legacy code doing the calculations and it seems to me I would have to restructure it significantly in order to apply the concept, since I would have to lift what is currently the inner loop on the same level as the outer loop. Hence I wonder if using annother SemaphoreSlim to restrict the number of parallel executions as described here avoids running to many tasks/threads in parallel without too much performance penalty.

MarkusParker
  • 1,264
  • 2
  • 17
  • 35
  • Use an `ActionBlock` instead. Instead of looping, create a pipeline of steps using eg TransformBlock instances. What does your code actually do, once you remove the attempts to parallelize/restrict threads? – Panagiotis Kanavos Mar 17 '22 at 13:24
  • `Parallel.ForEachAsync` is meant to execute asynchronous operations concurrently with data that comes either from an `IEnumerable` or `IAsyncEnumerable`. There's no attempt to partition or batch like `Parallel.ForEach`. The default is to use the number of cores but that can usually be increased because asynchronous operations are usually just waiting for IO to complete. – Panagiotis Kanavos Mar 17 '22 at 13:28
  • On the other hand, ActionBlock and the other classes in TPL Dataflow are used to create a pipeline of blocks, similar to a script pipeline. Each block processes an input message and passes the result to the next block down the line. Most blocks have input/output buffers. By default one task is used per block, but that can be changed. This means you can use a TransformBlock to crawl web pages and emit URLs using eg 1 task, and have another block download and process those URLs using 10 tasks. – Panagiotis Kanavos Mar 17 '22 at 13:33
  • Yes, I can be nested, and will be restricted by your threadpool but about if it will perform better or not is quite complicated to say. It depends of how compute-intensive your "//some time consuming calculation" is. Check my answer in this thread https://stackoverflow.com/questions/71434976/why-isnt-parallel-for-fast-with-heap-intensive-operations/71452625#71452625 And this Microsoft Documentation about parallelism https://learn.microsoft.com/pt-br/dotnet/standard/parallel-programming/potential-pitfalls-in-data-and-task-parallelism – Douglas Ferreira Mar 17 '22 at 13:42
  • I would reconsider your design. Using async/await is not really great fit for compute-bound operations. I'm also skeptical of holding locks while doing compute bound operations, as far as I can tell, you should be able get rid of the lock, replacing it with a concurrentDictionary. I would either use something like Dataflow, or possibly as two sequential parallel loops. – JonasH Mar 17 '22 at 13:59
  • Does ConcurrentDictionary implement locking more efficient? – MarkusParker Mar 17 '22 at 15:06
  • 2
    Are you sure that you are not falling into the trap of [Over parallelism](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/potential-pitfalls-in-data-and-task-parallelism#avoid-over-parallelization)? Have you performed measurements when both are parallel and when only the outer one? – Peter Csala Mar 18 '22 at 08:32
  • @PeterCsala: In the original legacy code, just the inner loop was done in parallel using Parallel.ForEach. I do not want to omit this, since the outer loop may process only one element and the user might experience a performance loss in that case. – MarkusParker Mar 20 '22 at 14:32

1 Answers1

1

No, the ParallelOptions.MaxDegreeOfParallelism affects only the degree of parallelism of the configured Parallel.ForEachAsync loop. It's not an ambient property that affects all other parallel loops that might be nested inside the outer parallel loop. For example if you configure the outer parallel loop with MaxDegreeOfParallelism = 5, and the inner parallel loop with MaxDegreeOfParallelism = 3, the delegate of the inner parallel loop might be invoked concurrently 15 times (5 * 3) at any given moment.

This assuming that the inner parallel loop is unrestricted. In your example you have enclosed the inner parallel loop in a protected region, by using a SemaphoreSlim(1). So only one inner parallel loop can be active at any given moment. The maximum number of concurrent invocations of the inner loop's delegate is Environment.ProcessorCount (the default MaxDegreeOfParallelism for the Parallel.ForEachAsync API).

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104