
Is this the correct way to use ConcurrentDictionary and ConcurrentBag to AddOrUpdate values?

Basically, I am trying to do the following:

  1. I have a file with millions of records and I'm trying to process it and extract each record into an object.

  2. Each entry is a key-value pair, with the key being WBAN and the value being the object.

     var cd = new ConcurrentDictionary<String, ConcurrentBag<Data>>();
     int count = 0;

     foreach (var line in File.ReadLines(path).AsParallel().WithDegreeOfParallelism(5))
     {
         var sInfo = line.Split(new char[] { ',' });
         cd.AddOrUpdate(sInfo[0],
             new ConcurrentBag<Data>()
             {
                 new Data()
                 {
                     WBAN = sInfo[0],
                     Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
                     time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
                 }
             },
             (oldKey, oldValue) =>
             {
                 oldValue.Add(new Data()
                 {
                     WBAN = sInfo[0],
                     Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
                     time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
                 });

                 return oldValue;
             });
     }
    
Cod29

  • `File.ReadLines(path).AsParallel().WithDegreeOfParallelism(5)` - what is this meant to achieve? There is no point to making this parallel code because your program is IO-bound. You aren't even using async-IO either. You're creating needless complication for yourself. – Dai Sep 07 '20 at 03:29
  • There is no need to run this in parallel, it will achieve next to nothing apart from give you headaches, and likely run slower even if you make it work – TheGeneral Sep 07 '20 at 03:29
  • Your code is also creating a new `ConcurrentBag` instance unnecessarily because `ConcurrentDictionary` runs all factory callbacks even when there will be a collision (it does this by default, though there are ways around this - see the sketch after these comments). – Dai Sep 07 '20 at 03:30
  • Actually, the text file has 20 million or more records. So, will parallel processing be useful? – Cod29 Sep 07 '20 at 03:32
  • If you spent more time processing on the CPU, you might have a case, but the bottle-neck will be *IO*, there is nothing done on the CPU in this case (relatively). Additionally, because you are reading end of lines, and it's likely encoded, there is no easy way to parallelize this (but not impossible). Lastly, I would think twice about pulling 2 million records of something into memory, that is a huge amount of unmovable data on your Large Object Heap. Sounds like you want a database – TheGeneral Sep 07 '20 at 03:35
  • _"text file has 20 million or more records"_ - I think the bigger question is why a text file has 20 million+ lines and why you want to load 20 million records into memory at once –  Sep 07 '20 at 03:37
  • Yeah, it's an IO operation, so using parallel processing would just add extra overhead on the CPU. But yes, for the time being there is a large file with millions of records to receive and process in C#, and not in a job or any other process. – Cod29 Sep 07 '20 at 03:40
  • Your foreach is still sequential, I think you are confused about what you are doing. Have you actually measured this code with some profiler? – Tanveer Badar Sep 07 '20 at 03:45
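Regarding the "ways around this" in Dai's comment above: one option is GetOrAdd, whose value factory only runs when the key is missing; another is wrapping the value in Lazy<T> so that at most one bag is ever constructed per key. A minimal sketch, assuming the question's Data type (the sample line values are made up):

// Hypothetical sample line, split the same way as in the question.
var sInfo = "03011,20200907,0330".Split(',');

var cd = new ConcurrentDictionary<string, Lazy<ConcurrentBag<Data>>>();

// GetOrAdd invokes the factory only for missing keys, and even if two threads
// race on a brand-new key, only the Lazy that wins is ever materialized,
// so at most one ConcurrentBag is constructed per key.
cd.GetOrAdd(sInfo[0], _ => new Lazy<ConcurrentBag<Data>>(() => new ConcurrentBag<Data>()))
  .Value
  .Add(new Data { WBAN = sInfo[0], Date = sInfo[1], time = sInfo[2] });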

2 Answers


Your idea is basically correct, but there is a flaw in the implementation. Enumerating a ParallelQuery with a foreach statement does not cause the code inside the loop to run in parallel; at that stage the parallelization phase has already been completed. In your code there is actually no work parallelized, because there is no operator attached after the .AsParallel().WithDegreeOfParallelism(5). To do the looping in parallel you must replace the foreach with the ForAll operator, like this:

File.ReadLines(path)
    .AsParallel()
    .WithDegreeOfParallelism(5)
    .ForAll(line => { /* Process each line in parallel */ });

It is important to know what is parallelized here. The processing of each line is parallelized, while the loading of each line from the filesystem is not. The loading is serialized. The worker threads employed by the Parallel LINQ engine (one of which is the current thread) are synchronized when accessing the source IEnumerable (the File.ReadLines(path) in this case).
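For illustration, here is a rough sketch of the question's loop rewritten with ForAll (it assumes the same path variable and Data class as the question, and uses GetOrAdd so the bag factory only runs for keys that are not already present):

var cd = new ConcurrentDictionary<string, ConcurrentBag<Data>>();
var separators = new char[] { ',' };

File.ReadLines(path)
    .AsParallel()
    .WithDegreeOfParallelism(5)
    .ForAll(line =>
    {
        var sInfo = line.Split(separators);

        // Reuse the existing bag for a known WBAN; the factory runs only when
        // the key is missing (a rare race may discard one extra empty bag).
        cd.GetOrAdd(sInfo[0], _ => new ConcurrentBag<Data>())
          .Add(new Data()
          {
              WBAN = sInfo[0],
              Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
              time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
          });
    });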

Using a nested ConcurrentDictionary<String, ConcurrentBag<Data>> structure to store the processed lines is not very efficient. You can trust PLINQ to do a better job at grouping the data than you could do manually with concurrent collections and such. By using the ToLookup operator you can get an ILookup<string, Data>, which is essentially a readonly dictionary with multiple values for each key.

var separators = new char[] { ',' };

var lookup = File.ReadLines(path)
    .AsParallel()
    .WithDegreeOfParallelism(5)
    .Select(line => line.Split(separators))
    .ToLookup(sInfo => sInfo[0], sInfo => new Data()
    {
        WBAN =  sInfo[0],
        Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
        time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
    });

This should be a better option regarding performance and memory efficiency, unless you specifically want the resulting structure to be mutable and thread-safe for some reason.
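For example, consuming the resulting ILookup<string, Data> could look like this (the "03011" key is just a hypothetical WBAN value; indexing with an unknown key yields an empty sequence rather than throwing):

// All records of one station.
foreach (Data d in lookup["03011"])
{
    Console.WriteLine($"{d.WBAN} {d.Date} {d.time}");
}

// Or enumerate every group and its size.
foreach (IGrouping<string, Data> group in lookup)
{
    Console.WriteLine($"{group.Key}: {group.Count()} records");
}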

Two more notes:

  1. Hardcoding the degree of parallelism (5 in this case) is OK, provided that you know the hardware where your program will run. Otherwise it may cause friction by over-subscription (having more threads than the actual cores of the machine). Hint: virtual machines are often configured to be single threaded.

  2. The ConcurrentBag is a very specialized collection. In the majority of cases you'll get better performance with a ConcurrentQueue. Both classes offer a similar API. People probably prefer the ConcurrentBag because its Add method is more familiar than Enqueue. A small sketch illustrating both notes follows.
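A sketch illustrating both notes (the key and field values are made-up sample data, and the same path and Data type as above are assumed):

// Note 1: derive the degree of parallelism from the machine instead of hardcoding 5.
var query = File.ReadLines(path)
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount);

// Note 2: ConcurrentQueue mirrors ConcurrentBag's usage; Enqueue replaces Add.
var cd = new ConcurrentDictionary<string, ConcurrentQueue<Data>>();
cd.GetOrAdd("03011", _ => new ConcurrentQueue<Data>())
  .Enqueue(new Data { WBAN = "03011", Date = "20200907", time = "0330" });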

Theodor Zoulias

  • Your program is IO-bound, not CPU-bound, so there is no advantage to parallelizing your processing.
    • It's IO-bound because your program can't process a line of data without having first read that line from the file, and generally speaking computers always read data from storage much more slowly than they can process it.
    • As your program is performing only trivial string operations on each line read, I can safely say with 99.9% confidence that the time it takes to add a Data element to a Dictionary<String,List<Data>> is a tiny tiny tiny fraction of the time it takes for your computer to read a single line from a text-file.
  • Also, avoid using File.ReadLines for programs like these because that will first read the entire file into memory.
    • If you look at my solution, you'll see it uses StreamReader to read each line one-by-one which means it doesn't need to wait until it reads everything into memory first.

So to parse that file with the best possible performance you don't need any Concurrent collections.

Just this:

private static readonly Char[] _sep = new Char[] { ',' }; // Declared here to ensure only a single array allocation.

public static async Task< Dictionary<String,List<Data>> > ReadFileAsync( FileInfo file )
{
    const Int32 ONE_MEGABYTE = 1 * 1024 * 1024; // Use 1MB+ sized buffers for async IO. Not smaller buffers like 1024 or 4096 as those are for synchronous IO.

    Dictionary<String,List<Data>> dict = new Dictionary<String,List<Data>>( capacity: 1024 );

    using( FileStream fs = new FileStream( file.FullName, FileMode.Open, FileAccess.Read, FileShare.Read, ONE_MEGABYTE, FileOptions.Asynchronous | FileOptions.SequentialScan ) )
    using( StreamReader rdr = new StreamReader( fs ) )
    {
        String line;
        while( ( line = await rdr.ReadLineAsync().ConfigureAwait(false) ) != null )
        {
            String[] values = line.Split( _sep );
            if( values.Length < 3 ) continue;

            Data d = new Data()
            {
                WBAN = values[0],
                Date = values[1],
                time = values[2]
            };

            if( !dict.TryGetValue( d.WBAN, out List<Data> list ) )
            {
                dict[ d.WBAN ] = list = new List<Data>();
            }

            list.Add( d );
        }
    }

    return dict;
}
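A usage sketch, called from inside an async method (the file path is hypothetical; the Stopwatch is only there to time the call):

FileInfo file = new FileInfo( @"C:\data\wban-readings.txt" ); // Hypothetical path.

Stopwatch sw = Stopwatch.StartNew();

Dictionary<String,List<Data>> dict = await ReadFileAsync( file );

sw.Stop();
Console.WriteLine( "Parsed {0} WBAN groups in {1}.", dict.Count, sw.Elapsed );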

Update: Hypothetically speaking...

Hypothetically speaking, because file IO (especially asynchronous FileStream IO) uses large buffers (in this case, a ONE_MEGABYTE-sized buffer) the program could pass each buffer (as it's read, sequentially) into a parallel processor.

However, the problem is that the data inside that buffer cannot be trivially apportioned out to individual threads: because the length of a line is not fixed, a single thread still needs to read through the entire buffer to find out where the line-breaks are. (Technically that could be parallelized somewhat, but it would add huge amounts of complexity, as you also need to handle lines that cross buffer boundaries, or buffers that contain only a single line, etc.)

And at this small scale the overhead of using the thread-pool and concurrent collection-types would erase the speed-up of parallel processing, given that the program would still largely be IO-bound.

Now, if you had a file sized in the gigabytes, with Data records that were sized around 1KB then I would go into detail demonstrating how you could do it because at that scale you probably would see a modest performance boost.

Dai

  • Makes sense. And thank you for the detailed comments. I can see the code in the question took 1.09 min to process, while the one you shared above shows a `Stopwatch.Elapsed` of 00:00:08 seconds; it's super fast. – Cod29 Sep 07 '20 at 04:01
  • One thing: is there a need for `ConfigureAwait(false)`? – Cod29 Sep 07 '20 at 04:01
  • @Cod29 Yes, when `ConfigureAwait(false)` is omitted (or when using `ConfigureAwait(true)`) the entire function will take longer to complete because the thread-pool will wait to resume the `while` loop's body on a single particular thread or context, which is unnecessary here. **Always specify `ConfigureAwait(false)` in non-UI code**: https://stackoverflow.com/questions/13489065/best-practice-to-call-configureawait-for-all-server-side-code – Dai Sep 07 '20 at 04:33
  • The [`File.ReadLines`](https://learn.microsoft.com/en-us/dotnet/api/system.io.file.readlines) is not loading the entire file into memory. Maybe you are confusing it with the `File.ReadAllLines`? Also I am **very** skeptical about using the `ReadLineAsync`. From [my experiments](https://stackoverflow.com/questions/59043037/read-text-file-with-iasyncenumerable/60443393#60443393) this method is not truly asynchronous, and just adds overhead (causes the allocation of a `Task` for each line) without offering any benefit over the synchronous `ReadLine`. – Theodor Zoulias Sep 07 '20 at 05:29
  • @TheodorZoulias Sorry, yes - I was thinking of `ReadAllLines` (I forgot that `ReadLines` returns a lazy `IEnumerable` entirely). – Dai Sep 07 '20 at 05:41
  • @TheodorZoulias Yes, I agree that `ReadLineAsync` is suboptimal - I find I get the best results if I use `FileStream.ReadAsync` directly and then wrap that buffer with a hacked synchronous `StreamReader` (and manually handle the cases where `\r` is at the end of one buffer and `\n` is at the start of the next buffer) - but showing that would be overkill for this question+answer. – Dai Sep 07 '20 at 05:46
  • @TheodorZoulias I feel the async IO functionality in .NET really needs rethinking especially because the design of `Stream` itself is broken (which is part of why we have the new `Pipe` classes). It's unfortunate that Microsoft chose to maintain backwards-compatibility in the main .NET Core assemblies instead of moving poorly-designed parts to a compatibility/shim library and keeping the main core assemblies clean. – Dai Sep 07 '20 at 05:48