
My use case requires me to process a list of files, where for every file in the list I go through each line and do some calculations on those lines. My problem is that I cannot have multiple files' lines in my buffer block, so I basically need to make sure that one file is completely processed (through a series of dataflow blocks) before I even enter the second file.

Now I looked at TPL DataFlow One by one processing, where the answer says either to stop using TPL Dataflow altogether or to encapsulate multiple processing blocks into one so I can control it. But if I do that I would lose the "composability" that TPL Dataflow provides, and it also seems a bit wasteful to lump independent blocks together. Is there some other way to do this?

I thought of using OutputAvailableAsync at the leaf node to notify me when everything has been flushed out before I post another file, but I couldn't get OutputAvailableAsync to work at all. It just waits forever.

EDIT

Down the pipeline I would have an ActionBlock with state, for which I am planning to use a ConcurrentDictionary (for each line in a file I have multiple things of note). Now I cannot possibly index each line, because that would mean I would have to keep the state for N files being processed together, where N would probably be the number of files to be processed.

This is what I have for now; bear in mind I have just coded out a proof of concept.

public static IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
{
    var fileReadingBlock = new TransformManyBlock<string, string>((filePath) =>
    {
        return File.ReadLines(filePath);
    }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount });

    var fileParsingBlock = new TransformBlock<string, string[]>((line) =>
    {
        return line.Split(",");
    }, new ExecutionDataflowBlockOptions { EnsureOrdered = true, MaxDegreeOfParallelism = Environment.ProcessorCount });

    // Encapsulate does not link the two blocks by itself, so link them explicitly
    // and propagate completion from the reader to the parser.
    fileReadingBlock.LinkTo(fileParsingBlock, new DataflowLinkOptions { PropagateCompletion = true });

    return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
}
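For context, a minimal way to consume this encapsulated pipeline might look like the following (the file paths and the draining loop are illustrative only, not my actual driver code):

var pipeline = CreatePipeline(batchSize: 100);

// Post the files and signal that no more input is coming.
foreach (var path in new[] { "file1.csv", "file2.csv" }) // hypothetical paths
    await pipeline.SendAsync(path);
pipeline.Complete();

// Drain the parsed lines. OutputAvailableAsync only returns false after the
// pipeline has been completed and emptied, so the Complete() call above is
// what allows this loop to terminate.
while (await pipeline.OutputAvailableAsync())
{
    string[] parts = await pipeline.ReceiveAsync();
    // ... per-line calculations would go here ...
}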
  • Dataflow knows nothing about files, only the Action and Func<> delegates it calls. How files are read is up to the user's code. Post your code. `I cannot have multiple files' lines in my buffer block` you can, if you emit a `string[]` – Panagiotis Kanavos Nov 26 '20 at 12:01
  • I am just using a TransformManyBlock with File.ReadLines() and then down the pipeline I plan to process those lines. I haven't really written the processing code, yet. – LostRaider1297 Nov 26 '20 at 12:06
  • You *don't* need to ensure one file completes processing either. You can emit all lines as a `string[]` which means you have to finish reading everything first. Or instead of just `string` objects, you can send a more complex object, including eg the filename and a *completion indicator*. Eg, a `record (string FileName,string Line,bool Complete)`. When a downstream block receives a `Complete` record, it can finalize whatever it was doing and start over – Panagiotis Kanavos Nov 26 '20 at 12:07
  • Does that mean your ActionBlock has kind of a "state"? You could mitigate this by passing an identifier along with each line and pulling state from a Dictionary ... then it is completely irrelevant if lines are intermingled between two files. But you should really post your attempt, as far as you have it. – Fildor Nov 26 '20 at 12:07
  • The problem is the file can be huge, so I cannot afford to have all those lines in memory, which is why I need to process them one by one, and why I cannot afford to have other files' lines mixed together. – LostRaider1297 Nov 26 '20 at 12:08
  • 10 lines of code say more than a book of prose text.... show us what you got, I am sure we can help you out. – Fildor Nov 26 '20 at 12:10
  • With the default DOP=1 lines won't be intermingled. For higher DOPs, blocks preserve order *BUT* there's no way to know whether one of the 10 worker tasks is processing the `Complete` message, so no way to clean up. This *can* be handled though. – Panagiotis Kanavos Nov 26 '20 at 12:10
  • @LostRaider1297 for two days you've been posting details of a problem, not the problem itself. You've gotten bad and misleading answers as a result. What's the *actual problem*? Please don't say "I have 10 files with 1M rows". What do you want to do with them? Be *specific*. The type of computation you want affects the kind of pipeline to use, what parts need batching or not. Whether *branching* is needed or not. DataFlow even has JoinBlocks that can be used to synchronize streams – Panagiotis Kanavos Nov 26 '20 at 12:12
  • @LostRaider1297 you aren't the only one that had to process lots of data. I had to process some pretty big stock exchange files for algorithmic trading some years ago. Instead of feeding all files into the same pipeline, I created N separate pipelines, one per symbol, and started feeding order files to them. – Panagiotis Kanavos Nov 26 '20 at 12:15
  • Unfortunately, the speedup by simply using 20 workers on a 32 core machine instead of a single one was so great the suits didn't let me optimize the rest of the code ......... – Panagiotis Kanavos Nov 26 '20 at 12:16
  • @PanagiotisKanavos I understand. I am basically reading each line, deserializing it into a MyData object, then classifying it based on some rules, and then performing the "computation" based on this classification. Now I can't tell you what this computation is because I haven't gotten to that implementation yet (I know it's going to require some state though, which is why I made an edit responding to @Fildor). I am just trying to explore the TPL Dataflow library first before I can commit to it. – LostRaider1297 Nov 26 '20 at 12:21
  • Ok, then what's the scope of that state? Is it 1 state per file? Something else, that you could put an "id" on to identify it uniquely? Does it need a history or just a "current state" in order to be able to compute? ... Maybe you could "translate" lines into an update/upsert statement to a DB table ... ? – Fildor Nov 26 '20 at 12:28
  • There are like 6-7 state variables per file (which later down the line would need to be updated to the database anyway). I could translate lines into an update query, but that would be very wasteful (one update query per line). About having another "id", I could, but my problem is not id'ing (I could do like you said in your previous comment). It's just that my dictionary would grow by the number of files to be processed, which to me sounds a bit iffy, and I was just looking for a better way of doing it. – LostRaider1297 Nov 26 '20 at 12:37
  • You keep describing how you think the solution would look, never what the actual problem is. Why would you need a ConcurrentDictionary in the first place? What does indexing have to do with anything? As for the code snippet, it wasn't helpful in the previous question either. You're forcing people to guess what you actually want to ask and provide an answer. – Panagiotis Kanavos Nov 26 '20 at 15:50
  • Well, in my replies to @Fildor, I did mention my reason for using a ConcurrentDictionary (which was for maintaining state for each file), and I think he did understand that. And like I said, I myself didn't have the implementation details figured out. Through these questions, I was only looking for answers to general implementation problems. As for "indexing", it was actually a possible solution to make sure I know which file I was maintaining state for (that way I don't actually need sync file processing). I was looking for something better, but I think I am just going to go with that. – LostRaider1297 Nov 26 '20 at 16:02
  • @Fildor Thank you for your suggestion; having not found anything else, I think I am going to go with indexing my "rows". And in regards to keeping the size of the dictionary in check, I ended up throttling the number of files I process by using a batch block. – LostRaider1297 Nov 26 '20 at 16:05
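To illustrate the per-file state idea discussed in the comments above, here is a rough sketch. The FileState type, its fields, and the tuple shape (including the completion flag suggested earlier) are hypothetical, purely for illustration:

// Hypothetical holder for the handful of per-file state variables.
class FileState
{
    public int LineCount;
    public double RunningTotal;
    // ... the remaining per-file state variables ...
}

var states = new ConcurrentDictionary<int, FileState>();

var processor = new ActionBlock<(int FileId, string[] LineParts, bool Completed)>(entry =>
{
    var state = states.GetOrAdd(entry.FileId, _ => new FileState());

    if (entry.Completed)
    {
        // Flush the finished file's state (e.g. to the database) and drop it,
        // so the dictionary doesn't grow with the number of processed files.
        states.TryRemove(entry.FileId, out _);
        return;
    }

    state.LineCount++;
    // ... update the other state variables from entry.LineParts ...
});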

1 Answer


You could take advantage of the conditional linking capabilities of TPL Dataflow, in order to create a pipeline that is partially shared and partially dedicated. A single reader block and a single parser block would be shared by all files, while a dedicated processor block would be created for each file. Here is a simple demonstration of the concept:

var parser = new TransformBlock<(int Id, string Line), (int Id, string[])>(line =>
{
    return (line.Id, line.Line?.Split(","));
});

var reader = new TransformManyBlock<(int Id, string Path), (int, string)>(file =>
{
    var processor = CreateProcessor(file.Id);

    // Create a conditional link from the parser block to the processor block
    var link = parser.LinkTo(processor, entry => entry.Id == file.Id);

    return File
        .ReadLines(file.Path)
        .Select(line => (file.Id, line))
        .Append((file.Id, null)); // Completion signal
});

ActionBlock<(int Id, string[] LineParts)> CreateProcessor(int fileId)
{
    var streamWriter = new StreamWriter($@"C:\{fileId}.out");

    return new ActionBlock<(int Id, string[] LineParts)>(line =>
    {
        if (line.LineParts == null)
        {
            streamWriter.Close(); // Completion signal received
            return;
        }
        streamWriter.WriteLine(String.Join("|", line.LineParts));
    });
}

reader.LinkTo(parser);

In this example each file is associated with an int Id. This Id is passed along with each line, in order to be able to reconstruct the file downstream. Value tuples are used to combine each piece of data with the Id of its originating file. A conditional link is created between the shared parser block and each dedicated processor block. A null payload is used as an end-of-file indicator. Upon receiving this signal a processor block should ideally unlink itself from the parser, in order to keep the overhead of the conditional linking mechanism to a minimum. The unlinking is performed by disposing the link returned by the LinkTo method. For simplicity's sake this important step has been omitted from the above example.
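As a sketch of how that unlinking could be wired in, the CreateProcessor method could take the parser as an argument and dispose the link when the completion signal arrives (everything else stays the same as above):

ActionBlock<(int Id, string[] LineParts)> CreateProcessor(int fileId,
    ISourceBlock<(int Id, string[])> parser)
{
    var streamWriter = new StreamWriter($@"C:\{fileId}.out");
    IDisposable link = null;

    var processor = new ActionBlock<(int Id, string[] LineParts)>(line =>
    {
        if (line.LineParts == null)
        {
            streamWriter.Close(); // Completion signal received
            link?.Dispose();      // Unlink this processor from the shared parser
            return;
        }
        streamWriter.WriteLine(String.Join("|", line.LineParts));
    });

    // Create the conditional link here, so that it is available to the delegate above.
    link = parser.LinkTo(processor, entry => entry.Id == fileId);
    return processor;
}

The reader's delegate would then just call CreateProcessor(file.Id, parser) instead of doing the linking itself.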

I should probably repeat here what I've already written in my answer to a previous related question: passing individual strings from block to block will result in significant overhead. Chunkifying (batching) the workload is the way to go, in order to ensure that the pipeline performs as smoothly (friction-free) as possible.
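As a rough sketch of that idea (the chunk size of 1,000 is arbitrary, and Enumerable.Chunk assumes .NET 6+), the reader could emit arrays of lines and the parser could split a whole batch per message:

var reader = new TransformManyBlock<(int Id, string Path), (int Id, string[] Lines)>(file =>
{
    return File
        .ReadLines(file.Path)
        .Chunk(1000)                        // batch the lines, e.g. 1,000 per message
        .Select(chunk => (file.Id, chunk))
        .Append((file.Id, (string[])null)); // Completion signal remains a single null payload
});

var parser = new TransformBlock<(int Id, string[] Lines), (int Id, string[][] Parts)>(batch =>
{
    return (batch.Id, batch.Lines?.Select(line => line.Split(",")).ToArray());
});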

Theodor Zoulias
  • This pretty much fits my case perfectly, although I just have one question: is there a way I can throttle the number of parsed rows so that I don't run out of memory? One way I can think of doing that would be to introduce a batch block and buffer block between the parser and the processor, so that I can then use SendAsync to manually cause back-pressure. Would that be right? – LostRaider1297 Nov 30 '20 at 11:43
  • @LostRaider1297 AFAIK to keep the memory usage under control it is enough to configure *all* blocks in the pipeline with a `BoundedCapacity`. No other trick is needed. This creates the problem that a failure in a block downstream may cause the blocks upstream to get stuck (calls to `SendAsync` to them will never complete). You can look [here](https://stackoverflow.com/questions/21603428/tpl-dataflow-exception-in-transform-block-with-bounded-capacity/62264767#62264767) for solutions to this problem. – Theodor Zoulias Nov 30 '20 at 17:24
  • @LostRaider1297 btw my suggestion about dedicated processor blocks per file makes it difficult to control the degree of parallelism of all of them combined. There is a solution: configure them with a limited concurrency `TaskScheduler`. You can find [here](https://stackoverflow.com/questions/38400875/dataflow-tpl-implementing-pipeline-with-precondition/62356881#62356881) an example of this idea. – Theodor Zoulias Nov 30 '20 at 17:25
  • @LostRaider1297 another suggestion. Instead of using `ValueTuple`s as payloads, you could create your own `struct Keyed<TValue>` type with a hardcoded `TKey` of your choosing. This way declaring the blocks will be less cumbersome, because you will only have to declare the type of the value, not the key. For example: `var parser = new TransformBlock<Keyed<string>, Keyed<string[]>>(...` – Theodor Zoulias Nov 30 '20 at 17:38
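A rough sketch of what such a wrapper might look like, with `TKey` hardcoded to `int` to match the file `Id` used in the answer (this is an illustration, not code from the comment above):

// Sketch of a keyed wrapper with the key type hardcoded to int (the file Id).
public readonly struct Keyed<TValue>
{
    public Keyed(int key, TValue value) { Key = key; Value = value; }
    public int Key { get; }
    public TValue Value { get; }
}

// Declaring a block then only mentions the value type:
var parser = new TransformBlock<Keyed<string>, Keyed<string[]>>(line =>
    new Keyed<string[]>(line.Key, line.Value?.Split(",")));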