I am working on a project with a requirement that is a perfect fit for TPL Dataflow. My experience with it is limited (and dates back some time), so I have been brushing up on it by reading Microsoft's documentation as well as articles I could find online.
Having done that, I built my code to chain together a series of blocks (mostly TransformBlock, ending with an ActionBlock), doing something like this:
var block1 = new TransformBlock<T, U>(async input => {});
var block2 = new TransformBlock<U, V>(async input => {});
var block3 = new ActionBlock<V>(async input => {});
block1.LinkTo(block2);
block2.LinkTo(block3);
foreach (var item in items)
{
    await block1.SendAsync(item);
}
block1.Complete();
await block3.Completion;
An article (which I can no longer find) suggested that the pipeline needs continuation tasks to mark each block as complete. This is the code it provided for that:
// Create the continuation tasks in the pipeline that marks each block as complete.
await block1.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) { ((IDataflowBlock)block2).Fault(t.Exception); }
    else { block2.Complete(); }
});
await block2.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) { ((IDataflowBlock)block3).Fault(t.Exception); }
    else { block3.Complete(); }
});
I will admit that I don't fully understand what this code does or whether it is even needed. When I add it to the code I just wrote, execution hangs on the first ContinueWith and never gets as far as running the pipeline.
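For comparison, the other approach I have come across is passing a DataflowLinkOptions with PropagateCompletion = true to LinkTo, which I assume is meant to do automatically what the continuations above do by hand. Below is a minimal sketch of what I think that looks like. The int and string types, the block bodies, and the sample input are placeholders of my own, not my real code.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static async Task Main()
    {
        // Placeholder blocks: the types and bodies are stand-ins for illustration only.
        var block1 = new TransformBlock<int, int>(async n => { await Task.Yield(); return n * 2; });
        var block2 = new TransformBlock<int, string>(async n => { await Task.Yield(); return $"value {n}"; });
        var block3 = new ActionBlock<string>(async s => { await Task.Yield(); Console.WriteLine(s); });

        // With PropagateCompletion set, completion (or a fault) of the source
        // block is forwarded to the linked target block.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        block1.LinkTo(block2, linkOptions);
        block2.LinkTo(block3, linkOptions);

        // Placeholder input.
        foreach (var item in new[] { 1, 2, 3 })
        {
            await block1.SendAsync(item);
        }

        // Signal that no more input is coming, then wait for the last block to drain.
        block1.Complete();
        await block3.Completion;
    }
}
```

If my understanding is right, this version needs no ContinueWith calls at all, but I would like confirmation that the two approaches really are equivalent.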
I would appreciate additional explanation since I want to get a better understanding of the nuances of what is going on here.