I need to process a List<T> of thousands of elements.

First I need to group the elements by year and type, so I obtain a List<List<T>>. Then for each internal List<T> I want to add objects of type T until the max package size is reached for the List<T>, then I create a new package and go on the same way.

I want to use a Parallel.ForEach loop.

My current implementation works well if I run it sequentially, but the logic is not thread-safe and I want to change it.
I think the problem is in the inner Parallel.ForEach loop, when the max size for the List<T> is reached and I assign a new List<T> to the same reference.

private ConcurrentBag<ConcurrentBag<DumpDocument>> InitializePackages()
{
    // Group by Type and Year
    ConcurrentBag<ConcurrentBag<DumpDocument>> groups = new ConcurrentBag<ConcurrentBag<DumpDocument>>(Dump.DumpDocuments.GroupBy(d => new { d.Type, d.Year })
        .Select(g => new ConcurrentBag<DumpDocument> (g.ToList()))
        .ToList());

    // Documents lists with max package dimension
    ConcurrentBag<ConcurrentBag<DumpDocument>> documentGroups = new ConcurrentBag<ConcurrentBag<DumpDocument>>();

    foreach (ConcurrentBag<DumpDocument> group in groups)
    {       
        long currentPackageSize = 0;

        ConcurrentBag<DumpDocument> documentGroup = new ConcurrentBag<DumpDocument>();

        ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = Parameters.MaxDegreeOfParallelism };
        Parallel.ForEach(group, options, new Action<DumpDocument, ParallelLoopState>((DumpDocument document, ParallelLoopState state) =>
            {
                long currentDocumentSize = new FileInfo(document.FilePath).Length;

                // If MaxPackageSize = 0 then no splitting to apply and the process works well
                if (Parameters.MaxPackageSize > 0 && currentPackageSize + currentDocumentSize > Parameters.MaxPackageSize)
                {
                    documentGroups.Add(documentGroup);

                    // Here's the problem!
                    documentGroup = new ConcurrentBag<DumpDocument>();

                    currentPackageSize = 0;
                }

                documentGroup.Add(document);
                currentPackageSize += currentDocumentSize;
            }));

        if (documentGroup.Count > 0)
            documentGroups.Add(documentGroup);
    }

    return documentGroups;
}

public class DumpDocument
{
    public string Id { get; set; }
    public long Type { get; set; }
    public string MimeType { get; set; }
    public int Year { get; set; }
    public string FilePath { get; set; }
}

My operation is quite simple; I actually only need to get the file size using:

long currentDocumentSize = new FileInfo(document.FilePath).Length;

I've read that I could also use a Partitioner, but I've never used one, and it's not my priority at the moment.

I have also already read this question, which is similar but doesn't solve my problem with the inner loop.

UPDATE 28/12/2016

I updated the code to meet verification requirements.

Cheshire Cat
  • Are you just trying to speed things up with parallelism? – Enigmativity Dec 06 '16 at 10:35
  • In this specific case, yes. I want to speed up the initialization of packages. Then each single package (ConcurrentBag) passes through a more complex Parallel.ForEach loop that processes the documents. – Cheshire Cat Dec 06 '16 at 10:40
  • It seems to me that you have an in-memory list. It will almost always be faster to process the data on one thread rather than in parallel. It's only if you have some heavy processing that it is worth doing anything in parallel. – Enigmativity Dec 06 '16 at 11:03
  • Ok. But what if I need to do something more complicated inside the loop? How can I make my code thread-safe? That is my question... – Cheshire Cat Dec 06 '16 at 13:53
  • Sure. Can you modify your code so that it is a [mcve]? Then it can be answered. – Enigmativity Dec 06 '16 at 22:37
  • @Enigmativity Hi! A little bit late, but I updated the code by adding the referenced class `DumpDocument`. – Cheshire Cat Dec 28 '16 at 08:56

1 Answer

After the code update it seems that you're using ConcurrentBag, so there is other non-thread-safe logic left in your code:

long currentPackageSize = 0;
if (// .. && 
    currentPackageSize + currentDocumentSize > Parameters.MaxPackageSize
// ...
{
    // ...
    currentPackageSize += currentDocumentSize;
}

The += operator isn't atomic, so you definitely have a race condition there. Reading a long variable isn't guaranteed to be atomic either (on a 32-bit platform a 64-bit read can tear). You can introduce locks there, or use the Interlocked class to update the value atomically:

Interlocked.Add(ref currentPackageSize, currentDocumentSize);
Interlocked.Exchange(ref currentPackageSize, 0);
Interlocked.Read(ref currentPackageSize);

Using this class will require some refactoring of your code (I think CAS operations such as CompareExchange are preferable in your case), so locks may be the easiest route for you. You should probably implement both approaches, test them, and measure the execution time.
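To make the Interlocked option more concrete, here is a minimal sketch. The class and method names (InterlockedSketch, SumSizes, TryReserve) are made up for illustration and are not part of the question's code. SumSizes shows an atomic accumulation, and TryReserve shows a CAS loop that combines the "does it still fit?" check and the update into one atomic step. Note that this only protects the size counter; it does not make swapping documentGroup for a new bag atomic.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
public static class InterlockedSketch
{
    // Adds up all sizes in parallel; Interlocked.Add is an atomic read-modify-write,
    // unlike "total += sizes[i]".
    public static long SumSizes(long[] sizes)
    {
        long total = 0;
        Parallel.For(0, sizes.Length, i => Interlocked.Add(ref total, sizes[i]));
        return Interlocked.Read(ref total);
    }

    // Tries to add 'size' to 'current' without exceeding 'max'.
    // Returns false (and leaves 'current' untouched) if it would not fit.
    public static bool TryReserve(ref long current, long size, long max)
    {
        while (true)
        {
            long seen = Interlocked.Read(ref current);
            long next = seen + size;
            if (next > max)
                return false;

            // Publish 'next' only if nobody changed 'current' since we read it;
            // otherwise loop and retry with the fresh value.
            if (Interlocked.CompareExchange(ref current, next, seen) == seen)
                return true;
        }
    }
}
```

A thread that gets false back from TryReserve would still need some coordination (for example a lock) to close the current package and start a new one, which is why the lock-based version may end up simpler.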

Also, as you can see, the instantiation of the new bag isn't thread-safe either, so you have to either lock around the variable (which makes the threads wait on each other) or refactor your code into two steps: first get all the file sizes in parallel, then iterate over the results sequentially, avoiding the race conditions.
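The two-step refactoring could look roughly like the sketch below. TwoStepPackaging, Pack and the getSize delegate are names invented for this example; the size lookup is passed in as a delegate so the sketch does not depend on real files. One deliberate difference from the question's code is the current.Count > 0 guard, which avoids emitting an empty package when a single item is larger than the limit.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper class, for illustration only.
public static class TwoStepPackaging
{
    public static List<List<T>> Pack<T>(
        IReadOnlyList<T> items, Func<T, long> getSize, long maxPackageSize)
    {
        // Step 1 (parallel): each size is computed independently, so there is
        // no shared state to protect. AsOrdered keeps sizes[i] aligned with items[i].
        long[] sizes = items.AsParallel().AsOrdered().Select(getSize).ToArray();

        // Step 2 (sequential): the packing logic touches shared state
        // (current package and running size), so it runs on one thread.
        var packages = new List<List<T>>();
        var current = new List<T>();
        long currentSize = 0;

        for (int i = 0; i < items.Count; i++)
        {
            // maxPackageSize == 0 means "no splitting", as in the question.
            if (maxPackageSize > 0 && current.Count > 0
                && currentSize + sizes[i] > maxPackageSize)
            {
                packages.Add(current);
                current = new List<T>();
                currentSize = 0;
            }

            current.Add(items[i]);
            currentSize += sizes[i];
        }

        if (current.Count > 0)
            packages.Add(current);

        return packages;
    }
}
```

For the question's data this would be called once per group, with something like d => new FileInfo(d.FilePath).Length as the getSize delegate. Only step 1 runs in parallel, which is also where the I/O cost is.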

As for the Partitioner, you shouldn't use that class here: it's usually used to schedule work across the CPU cores, not to split the results.
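For contrast, this is the kind of job a Partitioner is typically used for: handing each worker a contiguous chunk of indices so that per-item scheduling overhead is amortized. It is a sketch with made-up names (PartitionerSketch, SumOfSquares) and has nothing to do with splitting documents into packages.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
public static class PartitionerSketch
{
    public static long SumOfSquares(int[] values)
    {
        // Partitioner.Create(from, to) rejects an empty range, so handle it up front.
        if (values.Length == 0)
            return 0;

        long total = 0;

        // Each 'range' is a Tuple<int, int> of [fromInclusive, toExclusive) indices.
        Parallel.ForEach(Partitioner.Create(0, values.Length), range =>
        {
            long local = 0; // per-chunk accumulator, not shared between threads
            for (int i = range.Item1; i < range.Item2; i++)
                local += (long)values[i] * values[i];

            // One atomic update per chunk instead of one per element.
            Interlocked.Add(ref total, local);
        });

        return total;
    }
}
```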

However, I'd like to note that you have some minor code issues:

  1. You can remove the ToList() calls inside the ConcurrentBag constructors, because the constructor accepts an IEnumerable, which you already have:

    ConcurrentBag<ConcurrentBag<DumpDocument>> groups = new ConcurrentBag<ConcurrentBag<DumpDocument>>(Dump.DumpDocuments.GroupBy(d => new { d.Type, d.Year })
        .Select(g => new ConcurrentBag<DumpDocument> (g)));
    

    This helps you avoid unnecessary copies of your grouped data.

  2. You can use the var keyword to avoid repeating the types in your code (this is just a sample line; the same change applies in many places across your code):

    foreach (var group in groups)
    
  3. You should not set the maximum degree of parallelism unless you know what you're doing (and I suspect you don't need to here):

    var options = new ParallelOptions { MaxDegreeOfParallelism = Parameters.MaxDegreeOfParallelism };
    

    The TPL's default task scheduler already tries to adjust thread pool and CPU usage for your tasks, so in general you can leave this option at its default; if you do set it, Environment.ProcessorCount is a reasonable value.

  4. You can pass a lambda directly to Parallel.ForEach instead of creating a new Action (you can also move this code out into a separate method):

    (document, state) =>
    {
        long currentDocumentSize = new FileInfo(document.FilePath).Length;
    
        // If MaxPackageSize = 0 then no splitting to apply and the process works well
        if (Parameters.MaxPackageSize > 0 && currentPackageSize + currentDocumentSize > Parameters.MaxPackageSize)
        {
            documentGroups.Add(documentGroup);
    
            // Here's the problem!
            documentGroup = new ConcurrentBag<DumpDocument>();
    
            currentPackageSize = 0;
        }
    
        documentGroup.Add(document);
        currentPackageSize += currentDocumentSize;
    }
    

    The lambda compiles correctly because you already have a generic collection (a bag), and there is an overload that accepts the ParallelLoopState as a second parameter.

VMAtm
  • Thank you! I finally refactored the code in 2 steps. But I also appreciated your excursus on the Interlocked class. I used it in some other Parallel.ForEach blocks in my code, especially for counters. – Cheshire Cat Dec 30 '16 at 09:13
  • You're welcome:) Yeah, for counters such increments are very helpful. Good luck with your projects. – VMAtm Dec 30 '16 at 12:34