My use case requires me to process a list of files: for every file in the list, I go through each line and do some calculations on those lines. My problem is that I cannot have lines from multiple files in my buffer block at the same time, so I basically need to make sure that one file is completely processed (through a series of dataflow blocks) before I even enter the second file.
I looked at TPL DataFlow One by one processing, where the answer says either to stop using TPL Dataflow altogether or to encapsulate multiple processing blocks into one so I can control it. But if I do that I would lose the "composability" that TPL Dataflow provides, and it also seems a bit wasteful to lump independent blocks together. Is there some other way to do this?
I thought of using OutputAvailableAsync at the leaf node to notify me when everything has been flushed out, before I post another file. But I couldn't get OutputAvailableAsync to work at all; it just waits forever.
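This is roughly what I tried (a simplified sketch; filePaths and the batch size are placeholders for my actual inputs):

using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static async Task ProcessSequentially(IEnumerable<string> filePaths)
{
    var pipeline = CreatePipeline(batchSize: 100);
    foreach (var filePath in filePaths)
    {
        pipeline.Post(filePath);
        // I expected this await to tell me the file had been flushed
        // through the pipeline, but in my tests it just waits forever.
        bool available = await pipeline.OutputAvailableAsync();
    }
}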
EDIT
Down the pipeline, I would have an ActionBlock with state, for which I am planning to use a ConcurrentDictionary (for each line in a file I have multiple things of note to track). I cannot simply key the state by line across files, because that would mean keeping state for N files being processed together, where N is the number of files in the list.
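To make that concrete, here is a minimal sketch of the kind of stateful block I have in mind (the key and value types are stand-ins for my real bookkeeping):

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

// Per-line state for the file currently in flight. If lines from several
// files could be interleaved, I would need one dictionary per file instead.
var state = new ConcurrentDictionary<string, int>();

var statefulBlock = new ActionBlock<string[]>(fields =>
{
    // Placeholder calculation: tally occurrences keyed by the first field.
    // The real block tracks multiple things of note per line.
    state.AddOrUpdate(fields[0], 1, (_, count) => count + 1);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount });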
This is what I have so far for the front of the pipeline; bear in mind it is just a proof of concept:
public static IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
{
    // batchSize is not used yet in this proof of concept.
    var fileReadingBlock = new TransformManyBlock<string, string>(
        filePath => File.ReadLines(filePath),
        new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount });

    var fileParsingBlock = new TransformBlock<string, string[]>(
        line => line.Split(','),
        new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount });

    // Link the blocks so lines actually flow from the reader to the parser;
    // without this link the encapsulated pipeline never produces output.
    fileReadingBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true });

    return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
}
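Note that the two inner blocks are linked with PropagateCompletion before being encapsulated, so completing the returned block should complete the whole chain.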