I have the scenario below, which builds a data flow with TPL Dataflow. a and b are data sources that are joined together, and the joined result then passes through the rest of the pipeline.

var a = new TransformBlock<object, int>(_ => 1);
var b = new TransformBlock<object, int>(_ => 2);

var join = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false });

var transform = new TransformBlock<Tuple<int, int>, int>(uri =>
{
    Console.WriteLine("Handling '{0}'...", uri);
    return uri.Item1;
});

var printReversedWords = new ActionBlock<int>(ddd => Console.WriteLine(ddd));

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

a.LinkTo(join.Target1);
b.LinkTo(join.Target2);
join.LinkTo(transform);
transform.LinkTo(printReversedWords, linkOptions);

a.Post(1);
b.Post(1);
Task.WhenAll(a.Completion, b.Completion).ContinueWith(_ => join.Complete());
printReversedWords.Completion.Wait();

When I run this, I see the following output in the console:

Handling '<1, 2>'...

1

That means the ActionBlock printReversedWords has processed its input. However, the program is still waiting at the last line and never finishes.

Could anyone tell me what I should change?

Vincent Shen
1 Answer

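To complete the pipeline you need to call Complete on the first block in the chain. In your case, that means calling Complete() on both blocks a and b. You also need to propagate completion from the JoinBlock to the TransformBlock; in your code the link from join to transform is created without PropagateCompletion, so completion never reaches the blocks downstream of the join.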

Also, instead of a blocking call to .Wait() it's usually better to make the method async and await completion.

When you configure the continuation, make sure it handles exceptions as well as successful completion, by faulting the downstream blocks when an upstream block has faulted. An example is here.

//Propagate Completion    
join.LinkTo(transform, linkOptions); 
transform.LinkTo(printReversedWords, linkOptions);

a.Post(1);
b.Post(1);

//Complete the start of the pipeline
a.Complete();
b.Complete();

await Task.WhenAll(a.Completion, b.Completion).ContinueWith(_ => join.Complete());
await printReversedWords.Completion;
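
The continuation above only covers the success path. As a rough sketch of the exception-handling advice, the version below faults the JoinBlock when either source block has faulted and completes it otherwise. The names PipelineSketch, RunAsync, sink and results are hypothetical and only serve the illustration; it also assumes the System.Threading.Tasks.Dataflow package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper class, used only for this illustration.
public static class PipelineSketch
{
    public static async Task<List<int>> RunAsync()
    {
        var results = new List<int>();

        var a = new TransformBlock<object, int>(_ => 1);
        var b = new TransformBlock<object, int>(_ => 2);
        var join = new JoinBlock<int, int>(new GroupingDataflowBlockOptions { Greedy = false });
        var transform = new TransformBlock<Tuple<int, int>, int>(pair => pair.Item1);
        // Default parallelism is 1, so adding to the list is not concurrent.
        var sink = new ActionBlock<int>(value => results.Add(value));

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        a.LinkTo(join.Target1);
        b.LinkTo(join.Target2);
        // Propagate completion (and faults) from the join to the end of the chain.
        join.LinkTo(transform, linkOptions);
        transform.LinkTo(sink, linkOptions);

        // Complete the join once both sources are done; fault it if either failed.
        var joinCompletion = Task.WhenAll(a.Completion, b.Completion).ContinueWith(t =>
        {
            if (t.IsFaulted)
                ((IDataflowBlock)join).Fault(t.Exception);
            else
                join.Complete();
        });

        a.Post(1);
        b.Post(1);

        // Complete the start of the pipeline.
        a.Complete();
        b.Complete();

        await joinCompletion;
        await sink.Completion; // rethrows here if an upstream block faulted
        return results;
    }

    public static async Task Main()
    {
        var results = await RunAsync();
        Console.WriteLine(string.Join(", ", results));
    }
}
```

Fault is called through the IDataflowBlock interface because the block classes implement it explicitly. With the values posted above, the returned list should contain the single value 1, and an exception thrown in a or b would surface from the await on sink.Completion instead of leaving the pipeline waiting.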
JSteward
  • Hi JSteward, I updated the code to add 'a.Complete(); b.Complete()'. But it still doesn't complete in the end. Actually I found another way to resolve this by adding a BroadcastBlock on the top (say 'startBlock'), then call 'startBlock.Complete()', and it works. – Vincent Shen Jul 23 '18 at 00:45
  • Glad you got it working! FYI - In the posted code, you would have had to propagate completion from the `JoinBlock` to the `TransformBlock` to flow completion all the way. – JSteward Jul 23 '18 at 15:26
  • Yeah, that makes sense. Propagating the completion through each block carries it all the way to the end. Thanks! – Vincent Shen Jul 25 '18 at 08:36