@i3arnon's answer is correct. Use TPL Dataflow.
The rest of this answer is for educational purposes and/or special use cases only.
I recently ran into a similar problem in a project where I could not introduce any external dependencies, so I had to roll my own load-balancing implementation. It turned out to be surprisingly simple (until you start wiring in cancellation and ordered results - but that's outside the scope of this question).
I am disregarding the "10 dedicated threads" requirement since, as others have already explained, it does not make sense when dealing with async operations. Instead I will maintain up to N concurrent Task instances processing the workload.
    static async Task InvokeAsync(IEnumerable<Func<Task>> taskFactories, int maxDegreeOfParallelism)
    {
        Queue<Func<Task>> queue = new Queue<Func<Task>>(taskFactories);

        if (queue.Count == 0)
        {
            return;
        }

        List<Task> tasksInFlight = new List<Task>(maxDegreeOfParallelism);

        do
        {
            while (tasksInFlight.Count < maxDegreeOfParallelism && queue.Count != 0)
            {
                Func<Task> taskFactory = queue.Dequeue();

                tasksInFlight.Add(taskFactory());
            }

            Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);

            // Propagate exceptions. In-flight tasks will be abandoned if this throws.
            await completedTask.ConfigureAwait(false);

            tasksInFlight.Remove(completedTask);
        }
        while (queue.Count != 0 || tasksInFlight.Count != 0);
    }
Usage:
    Func<Task>[] taskFactories = {
        () => _repository.WriteData(someData1),
        () => _repository.WriteData(someData2),
        () => _repository.WriteData(someData3),
        () => _repository.WriteData(someData4)
    };

    await InvokeAsync(taskFactories, maxDegreeOfParallelism: 2);
... or
    IEnumerable<SomeData> someDataCollection = ... // Get data.

    // Assumes InvokeAsync is declared in a static class named ParallelTasks.
    await ParallelTasks.InvokeAsync(
        someDataCollection.Select(someData => new Func<Task>(() => _repository.WriteData(someData))),
        maxDegreeOfParallelism: 10
    );
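To see the "up to N concurrent Task instances" behaviour in isolation, here is a minimal console sketch. It is not part of the original project: `ThrottleDemo`, `MeasurePeakAsync` and `FakeWriteAsync` are hypothetical names, and `Task.Delay` stands in for `_repository.WriteData`. The `InvokeAsync` body is a condensed copy of the method above so that the sample compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Condensed copy of InvokeAsync from above so the sample compiles on its own.
    public static async Task InvokeAsync(IEnumerable<Func<Task>> taskFactories, int maxDegreeOfParallelism)
    {
        var queue = new Queue<Func<Task>>(taskFactories);
        var tasksInFlight = new List<Task>(maxDegreeOfParallelism);

        while (queue.Count != 0 || tasksInFlight.Count != 0)
        {
            // Top up the in-flight list until the limit is reached or the queue is empty.
            while (tasksInFlight.Count < maxDegreeOfParallelism && queue.Count != 0)
            {
                tasksInFlight.Add(queue.Dequeue()());
            }

            Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
            await completedTask.ConfigureAwait(false); // Propagate exceptions.
            tasksInFlight.Remove(completedTask);
        }
    }

    // Runs one simulated write per delay and returns the highest number
    // of writes that were observed in flight at the same time.
    public static async Task<int> MeasurePeakAsync(IEnumerable<int> delaysMs, int maxDegreeOfParallelism)
    {
        int running = 0;
        int peak = 0;

        // Hypothetical stand-in for _repository.WriteData: simulates I/O with Task.Delay.
        async Task FakeWriteAsync(int delayMs)
        {
            // This part runs synchronously when InvokeAsync calls the factory,
            // and InvokeAsync calls factories one at a time, so peak has a single writer.
            int now = Interlocked.Increment(ref running);
            peak = Math.Max(peak, now);

            await Task.Delay(delayMs).ConfigureAwait(false);

            // Decrement runs on a timer/pool thread, hence Interlocked.
            Interlocked.Decrement(ref running);
        }

        await InvokeAsync(
            delaysMs.Select(d => new Func<Task>(() => FakeWriteAsync(d))),
            maxDegreeOfParallelism).ConfigureAwait(false);

        return peak;
    }

    public static async Task Main()
    {
        int[] delaysMs = { 120, 30, 60, 30, 90, 30, 60, 30 };

        int peak = await MeasurePeakAsync(delaysMs, maxDegreeOfParallelism: 3);

        // The reported peak should never exceed the limit passed in.
        Console.WriteLine($"Peak concurrency: {peak}");
    }
}
```

Note that the limit applies to tasks, not threads: while the simulated writes are waiting on `Task.Delay`, no thread is blocked.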
This solution does not suffer from the poor load balancing often seen in other trivial implementations when tasks have varying durations and the input is pre-partitioned (such as this one). Because a new task is started as soon as any in-flight task completes, no slot sits idle while work remains in the queue.
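For contrast, the sketch below shows what a pre-partitioned implementation might look like and times it against the queue-based approach. It is an illustration under assumptions, not code from any particular answer: `PartitionComparison`, `InvokePartitionedAsync` and `TimeAsync` are hypothetical names, the input is split into contiguous chunks, and `Task.Delay` simulates the work.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class PartitionComparison
{
    // Hypothetical "trivial" approach: split the input into fixed contiguous
    // chunks up front, then let each worker process its own chunk sequentially.
    public static Task InvokePartitionedAsync(IReadOnlyList<Func<Task>> taskFactories, int partitionCount)
    {
        int chunkSize = (taskFactories.Count + partitionCount - 1) / partitionCount;

        IEnumerable<Task> workers = Enumerable.Range(0, partitionCount).Select(async p =>
        {
            foreach (Func<Task> factory in taskFactories.Skip(p * chunkSize).Take(chunkSize))
            {
                await factory().ConfigureAwait(false);
            }
        });

        return Task.WhenAll(workers);
    }

    // Condensed copy of the queue-based InvokeAsync from this answer.
    public static async Task InvokeAsync(IEnumerable<Func<Task>> taskFactories, int maxDegreeOfParallelism)
    {
        var queue = new Queue<Func<Task>>(taskFactories);
        var tasksInFlight = new List<Task>(maxDegreeOfParallelism);

        while (queue.Count != 0 || tasksInFlight.Count != 0)
        {
            while (tasksInFlight.Count < maxDegreeOfParallelism && queue.Count != 0)
            {
                tasksInFlight.Add(queue.Dequeue()());
            }

            Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
            await completedTask.ConfigureAwait(false);
            tasksInFlight.Remove(completedTask);
        }
    }

    // Returns the wall-clock time, in milliseconds, that the given operation takes.
    public static async Task<long> TimeAsync(Func<Task> run)
    {
        Stopwatch stopwatch = Stopwatch.StartNew();
        await run().ConfigureAwait(false);
        return stopwatch.ElapsedMilliseconds;
    }

    public static async Task Main()
    {
        // Two slow items followed by two fast ones (simulated with Task.Delay).
        int[] delaysMs = { 300, 300, 50, 50 };
        Func<Task>[] factories = delaysMs
            .Select(d => new Func<Task>(() => Task.Delay(d)))
            .ToArray();

        // Chunks are [300, 300] and [50, 50]: one worker runs for about 600 ms
        // while the other finishes after about 100 ms and then sits idle.
        long partitioned = await TimeAsync(() => InvokePartitionedAsync(factories, 2));

        // Queue-based: both slow items run side by side, then both fast ones,
        // so the total is roughly 350 ms.
        long balanced = await TimeAsync(() => InvokeAsync(factories, 2));

        Console.WriteLine($"Pre-partitioned: {partitioned} ms, queue-based: {balanced} ms");
    }
}
```

The exact numbers depend on timer resolution and scheduling, but the gap grows with the spread in task durations, which is the case the paragraph above describes.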
Version with perf optimizations and argument validation: Gist.