3

I've got a problem where I have to process a large batch of large jsonl files (read, deserialize, do some transforms db lookups etc, then write the transformed results in a .net core console app.

I've gotten better throughput by putting the output in batches on a separate thread and was trying to improve the processing side by adding some parallelism but the overhead ended up being self defeating.

I had been doing:

using (var stream = new FileStream(_filePath, FileMode.Open))
using (var reader = new StreamReader(stream)
{
    for (;;)
    {
        var l = reader.ReadLine();
        if (l == null)
            break;
        // Deserialize
        // Do some database lookups
        // Do some transforms
        // Pass result to output thread
    }
}

And some diagnostic timings showed me that the ReadLine() call was taking more than the deserialization, etc. To put some numbers on that, a large file would have about:

  • 11 seconds spent on ReadLine
  • 7.8 seconds spend on serialization
  • 10 seconds spent on db lookups

I wanted to overlap that 11 seconds of file i/o with the other work so I tried

using (var stream = new FileStream(_filePath, FileMode.Open))
using (var reader = new StreamReader(stream)
{
    var nextLine = reader.ReadLineAsync();
    for (;;)
    {
        var l = nextLine.Result;
        if (l == null)
            break;
        nextLine = reader.ReadLineAsync();
        // Deserialize
        // Do some database lookups
        // Do some transforms
        // Pass result to output thread
    }
}

To get the next I/O going while I did the transform stuff. Only that ended up taking a lot longer than the regular sync stuff (like twice as long).

I've got requirements that they want predictability on the overall result (i.e. the same set of files have to be processed in name order and the output rows have to be predictably in the same order) so I can't just throw a file per thread and let them fight it out.

I was just trying to introduce enough parallelism to smooth the throughput over a large set of inputs, and I was surprised how counterproductive the above turned out to be.

Am I missing something here?

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
user1664043
  • 695
  • 5
  • 14
  • 2
    Don't have the time to write a full answer at the moment but you need to use two different threads/loops and a shared queue. Basically read line by line in one loop into a threadsafe queue, while another loop looks for and processes results. Video playback engines do this - read packets from disk in one thread, decode in another, and present in a third. It can be a bit daunting but I think that's the only way to get what you're looking for. Oh also `var l = nextLine.Result;` is blocking the thread, so you're really getting no benefits of parallelism by doing this. – Emperor Eto Jul 30 '21 at 21:52
  • 2
    To expand what @PeterMoore mentioned, you could start a task (`Task.Run`) for reading from the file, putting each line into a shared `ConcurrentQueue`, start another task and do a loop on the queue – Camilo Terevinto Jul 30 '21 at 22:09
  • Thanks guys... That's exactly the approach I used on the output handoff. – user1664043 Aug 01 '21 at 16:17

3 Answers3

1

The built-in asynchronous filesystem APIs are currently broken, and you are advised to avoid them. Not only they are much slower than their synchronous counterparts, but they are not even truly asynchronous. The .NET 6 will come with an improved FileStream implementation, so in a few months this may no longer be an issue.

What you are trying to achieve is called task-parallelism, where two or more heterogeneous operations are running concurrently and independently from each other. It's an advanced technique and it requires specialized tools. The most common type of parallelism is the so called data-parallelism, where the same type of operation is running in parallel on a list of homogeneous data, and it's commonly implemented using the Parallel class or the PLINQ library.

To achieve task-parallelism the most readily available tool is the TPL Dataflow library, which is built-in the .NET Core / .NET 5 platforms, and you only need to install a package if you are targeting the .NET Framework. This library allows you to create a pipeline consisting of linked components that are called "blocks" (TransformBlock, ActionBlock, BatchBlock etc), where each block acts as an independent processor with its own input and output queues. You feed the pipeline with data, and the data flows from block to block through the pipeline, while being processed along the way. You Complete the first block in the pipeline to signal that no more input data will ever be available, and then await the Completion of the last block to make your code wait until all the work has been done. Here is an example:

private async void Button1_Click(object sender, EventArgs e)
{
    Button1.Enabled = false;
    var fileBlock = new TransformManyBlock<string, IList<string>>(filePath =>
    {
        return File.ReadLines(filePath).Buffer(10);
    });

    var deserializeBlock = new TransformBlock<IList<string>, MyObject[]>(lines =>
    {
        return lines.Select(line => Deserialize(line)).ToArray();
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 2 // Let's assume that Deserialize is parallelizable
    });

    var persistBlock = new TransformBlock<MyObject[], MyObject[]>(async objects =>
    {
        foreach (MyObject obj in objects) await PersistToDbAsync(obj);
        return objects;
    });

    var displayBlock = new ActionBlock<MyObject[]>(objects =>
    {
        foreach (MyObject obj in objects) TextBox1.AppendText($"{obj}\r\n");
    }, new ExecutionDataflowBlockOptions()
    {
        TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
        // Make sure that the delegate will be invoked on the UI thread
    });

    fileBlock.LinkTo(deserializeBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
    deserializeBlock.LinkTo(persistBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
    persistBlock.LinkTo(displayBlock,
        new DataflowLinkOptions { PropagateCompletion = true });

    foreach (var filePath in Directory.GetFiles(@"C:\Data"))
        await fileBlock.SendAsync(filePath);

    fileBlock.Complete();
    await displayBlock.Completion;
    MessageBox.Show("Done");
    Button1.Enabled = true;
}

The data passed through the pipeline should be chunky. If each unit of work is too lightweight, you should batch them in arrays or lists, otherwise the overhead of moving lots of tiny data around is going to outweigh the benefits of parallelism. That's the reason for using the Buffer LINQ operator (from the System.Interactive package) in the above example. The .NET 6 will come with a new Chunk LINQ operator, offering the same functionality.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
1

Theodor's suggestion looks like a really powerful and useful library that's worth checking out, but if you're looking for a smaller DIY solution this is how I would approach it:

using System;
using System.IO;
using System.Threading.Tasks;
using System.Collections.Generic;

namespace Parallelism
{
    class Program
    {
        private static Queue<string> _queue = new Queue<string>();
        private static Task _lastProcessTask;
        
        static async Task Main(string[] args)
        {
            string path = "???";
            await ReadAndProcessAsync(path);
        }

        private static async Task ReadAndProcessAsync(string path)
        {
            using (var str = File.OpenRead(path))
            using (var sr = new StreamReader(str))
            {
                string line = null;
                while (true)
                {
                    line = await sr.ReadLineAsync();
                    if (line == null)
                        break;

                    lock (_queue)
                    {
                        _queue.Enqueue(line);
                        if (_queue.Count == 1)
                            // There was nothing in the queue before
                            // so initiate a new processing loop. Save 
                            // but DON'T await the Task yet.
                            _lastProcessTask = ProcessQueueAsync();
                    }
                }                
            }

            // Now that file reading is completed, await 
            // _lastProcessTask to ensure we don't return
            // before it's finished.
            await _lastProcessTask;
        }

        // This will continue processing as long as lines are in the queue,
        // including new lines entering the queue while processing earlier ones.
        private static Task ProcessQueueAsync()
        {
            return Task.Run(async () =>
            {
                while (true)
                {
                    string line;
                    lock (_queue)
                    {              
                        // Only peak at first so the read loop doesn't think
                        // the queue is empty and initiate a second processing
                        // loop while we're processing this line.
                        if (!_queue.TryPeek(out line))
                            return;
                    }
                    await ProcessLineAsync(line);
                    lock (_queue)
                    {
                        // Dequeues the item we just processed. If it's the last
                        // one, this loop is done.
                        _queue.Dequeue();
                        if (_queue.Count == 0)
                            return;
                    }
                }
            });
        }

        private static async Task ProcessLineAsync(string line)
        {
            // do something
        }
    }
}

Note this approach has a processing loop that terminates when nothing is left in the queue, and is re-initiated if needed when new items are ready. Another approach would be to have a continuous processing loop that repeatedly re-checks and does a Task.Delay() for a small amount of time while the queue is empty. I like my approach better because it doesn't bog down the worker thread with periodic and unnecessary checks but performance would likely be unnoticeably different.

Also just to comment on Blindy's answer, I have to disagree with discouraging the use of parallelism here. First off, most CPUs these days are multi-core, so smart use of the .NET threadpool will in fact maximize your application's efficiency when run on multi-core CPUs and have pretty minimal downside in single-core scenarios.

More importantly, though, async does not equal multithreading. Asynchronous programming existed long before multithreading, I/O being the most notable example. I/O operations are in large part handled by hardware other than the CPU - the NIC, SATA controllers, etc. They use an ancient concept called the Hardware Interrupt that most coders today have probably never heard of and predates multithreading by decades. It's basically just a way to give the CPU a callback to execute when an off-CPU operation is finished. So when you use a well-behaved asynchronous API (notwithstanding that .NET FileStream has issues as Theodore mentioned), your CPU really shouldn't be doing that much work at all. And when you await such an API, the CPU is basically sitting idle until the other hardware in the machine has written the requested data to RAM.

I agree with Blindy that it would be better if computer science programs did a better job of teaching people how computer hardware actually works. Looking to take advantage of the fact that the CPU can be doing other things while waiting for data to be read off the disk, off a network, etc., is, in the words of Captain Kirk, "officer thinking".

Emperor Eto
  • 2,456
  • 2
  • 18
  • 32
  • Oh boy. That's the kind of code that I would expect to see inside a library, not in application code. It is fragile, difficult to test, and difficult to adjust when new requirements arrive. What would you do if the processed files were multiple GB in size, and pushing every single line in the `Queue` resulted in out-of-memory exceptions? Or if processing the lines with a single worker-task (`ProcessQueueAsync`) was too slow, and you wanted to add one or two more workers to speed things up? – Theodor Zoulias Aug 01 '21 at 02:44
  • 1
    @theodorzoulias hah I'll take that as a compliment I think. ;) But regarding queue size, if that's a concern you would check your queue size in the read loop and throttle yourself if it gets too big. As to the second hypothetical you're right, having more than one parallel worker - especially if you want to maintain output sequence order - would be beyond the scope of what would be comprehensible in an SO answer. Hence why I said your suggestion was worth checking out. – Emperor Eto Aug 01 '21 at 07:09
  • 1
    My first professional gig after school was on VAX/VMS systems in the 1980s. Exactly as you say, they had asynch but only on off-cpu io operations. – user1664043 Aug 01 '21 at 16:26
  • @user1664043 that explains a lot :) Hope my answer helps you squeeze some more efficiency out of this! – Emperor Eto Aug 01 '21 at 19:22
  • "Hardware Interrupt that most coders today have probably never heard of" -- I sure hope that's not true. Programmers who use async in this day and age need to know that fundamental concept or else they shouldn't be using async. – rory.ap May 29 '22 at 13:27
-2

11 seconds spent on ReadLine

More like, specifically, 11 seconds spent on file I/O, but you didn't measure that.

Replace your stream creation with this instead:

using var reader = new StreamReader(_filePath, Encoding.UTF8, false, 50 * 1024 * 1024);

That will cause it to read it to a buffer of 50MB (play with the size as needed) to avoid repeated I/O on what seems like an ancient hard drive.

I was just trying to introduce enough parallelism to smooth the throughput

Not only did you not introduce any parallelism at all, but you used ReadLineAsync wrong -- it returns a Task<string>, not a string.

It's completely overkill, the buffer size increase will most likely fix your issue, but if you want to actually do this you need two threads that communicate over a shared data structure, as Peter said.

Only that ended up taking a lot longer than the regular sync stuff

It baffles me that people think multi-threaded code should take less processing power than single-threaded code. There has to be some really basic understanding missing from present day education to lead to this. Multi-threading includes multiple extra context switches, mutex contention, your OS scheduler kicking in to replace one of your threads (leading to starvation or oversaturation), gathering, serializing and aggregating results after work is done etc. None of that is free or easy to implement.

Blindy
  • 65,249
  • 10
  • 91
  • 131