
Problem: Read-in data piles up, waiting to be written.

I have a basic ETL process that reads in a file, transforms the data, and then writes the data out to another file. Since I'm on a multi-core system, I'm trying to perform this using multiple threads. My problem is that the readers are outpacing the writers: many files end up read and their data transformed, but they pile up waiting to be written.

What I want is a balance between the files read and the files written, while still using multiple threads.

I've tried various things in the .NET library (C# 4.0), but I think there is something I don't understand: this must be more complicated than simply using Thread, ThreadPool.QueueUserWorkItem, or Task the way they appear in the basic examples I've found.

For example, suppose I try something like this:

Task task = new Task(() => PerformEtl(sourceFile));
task.Start();

If I log the files being read and the files being written, it's something like a 10-to-1 ratio. On a long-running process, this is unsustainable.

There must be some basic multi-threading/multi-processing pattern that I'm ignorant of or can't call to mind. Does anyone know where I should go from here? Thanks.


Solved:

Thanks to @Blam.

Here is some example/pseudo code to illustrate how a producer-consumer pattern can be implemented using the .NET library, as suggested by @Blam.

// Adapted from: https://msdn.microsoft.com/en-us/library/dd997371(v=vs.100).aspx
// Requires: using System.Collections.Concurrent; using System.Threading.Tasks;

// The bounded capacity (10) is what keeps the reader from racing ahead:
// Add() blocks once the collection is full.
BlockingCollection<object> dataItems = new BlockingCollection<object>(10);
List<Task> tasks = new List<Task>();

tasks.Add(
    // Producer: reads and transforms files.
    Task.Factory.StartNew(() =>
    {
        for (;;)
        {
            string filePath = GetNextFile();
            if (filePath == null) break;

            object data = ProcessData(ReadData(filePath));
            dataItems.Add(data); // Blocks while the collection is full.
        }

        dataItems.CompleteAdding(); // Tell the consumer no more items are coming.
    })
);

tasks.Add(
    // Consumer: writes out each item as it becomes available.
    Task.Factory.StartNew(() =>
    {
        while (!dataItems.IsCompleted)
        {
            object data;

            try
            {
                data = dataItems.Take(); // Blocks until an item is available.
                WriteData(data);
            }
            catch (InvalidOperationException ioe)
            {
                // Take() throws if CompleteAdding() is called while it waits.
                Console.Error.WriteLine(ioe.Message);
            }
        }
    })
);

Task.WaitAll(tasks.ToArray());

The MSDN discussion is here: https://msdn.microsoft.com/en-us/library/dd997371(v=vs.100).aspx

Mario
    You must throttle the writer. That can be as simple as Semaphore, call WaitOne() in the writer, Release() in the readers. – Hans Passant Jul 09 '15 at 13:45
  • Why not run N threads, each of which does `while (1) { Read(); Transform(); Write(); }`. No throttling required. – usr Jul 09 '15 at 13:45
    "Throttle the writer"? Shouldn't I be throttling the reader? I want the readers to slow down. – Mario Jul 09 '15 at 13:46
  • @usr Yes, I essentially want to hand out a certain number of threads, and queue the tasks running on the threads. But, how does that work in .NET? (I'm new to .NET.) Can you point me in the right direction? I would think the framework should determine the optimal number of threads. – Mario Jul 09 '15 at 13:47
  • Why queue anything? Just start N thread (you already know how). Make each thread read the required data. No queue. – usr Jul 09 '15 at 13:48
  • What's to stop a thread from being continually elbowed out of the way by other threads, and never getting its assigned job done? – Mario Jul 09 '15 at 13:50
  • You can check out the RX library for throttle. [There is a question about that](http://stackoverflow.com/questions/3211134/how-to-throttle-event-stream-using-rx). – ntohl Jul 09 '15 at 13:50
  • Threads are not assigned jobs. They pull jobs from a thread-safe queue. They pull jobs, not file contents. That's small data. Even if some threads were constantly suspended, eventually all will complete. – usr Jul 09 '15 at 13:51
  • @usr I noted that this is a long-running job. If we're talking a million files, then "eventually" might overwhelm the system's resources, no? – Mario Jul 09 '15 at 13:53
  • Are you opening/writing/closing the file for each write? – Ron Beyer Jul 09 '15 at 13:53
  • N would be about the number of CPU cores that you have. That does not overwhelm anything. If there are millions of files you could simply use PLINQ: `Directory.EnumerateFiles(...).AsParallel().ForAll(ETL)`. Done. – usr Jul 09 '15 at 13:54
  • @RonBeyer The short answer is yes. Actually, a .NET class is doing the writing. I am building an EML file from an HTML file. The EML file gets written to disk by using the SmtpClient class's Send method, after setting up a "pickup directory." I have to assume the library is opening-writing-closing each file, properly. – Mario Jul 09 '15 at 13:56
  • The other question is, is it a new file for each write operation, or an open and append or open and replace operation? – Ron Beyer Jul 09 '15 at 13:57
  • @RonBeyer Each write is a new file. – Mario Jul 09 '15 at 13:58
  • Does each thread have its own SmtpClient? – Jerry Federspiel Jul 09 '15 at 14:01
  • @JerryFederspiel Each thread has its own SmtpClient. – Mario Jul 09 '15 at 14:02
  • Are you trapping/capturing any exceptions that may be occurring in the writers? – Jerry Federspiel Jul 09 '15 at 14:06
  • @JerryFederspiel Yes, there can be exceptions, and I need to trap them, log them, and then carry on. – Mario Jul 09 '15 at 14:07
    I think you have the consumer and producer comments backwards – paparazzo Jul 09 '15 at 16:28
  • Ha! Thanks. I changed it :-) – Mario Jul 09 '15 at 16:37
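
For reference, here is a minimal sketch of the approach usr describes in the comments above: N workers, each running the full read-transform-write cycle for one file at a time, so reads can never pile up ahead of writes. The "in" and "out" directory names and the Transform helper are hypothetical stand-ins.

using System;
using System.IO;
using System.Threading.Tasks;

class NWorkers
{
    static void Main()
    {
        // Each worker handles one file end-to-end, so the read rate is
        // naturally coupled to the write rate; no queue or throttle needed.
        Parallel.ForEach(
            Directory.EnumerateFiles("in"),
            new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
            path =>
            {
                string text = File.ReadAllText(path); // Read
                string result = Transform(text);      // Transform
                File.WriteAllText(                    // Write
                    Path.Combine("out", Path.GetFileName(path)), result);
            });
    }

    static string Transform(string text)
    {
        return text.ToUpperInvariant(); // Placeholder for the real ETL transform.
    }
}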

1 Answer


I do exactly that, and I break it into 3 steps:

  • Read
    There is just one set of disk heads, so reading in parallel does no good.
    Close the file and pass the text to the next step.
  • Process
  • Write

Use a BlockingCollection with an upper bound (bounded capacity).
With an upper bound, the fast steps cannot get too far ahead of the slow ones.

You have multiple cores, but you are probably I/O bound.

You can process (step 2) in parallel, but unless you have some complex transforms it will not make a difference.

Try to read and write on different physical devices.
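
To make the three stages concrete, here is a minimal sketch using two bounded BlockingCollections, reusing the hypothetical GetNextFile, ReadData, ProcessData, and WriteData helpers from the question; each queue blocks its upstream stage once 10 items are pending.

// Requires: using System.Collections.Concurrent; using System.Threading.Tasks;

// Two bounded queues decouple the stages without letting any stage race ahead.
BlockingCollection<object> rawItems = new BlockingCollection<object>(10);       // read -> process
BlockingCollection<object> processedItems = new BlockingCollection<object>(10); // process -> write

Task read = Task.Factory.StartNew(() =>
{
    string filePath;
    while ((filePath = GetNextFile()) != null)
        rawItems.Add(ReadData(filePath)); // Blocks when 10 raw items are pending.
    rawItems.CompleteAdding();
});

Task process = Task.Factory.StartNew(() =>
{
    // GetConsumingEnumerable() ends cleanly once CompleteAdding() is called.
    foreach (object raw in rawItems.GetConsumingEnumerable())
        processedItems.Add(ProcessData(raw));
    processedItems.CompleteAdding();
});

Task write = Task.Factory.StartNew(() =>
{
    foreach (object data in processedItems.GetConsumingEnumerable())
        WriteData(data);
});

Task.WaitAll(read, process, write);

If the transform turns out to be CPU-heavy, the middle stage can be duplicated across several tasks sharing the same two queues (with CompleteAdding called only after all of them finish).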

paparazzo
  • This sounds promising. I should have thought of the term "producer-consumer," but it didn't come to mind. I'll look into your suggestion. – Mario Jul 09 '15 at 14:19
  • This worked! Thanks. I've edited my answer to include some pseudo-code for the solution. – Mario Jul 09 '15 at 16:18