So my requirement is to read multiple CSV files (each having a minimum of a million rows) and then parse each line. Currently, the way I have broken up my pipeline, I am first creating a separate pipeline to just read a CSV file into a string[] and then I plan to create the parsing pipeline later.
But seeing the results of my File Reading Pipeline, I am dumbfounded because it is considerably slower than just looping through the CSV file and then looping through the rows.
static public IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
{
var lineBufferBlock = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = batchSize});
var fileReadingBlock = new ActionBlock<string>(async (filePath) =>
{
using (var fileStream = File.OpenRead(filePath)) {
using (var streamReader = new StreamReader(fileStream, Encoding.UTF8, true, batchSize)) {
string line;
while ((line = streamReader.ReadLine()) != null) {
var isCompleted = await lineBufferBlock.SendAsync(line);
while (!isCompleted)
{
isCompleted = await lineBufferBlock.SendAsync(line);
}
}
}
}
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
{
return line.Split(",");
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
lineBufferBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});
fileReadingBlock.Completion.ContinueWith((task) =>
{
lineBufferBlock.Complete();
});
return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
}
And then I finally consume it as follows
for (int i = 1; i < 5; i++) {
var filePath = $"C:\\Users\\File{i}.csv";
fileReadingPipeline.SendAsync(filePath);
}
fileReadingPipeline.Complete();
while (true) {
try {
var outputRows = fileReadingPipeline.Receive();
foreach (string word in outputRows)
{
}
}
catch (InvalidOperationException e) {
break;
}
}
Whereas my straight loop code is the following:
for (int i = 1; i < 5; i++) {
var filePath = $"C:\\Users\\File{i}.csv";
foreach (string row in File.ReadLines(filePath))
{
foreach (string word in row.Split(","))
{
}
}
}
The difference in performance comes down to ~15 seconds for TPL Dataflow whereas it is ~5s for the looping code.
EDIT
On better advice from the comments, I have removed the unnecessary lineBufferBlock from the pipeline and this is my code now. However performance still remains the same.
var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
{
return File.ReadLines(filePath);
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
{
return line.Split(",");
}, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount});
fileReadingBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true});
return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);