23

I'm building a console application that has to process a bunch of data.

Basically, the application grabs references from a DB. For each reference, it parses the content of a file and makes some changes. The files are HTML files, and the process does heavy work with RegEx replacements (finding references and transforming them into links). The result is then stored on the file system and sent to an external system.

To summarize, the sequential version of the process looks like this:

var refs = GetReferencesFromDB(); // ~5000 Datarow returned
foreach(var ref in refs)
{
    var filePath = GetFilePath(ref); // This method looks up in a previously loaded file list
    var html = File.ReadAllText(filePath); // Read html locally, or from a network drive
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath, convertedHtml); // Copy the result locally, or to a network drive
    SendToWs(ref, convertedHtml);
}

My program works correctly but is quite slow. That's why I want to parallelize the process.

For now, I've made a simple parallelization by adding AsParallel:

var refs = GetReferencesFromDB().AsParallel(); 
refs.ForAll(ref=>
{
    var filePath = GetFilePath(ref); 
    var html = File.ReadAllText(filePath); 
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath, convertedHtml); 
    SendToWs(ref, convertedHtml);
});

This simple change decreased the duration of the process (25% less time). However, what I understand about parallelization is that there won't be much benefit (or worse, there may be a penalty) when parallelizing over resources that rely on I/O, because the I/O won't magically double.

That's why I think I should change my approach: instead of parallelizing the whole process, I should create chains of dependent, queued tasks.

I.e., I should create a flow like:

Queue the file read. When finished, queue ParseHtml. When finished, queue both the send to the WS and the local write. When finished, log the result.

However, I don't know how to implement such a thing.

I feel it will end up as a set of consumer/producer queues, but I haven't found a good sample.

Moreover, I'm not sure there will be any benefit.

Thanks for any advice.

[Edit] In fact, I'm the perfect candidate for using .NET 4.5... if only it were RTM :)

[Edit 2] Another thing that makes me think it's not correctly parallelized is that in Resource Monitor, the CPU, network I/O and disk I/O graphs are not stable: when one is high, the others are low to medium.

Steve B
  • There's a worrying phrase in your question: `The files are HTML files, and the process does heavy work with RegEx replacements`. Those two words, `HTML` and `RegEx`, are arch-enemies and never play together. If you try to put them together they will end up killing each other. – Darin Dimitrov Dec 14 '11 at 14:18
  • @DarinDimitrov: you are absolutely right. If you knew how long the regex written by the former developer took to parse a single file, you'd laugh. Now it's still "long", but since it's under a few seconds it's acceptable (versus "too long, I have to kill the process" before). However, I don't see how I could avoid Regex in my case. – Steve B Dec 14 '11 at 14:26
  • You could avoid using RegEx by using an HTML parser, such as HTML Agility Pack or SgmlReader. – Darin Dimitrov Dec 14 '11 at 14:27
  • @DarinDimitrov: even with these libs, I'll end up testing text against a Regex of the form `XXX-YYY(Year)-ZZZ-lg.ext` (not all parts are mandatory). I can imagine, however, that it will be more efficient to test a single node than a whole line of HTML markup, won't it? Thanks for the suggestion. – Steve B Dec 14 '11 at 14:41
  • @DarinDimitrov: you converted me to HtmlAgilityPack: `private static string ConvertLinks(string html) {var hDoc = new HtmlAgilityPack.HtmlDocument(); hDoc.LoadHtml(html); foreach (var node in hDoc.DocumentNode.DescendantNodes()) { if (node.NodeType == HtmlAgilityPack.HtmlNodeType.Text && node.ParentNode.Name != "a") { var converted = documentRefRegEx.Replace( node.InnerHtml, new MatchEvaluator( m => BuildLink ) ); node.InnerHtml = converted; } } return hDoc.DocumentNode.OuterHtml; }`. Thanks for that. – Steve B Dec 15 '11 at 09:21

5 Answers

17

You're not leveraging any async I/O APIs in your code. Everything you're doing is CPU bound, and all your I/O operations are going to waste CPU resources blocking. AsParallel is for compute-bound tasks; if you want to take advantage of async I/O you need to leverage the Asynchronous Programming Model (APM) based APIs today in <= v4.0. This is done by looking for the BeginXXX/EndXXX methods on the I/O-based classes you're using and leveraging those whenever they're available.

Read this post for starters: TPL TaskFactory.FromAsync vs Tasks with blocking methods

Next, you don't want to use AsParallel in this case anyway. AsParallel enables streaming, which results in immediately scheduling a new Task per item, but you don't need/want that here. You'd be much better served by partitioning the work using Parallel::ForEach.

Let's see how you can use this knowledge to achieve max concurrency in your specific case:

var refs = GetReferencesFromDB();

// Using Parallel::ForEach here will partition and process your data on separate worker threads
Parallel.ForEach(
    refs,
    ref =>
{ 
    string filePath = GetFilePath(ref);

    byte[] fileDataBuffer = new byte[1048576];

    // Need to use FileStream API directly so we can enable async I/O
    FileStream sourceFileStream = new FileStream(
                                      filePath, 
                                      FileMode.Open,
                                      FileAccess.Read,
                                      FileShare.Read,
                                      8192,
                                      true);

    // Use FromAsync to read the data from the file
    Task<int> readSourceFileStreamTask = Task.Factory.FromAsync(
                                             sourceFileStream.BeginRead,
                                             sourceFileStream.EndRead,
                                             fileDataBuffer,
                                             0,
                                             fileDataBuffer.Length,
                                             null);

    // Add a continuation that will fire when the async read is completed
    readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
    {
        int sourceFileStreamBytesRead;

        try
        {
            // Determine exactly how many bytes were read 
            // NOTE: this will propagate any potential exception that may have occurred in EndRead
            sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
        }
        finally
        {
            // Always clean up the source stream
            sourceFileStream.Close();
            sourceFileStream = null;
        }

        // This is here to make sure you don't end up trying to read files larger than this sample code can handle
        if(sourceFileStreamBytesRead == fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
        }

        // Convert the file data to a string
        string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);

        // Parse the HTML
        string convertedHtml = ParseHtml(html);

        // This is here to make sure you don't end up trying to write files larger than this sample code can handle
        if(Encoding.UTF8.GetByteCount(convertedHtml) > fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
        }

        // Convert the file data back to bytes for writing, keeping the count of bytes that actually need to be written
        int convertedHtmlByteCount = Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);

        // Need to use FileStream API directly so we can enable async I/O
        FileStream destinationFileStream = new FileStream(
                                               destinationFilePath,
                                               FileMode.OpenOrCreate,
                                               FileAccess.Write,
                                               FileShare.None,
                                               8192,
                                               true);

        // Use FromAsync to write the data to the file
        Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
                                                  destinationFileStream.BeginWrite,
                                                  destinationFileStream.EndWrite,
                                                  fileDataBuffer,
                                                  0,
                                                  convertedHtmlByteCount,
                                                  null);

        // Add a continuation that will fire when the async write is completed
        destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
        {
            try
            {
                // NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite
                destinationFileStreamWriteAntecedent.Wait();
            }
            finally
            {
                // Always close the destination file stream
                destinationFileStream.Close();
                destinationFileStream = null;
            }
        },
        TaskContinuationOptions.AttachedToParent);

        // Send to external system **concurrent** to writing to destination file system above
        SendToWs(ref, convertedHtml);
    },
    TaskContinuationOptions.AttachedToParent);
});

Now, here are a few notes:

  1. This is sample code, so I'm using a 1MB buffer to read/write files. This is excessive for HTML files and wasteful of system resources. You can either lower it to suit your max needs or implement chained reads/writes into a StringBuilder, which is an exercise I leave up to you since I'd be writing ~500 more lines of code to do async chained reads/writes. :P
  2. You'll note that on the continuations for the read/write tasks I have TaskContinuationOptions.AttachedToParent. This is very important as it will prevent the worker thread that the Parallel::ForEach starts the work with from completing until all the underlying async calls have completed. If this was not here you would kick off work for all 5000 items concurrently which would pollute the TPL subsystem with thousands of scheduled Tasks and not scale properly at all.
  3. I call SendToWs concurrently with writing the file to the file share here. I don't know what underlies the implementation of SendToWs, but it too sounds like a good candidate for making async. Right now it's assumed to be pure compute work and, as such, is going to burn a CPU thread while executing. I leave it as an exercise for you to figure out how best to leverage what I've shown you to improve throughput there (see the rough sketch after these notes).
  4. This was all typed free-form; my brain was the only compiler here and SO's syntax highlighting is all I used to make sure the syntax was good. So, please forgive any syntax errors and let me know if I screwed up anything so badly that you can't make heads or tails of it, and I'll follow up.
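To illustrate note 3: since the question doesn't show what SendToWs does, here is a minimal sketch of an async variant assuming it POSTs the converted HTML to a web service via HttpWebRequest (the URL, content type and method name are made up for illustration):

// Hypothetical async counterpart of SendToWs, chaining the APM Begin/End pairs with FromAsync
// so no thread blocks while the request is in flight.
// Requires System, System.IO, System.Net, System.Text and System.Threading.Tasks.
static Task SendToWsAsync(string convertedHtml)
{
    var request = (HttpWebRequest)WebRequest.Create("http://example.org/ws"); // placeholder URL
    request.Method = "POST";
    request.ContentType = "text/html";
    byte[] payload = Encoding.UTF8.GetBytes(convertedHtml);

    return Task.Factory
        .FromAsync<Stream>(request.BeginGetRequestStream, request.EndGetRequestStream, null)
        .ContinueWith(antecedent =>
        {
            // antecedent.Result propagates any exception thrown by EndGetRequestStream
            using (Stream requestStream = antecedent.Result)
            {
                requestStream.Write(payload, 0, payload.Length);
            }

            return Task.Factory.FromAsync<WebResponse>(
                request.BeginGetResponse, request.EndGetResponse, null);
        })
        .Unwrap() // flatten Task<Task<WebResponse>> into Task<WebResponse>
        .ContinueWith(antecedent => antecedent.Result.Close()); // observe exceptions and release the response
}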
Drew Marsh
  • When I read further into your proposal, I wondered whether it applies to my case. In my "pipeline" of jobs, some steps are I/O bound (reading the file, writing the file, sending the file to the WS). Your code elegantly pipes operations, but I think it won't limit concurrent jobs against the same I/O channel. I mean that your code will avoid blocking threads, but as multiple calls can occur on the same I/O channel, I won't gain anything (or may even lose a bit of speed). I actually think I have to create queues for each I/O channel, with at most one worker per queue. Am I right? – Steve B Dec 16 '11 at 08:50
  • We're getting into territory where there just wasn't enough info in the original question to give the absolute best approach you're looking for. The code provided will be limited to the extent that Parallel::ForEach will only execute so many worker tasks concurrently (based on MaxDegreeOfParallelism & heuristics), and because the I/O workflow steps are chained to those, that will also limit the number of I/O operations going on, while still freeing up CPU resources at those times (which is the key). Even if you do a producer/consumer pattern, you still want to use async I/O to maximize throughput. – Drew Marsh Dec 16 '11 at 17:12
  • Something else to consider, rather than manual producer/consumer, is to look into TPL Dataflow (http://www.microsoft.com/download/en/details.aspx?id=14610), which is a kind of pet-project library for .NET 4.0 but is baked into .NET 4.5. It's really the best way to "pipeline" steps together while giving fine-grained control over the concurrency of each particular step in the flow. – Drew Marsh Dec 16 '11 at 17:15
5

The good news is your logic could be easily separated into steps that go into a producer-consumer pipeline.

  • Step 1: Read file
  • Step 2: Parse file
  • Step 3: Write file
  • Step 4: SendToWs

If you are using .NET 4.0 you can use the BlockingCollection data structure as the backbone for each step's producer-consumer queue. The main thread will enqueue each work item into step 1's queue, where it will be picked up and processed and then forwarded on to step 2's queue, and so on and so forth.
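For example, here is a minimal sketch of the first two stages (the method names and the DataRow item type are taken from the question; the bounded capacities are arbitrary and exist only to keep memory in check):

// Hypothetical skeleton of stages 1 and 2.
// Requires System.Collections.Concurrent, System.Data, System.IO and System.Threading.Tasks.
var readQueue  = new BlockingCollection<DataRow>(boundedCapacity: 100);
var parseQueue = new BlockingCollection<Tuple<DataRow, string>>(boundedCapacity: 100);

// Stage 1: read the file for each reference and hand the raw HTML to stage 2
var readStage = Task.Factory.StartNew(() =>
{
    foreach (var reference in readQueue.GetConsumingEnumerable())
        parseQueue.Add(Tuple.Create(reference, File.ReadAllText(GetFilePath(reference))));
    parseQueue.CompleteAdding(); // tell stage 2 that no more items are coming
});

// Stage 2: parse the HTML
var parseStage = Task.Factory.StartNew(() =>
{
    foreach (var item in parseQueue.GetConsumingEnumerable())
    {
        string convertedHtml = ParseHtml(item.Item2);
        // ... forward (item.Item1, convertedHtml) to the write and SendToWs stages in the same way
    }
});

// The main thread acts as the producer for stage 1
foreach (DataRow reference in GetReferencesFromDB())
    readQueue.Add(reference);
readQueue.CompleteAdding();

Task.WaitAll(readStage, parseStage);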

If you are willing to move on to the Async CTP then you can take advantage of the new TPL Dataflow structures for this as well. There is the BufferBlock<T> data structure, among others, that behaves in a similar manner to BlockingCollection and integrates well with the new async and await keywords.
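For illustration, here is a rough sketch of the same pipeline with Dataflow blocks (TransformBlock and ActionBlock are siblings of BufferBlock<T>; this is written against the System.Threading.Tasks.Dataflow API as it eventually shipped, the CTP-era names differed slightly, and the per-stage degrees of parallelism are only guesses):

// Rough sketch; GetFilePath, ParseHtml, SendToWs and DataRow come from the question.
var readBlock = new TransformBlock<DataRow, Tuple<DataRow, string>>(
    reference => Tuple.Create(reference, File.ReadAllText(GetFilePath(reference))),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });   // a single reader hitting the disk

var parseBlock = new TransformBlock<Tuple<DataRow, string>, Tuple<DataRow, string>>(
    item => Tuple.Create(item.Item1, ParseHtml(item.Item2)),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });   // the CPU-bound stage

var sendBlock = new ActionBlock<Tuple<DataRow, string>>(
    item => SendToWs(item.Item1, item.Item2),                            // a write-to-disk block would slot in the same way
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

readBlock.LinkTo(parseBlock, new DataflowLinkOptions { PropagateCompletion = true });
parseBlock.LinkTo(sendBlock, new DataflowLinkOptions { PropagateCompletion = true });

foreach (DataRow reference in GetReferencesFromDB())
    readBlock.Post(reference);
readBlock.Complete();

sendBlock.Completion.Wait(); // every item has flowed through all stages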

Because your algorithm is IO bound the producer-consumer strategies may not get you the performance boost you are looking for, but at least you will have a very elegant solution that would scale well if you could increase the IO throughput. I am afraid steps 1 and 3 will be the bottlenecks and the pipeline will not balance well, but it is worth experimenting with.

Brian Gideon
  • +1 for BlockingCollection, which looks promising. I'll have to read up on all of that, but it looks like a good approach for my case. – Steve B Dec 14 '11 at 14:30
3

Just a suggestion, but have you looked into the consumer/producer pattern? A certain number of threads would read your files from disk and feed the content to a queue. Then another set of threads, known as the consumers, would "consume" the queue as it's filled. http://zone.ni.com/devzone/cda/tut/p/id/3023

Hussein Khalil
  • Reading in parallel from the same disk actually makes the performance worse. In this case, only one producer is recommended. – Tudor Dec 14 '11 at 14:18
  • Is it slower in all cases? I remember a test I did on a particular project of mine, where performance ended up increasing with a certain number of threads. Exceeding that particular number would, indeed, decrease performance. So creating 10 threads would not be wise. Still, I take your opinion seriously and I'll re-profile when the time comes. – Hussein Khalil Dec 14 '11 at 14:32
  • I'm saying this because the disk has to seek to the next position to read. So if multiple threads try to read, they will just bounce the disk between seeks and degrade performance even further. – Tudor Dec 14 '11 at 14:35
2

Your best bet in this kind of scenario is definitely the producer-consumer model: one thread to pull the data and a bunch of workers to process it. There's no easy way around the I/O, so you might as well just focus on optimizing the computation itself.

I will now try to sketch a model:

// shared state, visible to both the producer and the consumers
var queue = new Queue<DataRow>();
var signal = new AutoResetEvent(false); // signals the consumers that an item is available

// producer thread
var refs = GetReferencesFromDB(); // ~5000 DataRows returned

foreach(var ref in refs)
{
    lock(queue)
    {   
       queue.Enqueue(ref);
       signal.Set();
    }

    // if the queue is bounded, test whether the queue is full and wait.
}

// consumer threads
while(true)
{
    DataRow value = null;
    lock(queue)
    {
       if(queue.Count > 0)
       {
           value = queue.Dequeue();
       }
    }        

    if(value != null)
    {
       // process value
    }
    else
    {
       signal.WaitOne(); // wait until an item has been placed in the queue
    }
}

You can find more details about producer/consumer in part 4 of Threading in C#: http://www.albahari.com/threading/part4.aspx

Tudor
  • If you can use .NET 4 or greater you can use [ConcurrentQueue](http://msdn.microsoft.com/en-us/library/dd267265.aspx) (or any other class that is an implementation of `IProducerConsumerCollection`) to do this without taking locks. – Scott Chamberlain Dec 15 '11 at 20:16
  • @Scott Chamberlain: you're right. I haven't personally upgraded yet so I'm still old school. :)) – Tudor Dec 15 '11 at 20:17
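To illustrate the comment above, here is a minimal lock-free variant of the consumer sketch using ConcurrentQueue, with a SemaphoreSlim standing in for the manual event (an assumption; BlockingCollection would wrap both up for you):

// Sketch only: ConcurrentQueue<T> synchronizes internally, so no explicit lock is needed.
var queue = new ConcurrentQueue<DataRow>();
var itemsAvailable = new SemaphoreSlim(0);

// producer
foreach (DataRow reference in GetReferencesFromDB())
{
    queue.Enqueue(reference);
    itemsAvailable.Release(); // one permit per enqueued item
}

// consumer thread(s)
while (true)
{
    itemsAvailable.Wait(); // blocks until at least one item has been enqueued
    DataRow value;
    if (queue.TryDequeue(out value))
    {
        // process value
    }
}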
0

I think your approach of splitting up the list of files and processing each file in one batch is OK. My feeling is that you might get more performance gain if you play with the degree of parallelism. See: `var refs = GetReferencesFromDB().AsParallel().WithDegreeOfParallelism(16);` — this would start processing 16 files at the same time. Currently you are probably processing 2 or 4 files, depending on the number of cores you have. That is only efficient when the work is pure computation without I/O. For I/O-intensive tasks, this adjustment can bring remarkable performance improvements by reducing processor idle time.

If you are going to split up and join tasks back using producer-consumer look at this sample: Using Parallel Linq Extensions to union two sequences, how can one yield the fastest results first?

George Mamaladze
  • My bottlenecks are both the disk I/O and the network I/O... I don't think adding more workers will help in my case. – Steve B Dec 14 '11 at 14:43
  • If all three have exactly the same throughput, parallelism will not bring a significant performance gain. Parallelism is good for filling the idle time of one resource with another task running concurrently. Breaking the processing chain into resource-specific chunks will not change the picture, since you have a limited buffer (RAM) and cannot read 1000 files while only one gets parsed, and vice versa. – George Mamaladze Dec 14 '11 at 14:57