
So my requirement is to read multiple CSV files (each with at least a million rows) and then parse each line. Currently, the way I have broken up my pipeline, I am first creating a separate pipeline that just reads a CSV file into a string[], and then I plan to create the parsing pipeline later.

But seeing the results of my File Reading Pipeline, I am dumbfounded because it is considerably slower than just looping through the CSV file and then looping through the rows.

public static IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
{
    var lineBufferBlock = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = batchSize });

    var fileReadingBlock = new ActionBlock<string>(async (filePath) =>
    {
        using (var fileStream = File.OpenRead(filePath)) {
            using (var streamReader = new StreamReader(fileStream, Encoding.UTF8, true, batchSize)) {
                string line;
                while ((line = streamReader.ReadLine()) != null) {
                    var isCompleted = await lineBufferBlock.SendAsync(line);
                    while (!isCompleted)
                    {
                        isCompleted = await lineBufferBlock.SendAsync(line);
                    }
                }
            }
        }
    }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount });

    var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
    {
        return line.Split(",");
    }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount });

    lineBufferBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true });

    fileReadingBlock.Completion.ContinueWith((task) =>
    {
        lineBufferBlock.Complete();
    });

    return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
}

And then I finally consume it as follows:

        for (int i = 1; i < 5; i++) {
            var filePath = $"C:\\Users\\File{i}.csv";
            fileReadingPipeline.SendAsync(filePath);
        }
        fileReadingPipeline.Complete();
        while (true) {
            try {
                var outputRows = fileReadingPipeline.Receive();
                foreach (string word in outputRows)
                {

                }
            }
            catch (InvalidOperationException e) {
                break;
            }
        }

Whereas my straight loop code is the following:

        for (int i = 1; i < 5; i++) {
            var filePath = $"C:\\Users\\File{i}.csv";
            foreach (string row in File.ReadLines(filePath))
            {
                foreach (string word in row.Split(","))
                {

                }
            }
        }

The performance difference comes down to ~15 seconds for the TPL Dataflow version versus ~5 seconds for the plain looping code.

EDIT

Following advice from the comments, I have removed the unnecessary lineBufferBlock from the pipeline, and this is my code now. However, performance remains the same.

        var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
        {
            return File.ReadLines(filePath);
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount });

        var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
        {
            return line.Split(",");
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount });

        fileReadingBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true });

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);

1 Answer


When you configure a pipeline, you should keep in mind the capabilities of the hardware that is going to do the job. The TPL Dataflow is not doing the job by itself; it delegates it to the CPU, the HDD/SSD, the network card etc. For example, when reading files from a hard disk, it is probably futile to instruct the TPL to read data from 8 files concurrently, because the head of the mechanical arm of the HDD cannot physically be in 8 places at the same time. This boils down to the fact that reading files from filesystems is not particularly parallel-friendly. It is slightly better in the case of SSDs, but you'll have to test it on a case-by-case basis.

Another issue with parallelization is granularity. You want the workload to be chunky, not granular. Otherwise the cost of passing messages from buffer to buffer, and of putting memory barriers around each transfer to ensure cross-thread visibility, may negate any benefit you expect from employing parallelism. Tip: splitting a single string into parts is a highly granular operation.

Here is a way to do it:

using static MoreLinq.Extensions.BatchExtension;

var reader = new TransformManyBlock<string, string[]>(filePath =>
{
    return File.ReadLines(filePath).Batch(100, r => r.ToArray());
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1
});

var parser = new TransformBlock<string[], string[][]>(lines =>
{
    return lines.Select(line => line.Split(",")).ToArray();
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
});

reader.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });

This example uses the Batch operator from the MoreLinq package in order to pass the lines around in batches of 100, instead of passing them one by one. You can find other batching options here.
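If you'd rather not depend on MoreLinq, a hand-rolled batching helper is easy to write. Here is a minimal sketch (the helper name `BatchByCount` is made up for illustration, not part of any library):

using System.Collections.Generic;

static class BatchingHelper
{
    // Groups a sequence into arrays of at most `size` elements; roughly equivalent
    // to MoreLinq's Batch with an array result selector (illustrative sketch).
    public static IEnumerable<string[]> BatchByCount(IEnumerable<string> source, int size)
    {
        var buffer = new List<string>(size);
        foreach (var item in source)
        {
            buffer.Add(item);
            if (buffer.Count == size)
            {
                yield return buffer.ToArray();
                buffer.Clear();
            }
        }
        if (buffer.Count > 0) yield return buffer.ToArray(); // emit the final partial batch
    }
}

// Usage inside the reader block, instead of File.ReadLines(filePath).Batch(100, r => r.ToArray()):
// return BatchingHelper.BatchByCount(File.ReadLines(filePath), 100);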


Update: One more suggestion is to boost the minimum number of threads that the ThreadPool creates on demand (SetMinThreads). Otherwise the ThreadPool will be immediately saturated by the MaxDegreeOfParallelism = Environment.ProcessorCount configuration, which will cause small but noticeable (500 msec) delays, because of the intentional laziness of the ThreadPool's thread-injection algorithm.

ThreadPool.SetMinThreads(Environment.ProcessorCount * 2,
    Environment.ProcessorCount * 2);

It is enough to call this method once at the start of the program.
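For example, in a console app it could look something like this (a minimal sketch; the actual pipeline setup is elided):

using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        // Raise the minimums once at startup, before the pipeline starts posting work,
        // so the ThreadPool doesn't lazily inject threads under the initial burst.
        ThreadPool.SetMinThreads(Environment.ProcessorCount * 2,
            Environment.ProcessorCount * 2);

        // ... build the dataflow pipeline and feed it the file paths here ...
        await Task.CompletedTask; // placeholder so the async Main compiles
    }
}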

Theodor Zoulias
  • While I definitely appreciate the information here, I tried this approach and it doesn't really improve the performance, so I am not going to accept it right now. Thanks for engaging in the conversation though! – LostRaider1297 Nov 25 '20 at 18:20
  • @LostRaider1297 *nothing* you do with naive parallelism is going to improve performance. You can't process data faster than you can read it, especially if you have to read rows one by one. Using a Dataflow helps by allowing different parts of processing to run on different threads. That's not the case here. – Panagiotis Kanavos Nov 25 '20 at 18:34
  • @LostRaider1297 I was not expecting it to be faster than the linear approach. I was expecting it to improve over your original TPL approach, which was 3 times slower than the linear! The improvements will come when you give it some real work to do. Currently the workload is nearly 100% I/O-bound (reading from the filesystem). If you want to improve this part, you probably need to upgrade your hardware storage. I doubt that a software solution exists that will give better performance using the same hardware. – Theodor Zoulias Nov 25 '20 at 18:35
  • @LostRaider1297 if you want to improve performance you have to somehow partition the file itself and process each partition separately. Flat files are good for this, as you can simply divide the file length by the number of partitions you want and look for the end of line after each partition limit. After that, you can read each partition as if it was a separate file. You *won't* get n-times improvement but at least you'll get *some* improvement – Panagiotis Kanavos Nov 25 '20 at 18:39
  • @PanagiotisKanavos I have experimented with parallel reading of a single file, and I observed a ~50% improvement when reading from my SSD, and a ~50% deterioration when reading from my external HDD. It was also quite tricky to do it right. The potential for introducing subtle bugs is enormous! I don't think it's worth the risk. – Theodor Zoulias Nov 25 '20 at 18:46
  • @TheodorZoulias, I implemented partitioning manually (instead of using the MoreLINQ library) but it had the same performance as the TPL one. So I don't know if even using the MoreLINQ library will help much. – LostRaider1297 Nov 25 '20 at 18:54
  • @TheodorZoulias and I've shown just that in conferences 10 years ago. Have you thought about why? Or what's involved? The disk caches, the SATA controller caches, the OS caches, and finally .NET streams cache as well. The whole point is getting data to the *heavy computation thread*, not just reading data. That's why eg `xargs` or PowerShell's `foreach -Parallel` exist and can improve processing of disk files. If you're able to read big enough chunks in a few IO operations and process them, performance improves. If you read many small chunks, performance suffers due to the overhead – Panagiotis Kanavos Nov 25 '20 at 18:54
  • @LostRaider1297 what did you actually do? MoreLINQ has no effect at all in this case. It works on IEnumerable<> data, which means the data is *already* loaded. You've *already* paid the cost of IO. And since your code doesn't really do anything intensive, you gain nothing by parallelization – Panagiotis Kanavos Nov 25 '20 at 18:55
  • @LostRaider1297 you have to *partition the file before reading it* and read each part separately. That's a very old trick by the way - partition the big file and fire off N instances of a tool to work on it. – Panagiotis Kanavos Nov 25 '20 at 18:56
  • @LostRaider1297 I suggested using the MoreLinq just for convenience. Of course you can implement your own batching mechanism if you want, or just grab the [source code](https://github.com/morelinq/MoreLINQ/blob/master/MoreLinq/Batch.cs) of the `Batch` method from MoreLinq's github repository (it's not too much code, and it has no inter-library dependencies AFAICS). – Theodor Zoulias Nov 25 '20 at 19:02
  • @PanagiotisKanavos partitioning and parallelizing the reading of a single file sounds too hardcore to me. The OP has multiple files that need to be parsed, so I think it's simpler to just increase the `MaxDegreeOfParallelism` of the reader block (assuming that the hardware can handle the parallelization favorably, of course). – Theodor Zoulias Nov 25 '20 at 19:07
  • @PanagiotisKanavos I much appreciate the comments here, and the more I read them the more I realise that TPL really only excels at CPU-bound operations, and using it on purely IO-bound operations causes a degradation of performance due to the overheads. So going forward, my best approach, I think, is to use async/await for the file reading stuff and then hook that up to the computation pipeline that I plan to build later. Would I be correct in doing this? – LostRaider1297 Nov 25 '20 at 19:08
  • @PanagiotisKanavos One more thing: by giving StreamReader a buffer size, am I not firing only one IO operation to read all of that and then looping through it? That should have improved my performance, right? – LostRaider1297 Nov 25 '20 at 19:10
  • @LostRaider1297 I updated my answer with one more suggestion. – Theodor Zoulias Nov 25 '20 at 19:21
  • @LostRaider1297 on the contrary, Dataflow is very good in async scenarios, as each block can itself execute async operations. It's perfect when you can break your work into interconnecting steps each of which can execute independently. That's why I use it for ETL jobs, especially when remote calls are involved. It's just not appropriate for your case, where you only have *one* operation. The single row split is too lightweight to count as an operation *and* produces enough garbage (the temporary strings) that over time the GC cost will become dominant. – Panagiotis Kanavos Nov 25 '20 at 21:32
  • @LostRaider1297 I've actually faced the GC cost too - I had to parse some badly formatted logs, where the original naive code resulted in so many temp strings that memory skyrocketed to multiple GBs. I initially tried to parallelize this *and* replaced the naive splits with regular expressions (I only needed *some* of the data from each line). Using a regex produced 8x better performance and 10x less memory usage, so I no longer needed to parallelize. I could, but it wasn't worth it. – Panagiotis Kanavos Nov 25 '20 at 21:35
  • @LostRaider1297 if you can actually break your work into steps you should *do the exact opposite of all the current suggestions* and *restrict* the number of threads per block. There's a very good reason only 1 thread is used by default. This way you can reduce the sync overhead. You can even specify that only 1 consumer is used in the `LinkTo` arguments. There's no point in creating 16, 32 threads for one block when only 1 is ever going to be active. Use more efficient parsing too. If your lines contain key=value pairs, you could use eg `(?<key>.*?)=(?<value>.*)` to capture them by name – Panagiotis Kanavos Nov 25 '20 at 21:39
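
A minimal sketch of that named-group regex approach (the group names `key` and `value` and the sample line are illustrative placeholders):

using System.Text.RegularExpressions;

// Sketch: extract key=value pairs via named capture groups instead of Split.
var pairPattern = new Regex(@"(?<key>.*?)=(?<value>.*)", RegexOptions.Compiled);

string line = "color=blue"; // hypothetical input line
Match match = pairPattern.Match(line);
if (match.Success)
{
    string key = match.Groups["key"].Value;     // "color"
    string value = match.Groups["value"].Value; // "blue"
}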