
I have a list of events and I'd like to fetch the performances for each event. I want at most 10 'agents' running in parallel, and no more.

This is what the code would look like, except that it starts all the calls at once. There might be thousands of elements in allEvents, and I'd like to limit it to at most 10 calls in parallel.

    public async Task RefreshAll(Event[] allEvents)
    {
        var allPerformances = (await Task.WhenAll(allEvents.Select(CollectPerformancesForEvent))).SelectMany(x => x);
        // persist allPerformances
    }
    private async Task<Performance[]> CollectPerformancesForEvent(Event @event)
    {
        // some API call collecting all performances for @event...
        await Task.Delay(500);
        return new[]{ new Performance() };
    }
Sam7
  • I've tried the solutions described in the other question; they don't really apply here. This question is asking to delay the creation of tasks until they can actually be fulfilled. – Sam7 Aug 16 '19 at 05:56
  • Related: [How to limit the amount of concurrent async I/O operations?](https://stackoverflow.com/questions/10806951/how-to-limit-the-amount-of-concurrent-async-i-o-operations) – Theodor Zoulias Aug 16 '19 at 12:31

1 Answer


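Here is an extension method that does exactly that. It relies on the input sequence being enumerated lazily, so each task is only created (and started) when one of the 'agents' pulls it from the enumerator: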

    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    // extension methods must live in a static class; the class name is arbitrary
    public static class TaskExtensions
    {
        public static async Task<T[]> RunParallel<T>(this IEnumerable<Task<T>> tasks, int maxDegreeOfParallelism)
        {
            var enumerationLock = new object();
            // note: results are collected in completion order, not input order
            var results = new ConcurrentBag<T>();

            using (var enumerator = tasks.GetEnumerator())
            {
                // spin up just a few 'agents' to process the tasks,
                // and wait until all of them are finished
                await Task.WhenAll(Enumerable
                    .Range(0, maxDegreeOfParallelism)
                    .Select(_ => Task.Run(async () =>
                    {
                        while (true)
                        {
                            Task<T> task;
                            // we need to make sure the agents 'de-queue' without interference
                            lock (enumerationLock)
                            {
                                // pick the next element if available, otherwise this agent is done
                                if (!enumerator.MoveNext())
                                {
                                    return;
                                }
                                task = enumerator.Current;
                            }

                            // wait for the task to finish and collect its result
                            results.Add(await task);
                        }
                    })));
            }
            return results.ToArray();
        }
    }

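Then change the call to use RunParallel and pass the maximum degree of parallelism. Do not materialize the sequence (for example with ToList or ToArray) before the call, because that would start every task at once: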

    var allPerformances = (await allEvents.Select(CollectPerformancesForEvent).RunParallel(10)).SelectMany(x => x);
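
To illustrate why the lazy Select matters for this approach, here is a minimal sketch. `LazinessDemo`, `FakeCall` and `Started` are made-up names for the demonstration and are not part of the answer's code; `FakeCall` merely stands in for the real API call.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class LazinessDemo
{
    private static int started;

    // Number of calls that have begun executing so far.
    public static int Started => started;

    // Hypothetical stand-in for an API call; an async method runs
    // synchronously up to its first await, so the counter is bumped
    // as soon as the method is invoked.
    private static async Task<int> FakeCall(int id)
    {
        Interlocked.Increment(ref started);
        await Task.Delay(50);
        return id;
    }

    public static async Task Main()
    {
        // Select is deferred: no call has been made at this point.
        var lazy = Enumerable.Range(0, 1000).Select(i => FakeCall(i));
        Console.WriteLine(Started); // prints 0

        // Materializing the sequence invokes FakeCall for every element,
        // so all 1000 calls are in flight before any throttling can happen.
        var eager = lazy.ToList();
        Console.WriteLine(Started); // prints 1000

        await Task.WhenAll(eager);
    }
}
```

Because the tasks only come into existence when the enumerator advances, handing the un-materialized sequence to a throttling method lets that method decide when each call starts.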
Sam7
  • Your approach is good, but the implementation is missing some features: on error it does not fail fast, and the results are not in the same order as the tasks. [Here](https://stackoverflow.com/a/56862796/11178549) is a more complete implementation. – Theodor Zoulias Aug 16 '19 at 12:49
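
The ordering point raised in the comment can be addressed with a different technique: a SemaphoreSlim used as a gate, with Task.WhenAll returning the results in input order. The following is only a sketch under assumptions, not the implementation linked above; `ThrottleSketch`, `SelectThrottled`, `RunOne` and `FakeCall` are hypothetical names, and the sketch still does not fail fast (an exception only surfaces once every item has been processed).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Hypothetical helper: runs 'work' for every item with at most
    // 'maxConcurrency' calls in flight; results keep the input order.
    public static async Task<TResult[]> SelectThrottled<TSource, TResult>(
        IEnumerable<TSource> source, Func<TSource, Task<TResult>> work, int maxConcurrency)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = new List<Task<TResult>>();
            foreach (var item in source)
            {
                // Wait for a free slot before the next call is even created.
                await gate.WaitAsync();
                tasks.Add(RunOne(gate, work, item));
            }
            // Task.WhenAll returns the results in the order of 'tasks'.
            return await Task.WhenAll(tasks);
        }
    }

    private static async Task<TResult> RunOne<TSource, TResult>(
        SemaphoreSlim gate, Func<TSource, Task<TResult>> work, TSource item)
    {
        try { return await work(item); }
        finally { gate.Release(); } // free the slot even if the call fails
    }

    private static readonly object Sync = new object();
    private static int inFlight, peak;

    // Hypothetical stand-in for the API call; tracks peak concurrency.
    private static async Task<int> FakeCall(int id)
    {
        lock (Sync) { inFlight++; peak = Math.Max(peak, inFlight); }
        await Task.Delay(20);
        lock (Sync) { inFlight--; }
        return id * 2;
    }

    public static async Task Main()
    {
        var results = await SelectThrottled(Enumerable.Range(0, 50), i => FakeCall(i), 10);
        var expected = Enumerable.Range(0, 50).Select(i => i * 2);
        Console.WriteLine(results.SequenceEqual(expected)); // prints True: input order is preserved
        Console.WriteLine(peak <= 10);                      // prints True: never more than 10 at once
    }
}
```

With the question's types, the call would look like `await ThrottleSketch.SelectThrottled(allEvents, CollectPerformancesForEvent, 10)`, followed by the same SelectMany as before.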