Problem: Read-in data piles up, waiting to be written.
I have a basic ETL process that reads in a file, transforms the data, and then writes the data out to another file. Since I'm on a multi-core system, I'm trying to perform this using multiple threads. My problem is that the readers are outpacing the writers: many files end up read and their data transformed, but they pile up waiting to be written.
What I want is a balance between the files read and the files written, while still using multiple threads.
I've tried various things in the .NET library (C# 4.0). I think, though, that there is something I don't understand, and that this must be more complicated than simply using Thread, ThreadPool.QueueUserWorkItem, or Task the way they appear in the basic examples I've found.
For example, suppose I try something like this:
Task task = new Task(() => PerformEtl(sourceFile));
task.Start();
If I log the files being read and the files being written, the ratio of reads to writes is something like 10 to 1. On a long-running process, this is unsustainable.
There must be some basic multi-threading/multi-processing pattern that I'm ignorant of or can't call to mind. Does anyone know where I should go from here? Thanks.
Solved:
Thanks to @Blam.
Here is some example/pseudo code to illustrate how a producer-consumer pattern can be implemented using the .NET library, as suggested by @Blam.
// Adapted from: https://msdn.microsoft.com/en-us/library/dd997371(v=vs.100).aspx
// A bounded capacity of 10 makes Add() block while the queue is full,
// which is what keeps the reader from outpacing the writer.
BlockingCollection<object> dataItems = new BlockingCollection<object>(10);
List<Task> tasks = new List<Task>();
tasks.Add(
    // Producer.
    Task.Factory.StartNew(() =>
    {
        for (;;)
        {
            string filePath = GetNextFile();
            if (filePath == null) break;
            object data = ProcessData(ReadData(filePath));
            dataItems.Add(data);
        }
        // Signal the consumer that no more items are coming.
        dataItems.CompleteAdding();
    })
);
tasks.Add(
    // Consumer.
    Task.Factory.StartNew(() =>
    {
        while (!dataItems.IsCompleted)
        {
            object data;
            try
            {
                data = dataItems.Take();
                WriteData(data);
            }
            catch (InvalidOperationException ioe)
            {
                // Take() throws if the collection was completed after the IsCompleted check.
                Console.Error.WriteLine(ioe.Message);
            }
        }
    })
);
Task.WaitAll(tasks.ToArray());
The MSDN discussion is here: https://msdn.microsoft.com/en-us/library/dd997371(v=vs.100).aspx
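For anyone adapting this, here is a minimal, self-contained sketch of the same bounded producer-consumer idea, with two changes that are my own assumptions rather than part of @Blam's suggestion: the consumers loop over GetConsumingEnumerable() instead of checking IsCompleted and calling Take(), and there can be more than one writer. The class name BoundedPipelineSketch, the Run method, and the Thread.Sleep call standing in for a slow WriteData are all hypothetical and only there for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class BoundedPipelineSketch
{
    // Runs one producer and writerCount consumers over itemCount fake work items.
    // Returns the number of items written; maxQueued reports the deepest the queue got.
    public static int Run(int itemCount, int capacity, int writerCount, out int maxQueued)
    {
        var queue = new BlockingCollection<int>(capacity);
        int written = 0;
        int deepest = 0;

        Task producer = Task.Factory.StartNew(() =>
        {
            try
            {
                for (int i = 0; i < itemCount; i++)
                {
                    queue.Add(i); // blocks while the queue already holds `capacity` items
                    int depth = queue.Count;
                    if (depth > deepest) deepest = depth; // only this thread writes `deepest`
                }
            }
            finally
            {
                // Always signal completion so the consumers can exit their loops.
                queue.CompleteAdding();
            }
        }, TaskCreationOptions.LongRunning);

        var writers = new Task[writerCount];
        for (int w = 0; w < writerCount; w++)
        {
            writers[w] = Task.Factory.StartNew(() =>
            {
                // The enumeration ends once adding is complete and the queue is empty.
                foreach (int item in queue.GetConsumingEnumerable())
                {
                    Thread.Sleep(5); // hypothetical stand-in for a slow WriteData call
                    Interlocked.Increment(ref written);
                }
            }, TaskCreationOptions.LongRunning);
        }

        producer.Wait();
        Task.WaitAll(writers);
        maxQueued = deepest;
        return written;
    }

    static void Main()
    {
        int maxQueued;
        int written = Run(50, 10, 2, out maxQueued);
        Console.WriteLine("written={0}, deepest queue={1}", written, maxQueued);
    }
}
```

The point of the sketch is that the queue depth never exceeds the capacity passed to the constructor, so however fast the reader is, at most that many transformed items are waiting to be written at any time.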