
I want to create some documents that need some quite intensive CPU calculations for their creation. The documents are heterogeneous, some are PDF, some are Excel, and some are Word documents. My input is three sequences of IDs, for example:

// Input
int[] pdfIDs = new[] { 1, 2, 3 };
int[] xlsIDs = new[] { 11, 12, 13, 14 };
int[] docIDs = new[] { 21, 22 };

I already have methods that create each of these document types. Here is a simplified version of these methods:

PdfFile CreatePdfFile(int id)
{
    Thread.Sleep(1000); // Simulate some heavy calculation
    return new PdfFile();
}
XlsFile CreateXlsFile(int id)
{
    Thread.Sleep(1500); // Simulate some heavy calculation
    return new XlsFile();
}
DocFile CreateDocFile(int id)
{
    Thread.Sleep(2000); // Simulate some heavy calculation
    return new DocFile();
}

class PdfFile { public byte[] Bytes { get; set; } }
class XlsFile { public byte[] Bytes { get; set; } }
class DocFile { public byte[] Bytes { get; set; } }

These methods must be called in parallel, otherwise the creation of the documents takes too long. I could use PLINQ to parallelize the creation of each type separately, but this would be inefficient because the degree of parallelism would drop between finishing one type and starting the next. So it is desirable that all documents are parallelized as if they belong to a single sequence, which is processed with a single configurable degree of parallelism from start to finish. The desired outcome is three arrays containing the created documents:

// Output
PdfFile[] pdfFiles;
XlsFile[] xlsFiles;
DocFile[] docFiles;

The whole process need not be asynchronous. It is OK to block the current thread until all documents have been created.

How can I achieve this goal?

By the way, there is a related question with good answers here: Awaiting multiple Tasks with different results. That question deals with a much simpler scenario, though (no lists, no requirement for a specific degree of parallelism), so its answers cannot be used to solve this more complex problem.

Theodor Zoulias

3 Answers


So it is desirable that all documents are parallelized as if they belong to a single sequence, which is processed with a single configurable degree of parallelism from start to finish.

I generally recommend using the highest-level primitive that is possible. In your case, since you have heterogeneous actions with different result types, and you also want a single degree of concurrency, this does restrict your options.

PLINQ is an option, though you'd need to merge the input and result types. Something like:

int[] pdfIDs = new[] { 1, 2, 3 };
int[] xlsIDs = new[] { 11, 12, 13, 14 };
int[] docIDs = new[] { 21, 22 };

var inputs = pdfIDs.Select(id => (Type: "pdf", Id: id))
    .Concat(xlsIDs.Select(id => (Type: "xls", Id: id)))
    .Concat(docIDs.Select(id => (Type: "doc", Id: id)));
var process = inputs.AsParallel()
    .WithDegreeOfParallelism(3)
    .Select(x =>
    {
        switch (x.Type)
        {
            case "pdf": return (x.Type, File: (object) CreatePdfFile(x.Id));
            case "xls": return (x.Type, File: (object) CreateXlsFile(x.Id));
            case "doc": return (x.Type, File: (object) CreateDocFile(x.Id));
            default: throw new InvalidOperationException($"Unknown type {x.Type}");
        }
    });
var results = process.ToList();

PdfFile[] pdfFiles = results.Where(x => x.Type == "pdf").Select(x => (PdfFile)x.File).ToArray();
XlsFile[] xlsFiles = results.Where(x => x.Type == "xls").Select(x => (XlsFile)x.File).ToArray();
DocFile[] docFiles = results.Where(x => x.Type == "doc").Select(x => (DocFile)x.File).ToArray();

Or something like that with better type safety and fewer magic strings. An enum and Choice with some switch expressions would make this nicer. :)
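As a rough illustration of the "enum plus switch expression" idea, here is a minimal sketch. It assumes C# 8 or later; the `DocKind` enum, the `PlinqEnumSketch` class and its `CreateAll` method are hypothetical names introduced only for this example, and the `Create*File` methods are shortened stand-ins for the real ones. It still boxes the results as `object`, so it only removes the magic strings rather than giving full type safety. The `AsOrdered` call is an optional addition that keeps the results in input order.

```csharp
using System;
using System.Linq;
using System.Threading;

public enum DocKind { Pdf, Xls, Doc } // hypothetical tag replacing the magic strings

public class PdfFile { public byte[] Bytes { get; set; } }
public class XlsFile { public byte[] Bytes { get; set; } }
public class DocFile { public byte[] Bytes { get; set; } }

public static class PlinqEnumSketch
{
    // Stand-ins for the real CPU-bound methods from the question.
    static PdfFile CreatePdfFile(int id) { Thread.Sleep(100); return new PdfFile(); }
    static XlsFile CreateXlsFile(int id) { Thread.Sleep(150); return new XlsFile(); }
    static DocFile CreateDocFile(int id) { Thread.Sleep(200); return new DocFile(); }

    public static (PdfFile[] Pdf, XlsFile[] Xls, DocFile[] Doc) CreateAll(
        int[] pdfIDs, int[] xlsIDs, int[] docIDs, int degreeOfParallelism)
    {
        // Merge the three inputs into one sequence, each item tagged with its kind.
        var inputs = pdfIDs.Select(id => (Kind: DocKind.Pdf, Id: id))
            .Concat(xlsIDs.Select(id => (Kind: DocKind.Xls, Id: id)))
            .Concat(docIDs.Select(id => (Kind: DocKind.Doc, Id: id)));

        var results = inputs.AsParallel()
            .AsOrdered() // optional: keep results in input order
            .WithDegreeOfParallelism(degreeOfParallelism)
            .Select(x => x.Kind switch
            {
                DocKind.Pdf => (object)CreatePdfFile(x.Id),
                DocKind.Xls => CreateXlsFile(x.Id),
                DocKind.Doc => CreateDocFile(x.Id),
                _ => throw new InvalidOperationException($"Unknown kind {x.Kind}")
            })
            .ToList();

        // Split the merged results back into typed arrays by runtime type.
        return (results.OfType<PdfFile>().ToArray(),
                results.OfType<XlsFile>().ToArray(),
                results.OfType<DocFile>().ToArray());
    }
}
```

Calling `PlinqEnumSketch.CreateAll(pdfIDs, xlsIDs, docIDs, 3)` returns the three arrays as a tuple.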

Alternatively, Parallel would work well. In this case, maybe a Parallel.Invoke where the individual actions are responsible for storing their own results in a thread-safe collection:

int[] pdfIDs = new[] { 1, 2, 3 };
int[] xlsIDs = new[] { 11, 12, 13, 14 };
int[] docIDs = new[] { 21, 22 };

var pdfFileResults = new ConcurrentDictionary<int, PdfFile>();
var xlsFileResults = new ConcurrentDictionary<int, XlsFile>();
var docFileResults = new ConcurrentDictionary<int, DocFile>();

var pdfActions = pdfIDs.Select(id => (Action) (() => pdfFileResults.TryAdd(id, CreatePdfFile(id))));
var xlsActions = xlsIDs.Select(id => (Action) (() => xlsFileResults.TryAdd(id, CreateXlsFile(id))));
var docActions = docIDs.Select(id => (Action) (() => docFileResults.TryAdd(id, CreateDocFile(id))));

Parallel.Invoke(new ParallelOptions { MaxDegreeOfParallelism = 3 },
    pdfActions.Concat(xlsActions).Concat(docActions).ToArray());

PdfFile[] pdfFiles = pdfFileResults.Values.ToArray();
XlsFile[] xlsFiles = xlsFileResults.Values.ToArray();
DocFile[] docFiles = docFileResults.Values.ToArray();

The PLINQ approach - due to its partitioning - tends to divide up the work between different file types. The Parallel.Invoke approach tends to work its way down the actions array one block at a time. Not sure which you would prefer.
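One caveat with the `Parallel.Invoke` version: `ConcurrentDictionary.Values` does not guarantee any particular order, so the output arrays may not line up with the input IDs. If that matters, a possible variation is to have each action write into its own slot of a preallocated array. The sketch below assumes exactly that; `OrderedInvokeSketch`, `MakeActions` and `CreateAll` are hypothetical names, and the `Create*File` methods are shortened stand-ins.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PdfFile { public byte[] Bytes { get; set; } }
public class XlsFile { public byte[] Bytes { get; set; } }
public class DocFile { public byte[] Bytes { get; set; } }

public static class OrderedInvokeSketch
{
    // Stand-ins for the real CPU-bound methods from the question.
    static PdfFile CreatePdfFile(int id) { Thread.Sleep(100); return new PdfFile(); }
    static XlsFile CreateXlsFile(int id) { Thread.Sleep(150); return new XlsFile(); }
    static DocFile CreateDocFile(int id) { Thread.Sleep(200); return new DocFile(); }

    // Hypothetical helper: one action per ID, each storing its result in slot i.
    // Every action writes to a different index, so no locking is needed.
    static Action[] MakeActions<T>(int[] ids, T[] slots, Func<int, T> create) =>
        ids.Select((id, i) => (Action)(() => slots[i] = create(id))).ToArray();

    public static (PdfFile[] Pdf, XlsFile[] Xls, DocFile[] Doc) CreateAll(
        int[] pdfIDs, int[] xlsIDs, int[] docIDs, int maxDegreeOfParallelism)
    {
        var pdfFiles = new PdfFile[pdfIDs.Length];
        var xlsFiles = new XlsFile[xlsIDs.Length];
        var docFiles = new DocFile[docIDs.Length];

        var actions = MakeActions(pdfIDs, pdfFiles, CreatePdfFile)
            .Concat(MakeActions(xlsIDs, xlsFiles, CreateXlsFile))
            .Concat(MakeActions(docIDs, docFiles, CreateDocFile))
            .ToArray();

        // Blocks until every action has completed (or throws an AggregateException).
        Parallel.Invoke(
            new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
            actions);

        return (pdfFiles, xlsFiles, docFiles);
    }
}
```

With this layout, `pdfFiles[i]` is the document created from `pdfIDs[i]`, and likewise for the other two arrays.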

Finally, there's the task-based parallelism approach. I don't generally recommend it due to its complexity; its real use case is in scenarios where each task can create more tasks, not in scenarios like this one where the total number of tasks is known in advance. I include it here only for completeness:

int[] pdfIDs = new[] { 1, 2, 3 };
int[] xlsIDs = new[] { 11, 12, 13, 14 };
int[] docIDs = new[] { 21, 22 };

var scheduler = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 3).ConcurrentScheduler;
var factory = new TaskFactory(scheduler);

var pdfTasks = pdfIDs.Select(id => factory.StartNew(() => CreatePdfFile(id))).ToList();
var xlsTasks = xlsIDs.Select(id => factory.StartNew(() => CreateXlsFile(id))).ToList();
var docTasks = docIDs.Select(id => factory.StartNew(() => CreateDocFile(id))).ToList();

Task.WaitAll(pdfTasks.Cast<Task>().Concat(xlsTasks).Concat(docTasks).ToArray());

PdfFile[] pdfFiles = pdfTasks.Select(x => x.Result).ToArray();
XlsFile[] xlsFiles = xlsTasks.Select(x => x.Result).ToArray();
DocFile[] docFiles = docTasks.Select(x => x.Result).ToArray();

Since these are all synchronous tasks, I would use ConcurrentExclusiveSchedulerPair.ConcurrentScheduler rather than SemaphoreSlim. This is the normal pattern for throttling task-based parallel code.

The task-based parallel approach executes similarly to the Parallel.Invoke approach: since all the tasks are queued to the scheduler in groups by type, that's the order in which they tend to run.

As a final note, I do have to put in a plug for my book; I honestly think you would enjoy it. My blog focuses on asynchrony; my book covers parallelism as well.

Stephen Cleary
  • 1
    Thanks Stephen for the answer. I am spoiled for choice now! All three solutions work great for my case. The first one (using `AsParallel`) is a bit inflexible because I couldn't use it if I had heterogeneous input, for example if I had `Guid`s as input for the PDF files and integers for the rest. The second option (using `Parallel.Invoke`) has the advantage that the current thread becomes one of the worker threads, instead of just being blocked like in the third option. But I love the third option, because it gives me the once-in-a-lifetime opportunity to use the `ConcurrentExclusiveSchedulerPair`! – Theodor Zoulias Mar 28 '20 at 04:56

If your methods are fully synchronous, there will be no drops in between: each task takes a thread from the pool, runs the synchronous part to completion, and returns the thread to the pool. That's all, with no breaks. The thread is dedicated to your synchronous code the entire time (switches can happen only at awaits). To limit the degree of parallelism I usually use a semaphore:

class Test
{
    public PdfFile CreatePdfFile(int id)
    {
        Work(1000); // Simulate some heavy calculation
        return new PdfFile();
    }
    public XlsFile CreateXlsFile(int id)
    {
        Work(1500); // Simulate some heavy calculation
        return new XlsFile();
    }
    public DocFile CreateDocFile(int id)
    {
        Work(2000); // Simulate some heavy calculation
        return new DocFile();
    }

    private void Work(int milliseconds)
    {
        var step = 100;
        while (milliseconds > 0)
        {
            // Print the thread ID to show that the work stays on one thread
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(step);
            milliseconds -= step;
        }
    }

    public class PdfFile { public byte[] Bytes { get; set; } }
    public class XlsFile { public byte[] Bytes { get; set; } }
    public class DocFile { public byte[] Bytes { get; set; } }
}
class Program
{
    static void Main(string[] args)
    {
        var cts = new CancellationTokenSource();
        var degreeOfParallelism = 1;
        var itemCount = 10;

        var test = new Test();
        var sema = new SemaphoreSlim(degreeOfParallelism);

        var tasks = Enumerable.Range(0, itemCount).Select(x =>
         {
             return Task.Run(async () =>
             {
                 await sema.WaitAsync(cts.Token);
                 try
                 {   //<---- here your "exclusive thread" starts
                     if (x % 3 == 0)
                     {
                         test.CreateDocFile(x);
                     }
                     else if (x % 3 == 1)
                     {
                         test.CreatePdfFile(x);
                     }
                     else if (x % 3 == 2)
                     {
                         test.CreateXlsFile(x);
                     }
                 }
                 finally
                 {   //<---- here your "exclusive thread" ends.
                     sema.Release();
                 }
             }, cts.Token);
         }).ToArray();

        Task.WaitAll(tasks);
        Console.WriteLine("Done");
        Console.ReadKey();
    }
}

It is not as clear as creating your own thread pool, but it behaves in the same manner.

If you move the semaphore wait into the Select method, you get a clearer picture of what is happening, because it no longer enqueues all the tasks in the pool up front but blocks until a slot is available.
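The code above discards the created documents. One way to keep them, sketched here under assumptions, is to let each throttled task return its document and then read the `Result` of every task once all of them have finished. `SemaphoreSketch`, `RunThrottled` and `CreateAll` are hypothetical names, and the `Create*File` methods are shortened stand-ins for the real ones.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PdfFile { public byte[] Bytes { get; set; } }
public class XlsFile { public byte[] Bytes { get; set; } }
public class DocFile { public byte[] Bytes { get; set; } }

public static class SemaphoreSketch
{
    // Stand-ins for the real CPU-bound methods from the question.
    static PdfFile CreatePdfFile(int id) { Thread.Sleep(100); return new PdfFile(); }
    static XlsFile CreateXlsFile(int id) { Thread.Sleep(150); return new XlsFile(); }
    static DocFile CreateDocFile(int id) { Thread.Sleep(200); return new DocFile(); }

    // Hypothetical helper: waits for a free slot, runs the synchronous
    // work on a pool thread, and always releases the slot afterwards.
    static Task<T> RunThrottled<T>(SemaphoreSlim sema, Func<T> create, CancellationToken token) =>
        Task.Run(async () =>
        {
            await sema.WaitAsync(token);
            try { return create(); }
            finally { sema.Release(); }
        }, token);

    public static (PdfFile[] Pdf, XlsFile[] Xls, DocFile[] Doc) CreateAll(
        int[] pdfIDs, int[] xlsIDs, int[] docIDs,
        int degreeOfParallelism, CancellationToken token = default)
    {
        using var sema = new SemaphoreSlim(degreeOfParallelism);

        var pdfTasks = pdfIDs.Select(id => RunThrottled(sema, () => CreatePdfFile(id), token)).ToArray();
        var xlsTasks = xlsIDs.Select(id => RunThrottled(sema, () => CreateXlsFile(id), token)).ToArray();
        var docTasks = docIDs.Select(id => RunThrottled(sema, () => CreateDocFile(id), token)).ToArray();

        // Block the current thread until every document has been created.
        Task.WaitAll(pdfTasks.Cast<Task>().Concat(xlsTasks).Concat(docTasks).ToArray());

        return (pdfTasks.Select(t => t.Result).ToArray(),
                xlsTasks.Select(t => t.Result).ToArray(),
                docTasks.Select(t => t.Result).ToArray());
    }
}
```

Because each array of tasks is built in input order, the returned arrays are in input order as well.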

eocron
  • Thanks eocron for the answer! Adding a `CancellationToken` is quite a nice feature. What is missing from your solution is the outcome: the three lists with the created documents. – Theodor Zoulias Mar 27 '20 at 20:53
  • 1
    I think it is not so hard to figure it out by yourself =) Just wrap them behind an interface and you are good to return whatever those methods spew out. – eocron Mar 27 '20 at 21:41

I would like to add one more solution to the problem. It is not particularly enticing, since it uses the anachronistic Task constructor, but it offers a feature that is sometimes desirable: it fails fast in case of an exception. The degree of parallelism is enforced by a Parallel.ForEachAsync loop (available from .NET 6) that starts and awaits each task.

var pdfTasks = pdfIDs.Select(id => new Task<PdfFile>(() => CreatePdfFile(id))).ToArray();
var xlsTasks = xlsIDs.Select(id => new Task<XlsFile>(() => CreateXlsFile(id))).ToArray();
var docTasks = docIDs.Select(id => new Task<DocFile>(() => CreateDocFile(id))).ToArray();

var allTasks = Enumerable.Empty<Task>()
    .Concat(pdfTasks)
    .Concat(xlsTasks)
    .Concat(docTasks);

var parallelOptions = new ParallelOptions()
{
    MaxDegreeOfParallelism = 3
};
Parallel.ForEachAsync(allTasks, parallelOptions, async (task, _) =>
{
    task.Start();
    await task;
}).Wait();

PdfFile[] pdfFiles = pdfTasks.Select(t => t.Result).ToArray();
XlsFile[] xlsFiles = xlsTasks.Select(t => t.Result).ToArray();
DocFile[] docFiles = docTasks.Select(t => t.Result).ToArray();
Theodor Zoulias