Here is one idea that involves creating an extension method for TaskFactory.
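    using System;
    using System.Threading.Tasks;

    public static class TaskFactoryExtension
    {
        // Starts `parallelism` tasks that each run `action`, and returns a task
        // that completes once all of them have finished.
        public static Task StartNew(this TaskFactory target, Action action, int parallelism)
        {
            var tasks = new Task[parallelism];
            for (int i = 0; i < parallelism; i++)
            {
                tasks[i] = target.StartNew(action);
            }
            // The returned task blocks in WaitAll until every worker is done.
            return target.StartNew(() => Task.WaitAll(tasks));
        }
    }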
Then your calling code would look like the following.
    ConcurrentQueue<T> queue = GetQueue();
    int n = GetDegreeOfParallelism();
    var task = Task.Factory.StartNew(
        () =>
        {
            // Each of the n tasks runs this loop until the queue is empty.
            T item;
            while (queue.TryDequeue(out item))
            {
                ProcessItem(item);
            }
        }, n);
    task.Wait(); // Optionally wait for everything to finish.
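
To make the snippet above concrete, here is a minimal, self-contained sketch. It is only an illustration under some assumptions: the `Demo` class, the `DrainQueue` helper, and the use of an `int` queue with a counter standing in for `ProcessItem` are all hypothetical names and choices that are not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TaskFactoryExtension
{
    // Same extension method as above: start `parallelism` copies of `action`.
    public static Task StartNew(this TaskFactory target, Action action, int parallelism)
    {
        var tasks = new Task[parallelism];
        for (int i = 0; i < parallelism; i++)
        {
            tasks[i] = target.StartNew(action);
        }
        return target.StartNew(() => Task.WaitAll(tasks));
    }
}

public static class Demo
{
    // Hypothetical helper: fills a queue with itemCount integers, drains it
    // with `parallelism` worker tasks, and returns how many items were processed.
    public static int DrainQueue(int itemCount, int parallelism)
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(1, itemCount));
        int processed = 0;

        Task task = Task.Factory.StartNew(() =>
        {
            int item;
            while (queue.TryDequeue(out item))
            {
                // Stand-in for ProcessItem(item): count the item thread-safely.
                Interlocked.Increment(ref processed);
            }
        }, parallelism);

        task.Wait();
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine(DrainQueue(1000, 4)); // prints 1000
    }
}
```

Because TryDequeue hands each item to exactly one worker, the total is the same regardless of how the items end up distributed across the tasks. Note that the second argument must be an `int` variable or a non-zero value for the extension method to be chosen; a literal `0` would bind to the built-in `StartNew(Action, TaskCreationOptions)` overload instead.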
Here is another idea using Parallel.ForEach. The drawback of this approach is that your degree of parallelism is not necessarily honored: MaxDegreeOfParallelism specifies only the maximum number of concurrent operations, not the exact number.
    ConcurrentQueue<T> queue = GetQueue();
    int n = GetDegreeOfParallelism();
    Parallel.ForEach(queue, new ParallelOptions { MaxDegreeOfParallelism = n },
        (item) =>
        {
            ProcessItem(item);
        });
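
One more difference worth knowing about: enumerating a ConcurrentQueue<T> reads a snapshot of its contents and does not remove anything, so after Parallel.ForEach returns the queue still holds its items. The sketch below illustrates this; the `ParallelForEachDemo` class and `Run` helper are hypothetical names, and the counter again stands in for `ProcessItem`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelForEachDemo
{
    // Hypothetical helper: processes every item with Parallel.ForEach and
    // returns { number of items processed, number of items left in the queue }.
    public static int[] Run(int itemCount, int maxParallelism)
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(1, itemCount));
        int processed = 0;

        Parallel.ForEach(
            queue,
            new ParallelOptions { MaxDegreeOfParallelism = maxParallelism },
            item =>
            {
                // Stand-in for ProcessItem(item).
                Interlocked.Increment(ref processed);
            });

        // Enumeration did not dequeue anything, so Count is unchanged.
        return new[] { processed, queue.Count };
    }

    public static void Main()
    {
        int[] result = Run(100, 4);
        Console.WriteLine(result[0] + " processed, " + result[1] + " still queued");
        // prints "100 processed, 100 still queued"
    }
}
```

If the queue needs to end up empty, drain it with TryDequeue as in the first approach, or clear it yourself afterwards.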