So it is desirable that all documents are parallelized as if they belonged to a single sequence that is processed with a single configurable degree of parallelism from start to finish.
I generally recommend using the highest-level primitive possible. In your case, you have heterogeneous actions with different result types and you also want a single degree of concurrency, which does restrict your options.
PLINQ is an option, though you'd need to merge the input and result types. Something like:
int[] pdfIDs = new[] { 1, 2, 3 };
int[] xlsIDs = new[] { 11, 12, 13, 14 };
int[] docIDs = new[] { 21, 22 };
// Tag each ID with its file type so all inputs form a single sequence.
var inputs = pdfIDs.Select(id => (Type: "pdf", Id: id))
    .Concat(xlsIDs.Select(id => (Type: "xls", Id: id)))
    .Concat(docIDs.Select(id => (Type: "doc", Id: id)));
var process = inputs.AsParallel()
    .WithDegreeOfParallelism(3)
    .Select(x =>
    {
        // Box each result as object so every branch returns the same tuple type.
        switch (x.Type)
        {
            case "pdf": return (x.Type, File: (object) CreatePdfFile(x.Id));
            case "xls": return (x.Type, File: (object) CreateXlsFile(x.Id));
            case "doc": return (x.Type, File: (object) CreateDocFile(x.Id));
            default: throw new InvalidOperationException($"Unknown type {x.Type}");
        }
    });
var results = process.ToList();
// Split the merged results back out by type.
PdfFile[] pdfFiles = results.Where(x => x.Type == "pdf").Select(x => (PdfFile) x.File).ToArray();
XlsFile[] xlsFiles = results.Where(x => x.Type == "xls").Select(x => (XlsFile) x.File).ToArray();
DocFile[] docFiles = results.Where(x => x.Type == "doc").Select(x => (DocFile) x.File).ToArray();
Or something like that with better type safety and fewer magic strings. An enum and Choice with some switch expressions would make this nicer. :)
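As a rough illustration of the "fewer magic strings" idea, here is a minimal sketch that swaps the string tags for an enum and uses a switch expression (this needs C# 9 or later because of the records). `DocKind`, the record types, `EnumPlinqSketch`, and the trivial `Create*File` bodies are all hypothetical stand-ins for your own types. The results are still boxed as `object` inside the query, so this removes the strings but not the underlying type merge:

```csharp
using System;
using System.Linq;

public enum DocKind { Pdf, Xls, Doc }

// Hypothetical stand-ins for the real file types.
public record PdfFile(int Id);
public record XlsFile(int Id);
public record DocFile(int Id);

public static class EnumPlinqSketch
{
    // Hypothetical stand-ins for the real (presumably expensive) factory methods.
    static PdfFile CreatePdfFile(int id) => new PdfFile(id);
    static XlsFile CreateXlsFile(int id) => new XlsFile(id);
    static DocFile CreateDocFile(int id) => new DocFile(id);

    public static (PdfFile[] Pdfs, XlsFile[] Xlss, DocFile[] Docs) Run(
        int[] pdfIDs, int[] xlsIDs, int[] docIDs, int degree)
    {
        // One merged input sequence, tagged with an enum instead of a string.
        var inputs = pdfIDs.Select(id => (Kind: DocKind.Pdf, Id: id))
            .Concat(xlsIDs.Select(id => (Kind: DocKind.Xls, Id: id)))
            .Concat(docIDs.Select(id => (Kind: DocKind.Doc, Id: id)));

        var results = inputs.AsParallel()
            .WithDegreeOfParallelism(degree)
            .Select(x => x.Kind switch
            {
                DocKind.Pdf => (object) CreatePdfFile(x.Id),
                DocKind.Xls => CreateXlsFile(x.Id),
                DocKind.Doc => CreateDocFile(x.Id),
                _ => throw new InvalidOperationException($"Unknown kind {x.Kind}"),
            })
            .ToList();

        // OfType<T> filters by runtime type, so no tag comparison or cast is needed.
        return (results.OfType<PdfFile>().ToArray(),
                results.OfType<XlsFile>().ToArray(),
                results.OfType<DocFile>().ToArray());
    }
}
```

Calling `EnumPlinqSketch.Run(pdfIDs, xlsIDs, docIDs, 3)` returns the three typed arrays. Without `AsOrdered`, the order of items within each array is not guaranteed to match the input order.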
Alternatively, Parallel would work well. In this case, maybe a Parallel.Invoke where the individual actions are responsible for storing their own results in a thread-safe collection:
int[] pdfIDs = new[] { 1, 2, 3 };
int[] xlsIDs = new[] { 11, 12, 13, 14 };
int[] docIDs = new[] { 21, 22 };
var pdfFileResults = new ConcurrentDictionary<int, PdfFile>();
var xlsFileResults = new ConcurrentDictionary<int, XlsFile>();
var docFileResults = new ConcurrentDictionary<int, DocFile>();
var pdfActions = pdfIDs.Select(id => (Action) (() => pdfFileResults.TryAdd(id, CreatePdfFile(id))));
var xlsActions = xlsIDs.Select(id => (Action) (() => xlsFileResults.TryAdd(id, CreateXlsFile(id))));
var docActions = docIDs.Select(id => (Action) (() => docFileResults.TryAdd(id, CreateDocFile(id))));
Parallel.Invoke(new ParallelOptions { MaxDegreeOfParallelism = 3 },
    pdfActions.Concat(xlsActions).Concat(docActions).ToArray());
// Note: ConcurrentDictionary.Values has no guaranteed order.
PdfFile[] pdfFiles = pdfFileResults.Values.ToArray();
XlsFile[] xlsFiles = xlsFileResults.Values.ToArray();
DocFile[] docFiles = docFileResults.Values.ToArray();
The PLINQ approach, due to its partitioning, tends to divide up the work between different file types. The Parallel.Invoke approach tends to work its way down the actions array one block at a time. Not sure which you would prefer.
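If you want to see the difference for yourself, a small sketch like the following can log the order in which work items start under each approach. `OrderingSketch`, `Work`, and the `Thread.Sleep` delay are hypothetical stand-ins for the real file creation. The logged order varies from run to run and depends on the source type, the partitioner, and the runtime, so treat it as an observation tool rather than a guarantee:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class OrderingSketch
{
    // Simulated work item: records that it started, then pretends to be busy.
    static void Work(string label, ConcurrentQueue<string> log)
    {
        log.Enqueue(label);
        Thread.Sleep(50); // stand-in for real file generation
    }

    // Start order observed when PLINQ partitions the input.
    public static string[] PlinqOrder(string[] labels, int degree)
    {
        var log = new ConcurrentQueue<string>();
        labels.AsParallel()
            .WithDegreeOfParallelism(degree)
            .ForAll(label => Work(label, log));
        return log.ToArray();
    }

    // Start order observed when Parallel.Invoke runs an array of actions.
    public static string[] InvokeOrder(string[] labels, int degree)
    {
        var log = new ConcurrentQueue<string>();
        var actions = labels
            .Select(label => (Action) (() => Work(label, log)))
            .ToArray();
        Parallel.Invoke(new ParallelOptions { MaxDegreeOfParallelism = degree }, actions);
        return log.ToArray();
    }
}
```

For example, pass labels such as `"pdf-1", "pdf-2", "pdf-3", "xls-11", ...` and print each result with `string.Join(", ", ...)` to compare the two start orders on your machine.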
Finally, there's the task-based parallelism approach. I don't generally recommend this due to its complexity; its real use case is in scenarios where each task can create more tasks, not in scenarios like this where the total number of tasks is known in advance. It's still interesting for completeness:
int[] pdfIDs = new[] { 1, 2, 3 };
int[] xlsIDs = new[] { 11, 12, 13, 14 };
int[] docIDs = new[] { 21, 22 };
var scheduler = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 3).ConcurrentScheduler;
var factory = new TaskFactory(scheduler);
var pdfTasks = pdfIDs.Select(id => factory.StartNew(() => CreatePdfFile(id))).ToList();
var xlsTasks = xlsIDs.Select(id => factory.StartNew(() => CreateXlsFile(id))).ToList();
var docTasks = docIDs.Select(id => factory.StartNew(() => CreateDocFile(id))).ToList();
Task.WaitAll(pdfTasks.Cast<Task>().Concat(xlsTasks).Concat(docTasks).ToArray());
PdfFile[] pdfFiles = pdfTasks.Select(x => x.Result).ToArray();
XlsFile[] xlsFiles = xlsTasks.Select(x => x.Result).ToArray();
DocFile[] docFiles = docTasks.Select(x => x.Result).ToArray();
Since these are all synchronous tasks, I would use ConcurrentExclusiveSchedulerPair.ConcurrentScheduler rather than SemaphoreSlim. This is the normal pattern for throttling task-based parallel code.
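To convince yourself that the scheduler really caps concurrency, here is a minimal sketch that queues a batch of synchronous tasks to a `ConcurrentScheduler` and records the highest number that were running at once. `ThrottleSketch`, `PeakConcurrency`, and the `Thread.Sleep` placeholder are hypothetical names and stand-ins, not part of the code above:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Runs taskCount synchronous work items through a throttled scheduler
    // and returns the highest number observed running at the same time.
    public static int PeakConcurrency(int taskCount, int maxConcurrency)
    {
        var scheduler = new ConcurrentExclusiveSchedulerPair(
            TaskScheduler.Default, maxConcurrency).ConcurrentScheduler;
        var factory = new TaskFactory(scheduler);

        int running = 0, peak = 0;
        var tasks = Enumerable.Range(0, taskCount)
            .Select(_ => factory.StartNew(() =>
            {
                int now = Interlocked.Increment(ref running);

                // Lock-free "peak = max(peak, now)".
                int seen;
                while (now > (seen = Volatile.Read(ref peak)) &&
                       Interlocked.CompareExchange(ref peak, now, seen) != seen)
                {
                }

                Thread.Sleep(20); // stand-in for synchronous work
                Interlocked.Decrement(ref running);
            }))
            .ToArray();

        Task.WaitAll(tasks);
        return peak;
    }
}
```

With `maxConcurrency` set to 3, the returned peak should never exceed 3, whatever the number of queued tasks.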
The task-based parallel approach executes similarly to the Parallel.Invoke approach; since all the tasks are queued to the scheduler in groups by type, that's how they tend to run.
As a final note, I do have to put in a plug for my book; I honestly think you would enjoy it. My blog focuses on asynchrony; my book covers parallelism as well.