17

There is a folder that contains 1000s of small text files. I aim to parse and process all of them while more files are being populated into the folder. My intention is to multithread this operation as the single threaded prototype took six minutes to process 1000 files.

I like to have reader and writer thread(s) as the following. While the reader thread(s) are reading the files, I'd like to have writer thread(s) to process them. Once the reader is started reading a file, I d like to mark it as being processed, such as by renaming it. Once it's read, rename it to completed.

How do I approach such a multithreaded application?

Is it better to use a distributed hash table or a queue?

Which data structure do I use that would avoid locks?

Is there a better approach to this scheme?

Peter Mortensen
  • 30,738
  • 21
  • 105
  • 131
DarthVader
  • 52,984
  • 76
  • 209
  • 300
  • 1
    Which .net version is available to use? .Net 4 offers a great deal in assisting with this, but not sure if it's an option. – Nick Craver May 11 '10 at 02:13
  • 4
    One major limiting factor to this is I/O contention, no matter how you try to parallelize the work, everything still has to go through the same I/O. – Chris O May 11 '10 at 02:13
  • ok, that s fine, i d like to fully utilize the IO – DarthVader May 11 '10 at 02:16
  • .net 3.5. I doubt that .net 4 is an option for me. – DarthVader May 11 '10 at 02:16
  • 8
    @Nick Craver I would really dig seeing a .Net 4 suggestion in addition to the 3.5 one. But only if it wouldn't put you out at all, and only if other people are interested as well (they could mod this comment up). – Christopher May 11 '10 at 02:21
  • @Chris - I left an answer below showing the approach, the benefits, some concerns to be aware of and an example test you can play with, hope you find it useful :) – Nick Craver May 11 '10 at 12:42

6 Answers6

27

Since there's curiosity on how .NET 4 works with this in comments, here's that approach. Sorry, it's likely not an option for the OP. Disclaimer: This is not a highly scientific analysis, just showing that there's a clear performance benefit. Based on hardware, your mileage may vary widely.

Here's a quick test (if you see a big mistake in this simple test, it's just an example. Please comment, and we can fix it to be more useful/accurate). For this, I just dropped 12,000 ~60 KB files into a directory as a sample (fire up LINQPad; you can play with it yourself, for free! - be sure to get LINQPad 4 though):

var files = 
Directory.GetFiles("C:\\temp", "*.*", SearchOption.AllDirectories).ToList();

var sw = Stopwatch.StartNew(); //start timer
files.ForEach(f => File.ReadAllBytes(f).GetHashCode()); //do work - serial
sw.Stop(); //stop
sw.ElapsedMilliseconds.Dump("Run MS - Serial"); //display the duration

sw.Restart();
files.AsParallel().ForAll(f => File.ReadAllBytes(f).GetHashCode()); //parallel
sw.Stop();
sw.ElapsedMilliseconds.Dump("Run MS - Parallel");

Slightly changing your loop to parallelize the query is all that's needed in most simple situations. By "simple" I mostly mean that the result of one action doesn't affect the next. Something to keep in mind most often is that some collections, for example our handy List<T> is not thread safe, so using it in a parallel scenario isn't a good idea :) Luckily there were concurrent collections added in .NET 4 that are thread safe. Also keep in mind if you're using a locking collection, this may be a bottleneck as well, depending on the situation.

This uses the .AsParallel<T>(IEnumeable<T>) and .ForAll<T>(ParallelQuery<T>) extensions available in .NET 4.0. The .AsParallel() call wraps the IEnumerable<T> in a ParallelEnumerableWrapper<T> (internal class) which implements ParallelQuery<T>. This now allows you to use the parallel extension methods, in this case we're using .ForAll().

.ForAll() internally crates a ForAllOperator<T>(query, action) and runs it synchronously. This handles the threading and merging of the threads after it's running... There's quite a bit going on in there, I'd suggest starting here if you want to learn more, including additional options.


The results (Computer 1 - Physical Hard Disk):

  • Serial: 1288 - 1333ms
  • Parallel: 461 - 503ms

Computer specs - for comparison:

The results (Computer 2 - Solid State Drive):

  • Serial: 545 - 601 ms
  • Parallel: 248 - 278 ms

Computer specifications - for comparison:

  • Quad Core 2 Quad Q9100 @ 2.26 GHz
  • 8 GB RAM (DDR 1333)
  • 120 GB OCZ Vertex SSD (Standard Version - 1.4 Firmware)

I don't have links for the CPU/RAM this time, these came installed. This is a Dell M6400 Laptop (here's a link to the M6500... Dell's own links to the 6400 are broken).


These numbers are from 10 runs, taking the min/max of the inner 8 results (removing the original min/max for each as possible outliers). We hit an I/O bottleneck here, especially on the physical drive, but think about what the serial method does. It reads, processes, reads, processes, rinse repeat. With the parallel approach, you are (even with a I/O bottleneck) reading and processing simultaneously. In the worst bottleneck situation, you're processing one file while reading the next. That alone (on any current computer!) should result in some performance gain. You can see that we can get a bit more than one going at a time in the results above, giving us a healthy boost.

Another disclaimer: Quad core + .NET 4 parallel isn't going to give you four times the performance, it doesn't scale linearly... There are other considerations and bottlenecks in play.

I hope this was on interest in showing the approach and possible benefits. Feel free to criticize or improve... This answer exists solely for those curious as indicated in the comments :)

Community
  • 1
  • 1
Nick Craver
  • 623,446
  • 136
  • 1,297
  • 1,155
  • Great to see a concrete example of this. Thank you. – Christopher May 11 '10 at 15:07
  • Nick, wouldn't it be better if the OP uses a producer and several consumers, instead of parallel tasks for each file? If the OP makes too many parallel tasks, then switching between them will actually degrade performance... great post otherwise! – Kiril May 11 '10 at 16:05
  • 1
    @Lirik - Not sure I understand completely, you're not really context switching here, this creates the number of threads corresponding to the number of cores, so you're not context switching, except during an interruption like always. What would be the producer in your case (clarify a bit, example!)? Since the ability to scale the file reading depends on the data source, whether it's one physical HD, fibre channel, an SSD, RAM, etc...it's ability to read x number of files faster would depend on the medium...so not sure a single producer is faster...it might actually become a bottleneck :) – Nick Craver May 11 '10 at 16:13
  • @Nick, I'm trying to figure out how your example creates a number of threads corresponding to the cores? Is it automatic with TPL, or is there some other magic there? – Kiril May 11 '10 at 17:06
  • 1
    @Lirik - The number of workers is (by default) automatically scaled by PLINQ internally, though you can specify a limit (and lots of other options) if you want using MaxDegreeOfParallelism, Reed Copsey has a good explanation here: http://reedcopsey.com/2010/02/11/parallelism-in-net-part-9-configuration-in-plinq-and-tpl/ – Nick Craver May 11 '10 at 17:20
  • If I need to process several large files (~13 GB being the largest), will PLINQ still store everything in as it can fit or if I do ReadAllBytes is it going to try and throw 13GB into memory? Am I doomed to processing line by line for these monsters? – interesting-name-here Feb 20 '17 at 20:42
6

Design

The Producer/Consumer pattern will probably be the most useful for this situation. You should create enough threads to maximize the throughput.

Here are some questions about the Producer/Consumer pattern to give you an idea of how it works:

You should use a blocking queue and the producer should add files to the queue while the consumers process the files from the queue. The blocking queue requires no locking, so it's about the most efficient way to solve your problem.

If you're using .NET 4.0 there are several concurrent collections that you can use out of the box:

Threading

A single producer thread will probably be the most efficient way to load the files from disk and push them onto the queue; subsequently multiple consumers will be popping items off the queue and they'll process them. I would suggest that you try 2-4 consumer threads per core and take some performance measurements to determine which is most optimal (i.e. the number of threads that provide you with the maximum throughput). I would not recommend the use a ThreadPool for this specific example.

P.S. I don't understand what's the concern with a single point of failure and the use of distributed hash tables? I know DHTs sound like a really cool thing to use, but I would try the conventional methods first unless you have a specific problem in mind that you're trying to solve.

Community
  • 1
  • 1
Kiril
  • 39,672
  • 31
  • 167
  • 226
  • I recon the producer should also read the files into a buffer that is then passed onto the consumers. This way you won't have multiple threads trying to read from the HDD at the same time (which will cause the head to jump a lot and slow it down a LOT, rather than speed up the process). – Grant Peters May 11 '10 at 03:33
  • Correct Grant, one producer and multiple consumers. – Kiril May 11 '10 at 04:31
3

I recommend that you queue a thread for each file and keep track of the running threads in a dictionary, launching a new thread when a thread completes, up to a maximum limit. I prefer to create my own threads when they can be long-running, and use callbacks to signal when they're done or encountered an exception. In the sample below I use a dictionary to keep track of the running worker instances. This way I can call into an instance if I want to stop work early. Callbacks can also be used to update a UI with progress and throughput. You can also dynamically throttle the running thread limit for added points.

The example code is an abbreviated demonstrator, but it does run.

class Program
{
    static void Main(string[] args)
    {
        Supervisor super = new Supervisor();
        super.LaunchWaitingThreads();

        while (!super.Done) { Thread.Sleep(200); }
        Console.WriteLine("\nDone");
        Console.ReadKey();
    }
}

public delegate void StartCallbackDelegate(int idArg, Worker workerArg);
public delegate void DoneCallbackDelegate(int idArg);

public class Supervisor
{
    Queue<Thread> waitingThreads = new Queue<Thread>();
    Dictionary<int, Worker> runningThreads = new Dictionary<int, Worker>();
    int maxThreads = 20;
    object locker = new object();

    public bool Done { 
        get { 
            lock (locker) {
                return ((waitingThreads.Count == 0) && (runningThreads.Count == 0)); 
            } 
        } 
    }

    public Supervisor()
    {
        // queue up a thread for each file
        Directory.GetFiles("C:\\folder").ToList().ForEach(n => waitingThreads.Enqueue(CreateThread(n)));
    }

    Thread CreateThread(string fileNameArg)
    {
        Thread thread = new Thread(new Worker(fileNameArg, WorkerStart, WorkerDone).ProcessFile);
        thread.IsBackground = true;
        return thread;
    }

    // called when a worker starts
    public void WorkerStart(int threadIdArg, Worker workerArg)
    {
        lock (locker)
        {
            // update with worker instance
            runningThreads[threadIdArg] = workerArg;
        }
    }

    // called when a worker finishes
    public void WorkerDone(int threadIdArg)
    {
        lock (locker)
        {
            runningThreads.Remove(threadIdArg);
        }
        Console.WriteLine(string.Format("  Thread {0} done", threadIdArg.ToString()));
        LaunchWaitingThreads();
    }

    // launches workers until max is reached
    public void LaunchWaitingThreads()
    {
        lock (locker)
        {
            while ((runningThreads.Count < maxThreads) && (waitingThreads.Count > 0))
            {
                Thread thread = waitingThreads.Dequeue();
                runningThreads.Add(thread.ManagedThreadId, null); // place holder so count is accurate
                thread.Start();
            }
        }
    }
}

public class Worker
{
    string fileName;
    StartCallbackDelegate startCallback;
    DoneCallbackDelegate doneCallback;
    public Worker(string fileNameArg, StartCallbackDelegate startCallbackArg, DoneCallbackDelegate doneCallbackArg)
    {
        fileName = fileNameArg;
        startCallback = startCallbackArg;
        doneCallback = doneCallbackArg;
    }

    public void ProcessFile()
    {
        startCallback(Thread.CurrentThread.ManagedThreadId, this);
        Console.WriteLine(string.Format("Reading file {0} on thread {1}", fileName, Thread.CurrentThread.ManagedThreadId.ToString()));
        File.ReadAllBytes(fileName);
        doneCallback(Thread.CurrentThread.ManagedThreadId);
    }
}
hyde
  • 2,525
  • 4
  • 27
  • 49
Ed Power
  • 8,310
  • 3
  • 36
  • 42
1

You could have a central queue, the reader threads would need write access during the push of the in-memory contents to the queue. The processing threads would need read access to this central queue to pop off the next memory stream to-be-processed. This way you minimize the time spent in locks and don't have to deal with the complexities of lock free code.

EDIT: Ideally, you'd handle all exceptions/error conditions (if any) gracefully, so you don't have points of failure.

As an alternative, you can have multiple threads, each one "claims" a file by renaming it before processing, thus the filesystem becomes the implementation for locked access. No clue if this is any more performant than my original answer, only testing would tell.

Chris O
  • 5,017
  • 3
  • 35
  • 42
  • That introduces a single point of failure. I d like a decentralized approach. – DarthVader May 11 '10 at 02:33
  • 2
    Possibly, but having each thread be "intelligent" and work together with the other threads can be complicated. Cross-threading issues can be a nightmare to debug. A single (or central) queue is simpler. – Pretzel May 11 '10 at 03:25
  • What do you mean i dont have points of failure ? what happens if the server goes down ? centralized approach is always introduces single point of failure. – DarthVader May 11 '10 at 04:28
1

Generally speaking, 1000 small files (how small, btw?) should not take six minutes to process. As a quick test, do a find "foobar" * in the directory containing the files (the first argument in quotes doesn't matter; it can be anything) and see how long it takes to process every file. If it takes more than one second, I'll be disappointed.

Assuming this test confirms my suspicion, then the process is CPU-bound, and you'll get no improvement from separating the reading into its own thread. You should:

  1. Figure out why it takes more than 350 ms, on average, to process a small input, and hopefully improve the algorithm.
  2. If there's no way to speed up the algorithm and you have a multicore machine (almost everyone does, these days), use a thread pool to assign 1000 tasks each the job of reading one file.
Marcelo Cantos
  • 181,030
  • 38
  • 327
  • 365
  • @user177883: Firstly, a continuous process doesn't have an overall running time. Secondly, I am well aware that the six minutes is total processing time. The whole point of the test is to show how much of that time is I/O; I strongly suspect that almost none of it is, in which case there is nothing to be gained by parallelising I/O. – Marcelo Cantos May 11 '10 at 02:42
  • I guess i should have been more precise with what i meant by continuous process. I meant, this process will be running as long as the files are being created. – DarthVader May 11 '10 at 02:57
  • 1
    Did you run the test I suggested? I'm curious to know the result. – Marcelo Cantos May 11 '10 at 03:03
  • I will let you know once i do it. i dont have access to the files right now. – DarthVader May 11 '10 at 03:19
0

You might consider a queue of files to process. Populate the queue once by scanning the directory when you start and have the queue updated with a FileSystemWatcher to efficiently add new files to the queue without constantly re-scanning the directory.

If at all possible, read and write to different physical disks. That will give you maximum IO performance.

If you have an initial burst of many files to process and then an uneven pace of new files being added and this all happens on the same disk (read/write), you could consider buffering the processed files to memory until one of two conditions applies:

  • There are (temporarily) no new files
  • You have buffered so many files that you don't want to use more memory for buffering (ideally a configurable threshold)

If your actual processing of the files is CPU intensive, you could consider having one processing thread per CPU core. However, for "normal" processing CPU time will be trivial compared to IO time and the complexity would not be worth any minor gains.

Eric J.
  • 147,927
  • 63
  • 340
  • 553