
I have a very basic, linear pipeline in which I'd like to propagate completion and wait until everything has completed:

static void Main(string[] args)
{
    ExecutePipeline().Wait();
}

static async Task ExecutePipeline()
{
    var addBlock = new TransformBlock<int, int>(x =>
    {
        var result = x + 2;
        Console.WriteLine(result);
        return result;
    });
    var subBlock = new TransformBlock<int, int>(x =>
    {
        var result = x - 2;
        Console.WriteLine(result);
        return result;
    });
    var mulBlock = new TransformBlock<int, int>(x =>
    {
        var result = x * 2;
        Console.WriteLine(result);
        return result;
    });
    var divBlock = new TransformBlock<int, int>(x =>
    {
        var result = x / 2;
        Console.WriteLine(result);
        return result;
    });

    var flowOptions = new DataflowLinkOptions { PropagateCompletion = true };
    addBlock.LinkTo(mulBlock, flowOptions);
    mulBlock.LinkTo(subBlock, flowOptions);
    subBlock.LinkTo(divBlock, flowOptions);

    addBlock.Post(4);
    addBlock.Complete();
    mulBlock.Complete();
    subBlock.Complete();
    await divBlock.Completion;
}

Unfortunately, in its current state, only the result of addBlock is printed before the program terminates, instead of all of the results being printed.

If I comment out all of the lines that call Complete(), or leave only addBlock.Complete() uncommented, all of the results in the pipeline are printed, but the program never ends, since the completion is not propagated. However, if I uncomment either mulBlock.Complete() or subBlock.Complete(), the program behaves like the original code: it prints the result of addBlock and terminates.

What's interesting is that uncommenting either of the last two calls, or all of them, produces the same behavior, which makes me question how completion propagates when one of them is commented out. Obviously, I'm missing something in the logic, but I can't figure out what it is. How would I accomplish the desired behavior of printing all of the results?

EDIT:

So, I finally found something that worked for me at https://stackoverflow.com/a/26803579/2006048

It appears that I needed to change the last part of the code to simply this:

addBlock.Post(4);
addBlock.Complete();
await addBlock.Completion;

The original code did not work because Complete() was called on each block before the data could propagate through it, so it was a race condition.

However, with this new code, it calls Complete() on addBlock and awaits its completion. This makes the program work as intended, but leaves me even more confused. Why must Completion be awaited on addBlock and not on the last block in the chain, divBlock? I would think that Complete() only needs to be called on addBlock because PropagateCompletion is set to true, but then I would expect to wait for the completion of the last block, not the first one.

If I await the completion of mulBlock, only the results of addBlock get printed. If I await the completion of subBlock, the results of addBlock and mulBlock get printed. If I await the completion of divBlock, the results of addBlock, mulBlock and subBlock get printed.

I based my code on the example in Stephen Cleary's Concurrency in C# Cookbook (Section 4.1, Linking Blocks, page 48):

var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);

var options = new DataflowLinkOptions { PropagateCompletion = true };
multiplyBlock.LinkTo(subtractBlock, options);

...

// The first block's completion is automatically propagated to the second block.
multiplyBlock.Complete();
await subtractBlock.Completion;

When I set up Cleary's code to match mine, the same behavior is exhibited: the program prints the result and terminates only when I await multiplyBlock.Completion.

B.K.

2 Answers


The problem is that a block completes only after all its queues are emptied, which includes the output queue. What happens in your case is that the completion propagates correctly, but then divBlock gets stuck in the "almost complete" mode, waiting for the item in its output queue to be removed.
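To see the output-queue effect in isolation, here is a minimal sketch, assuming C# 9 top-level statements (if your target framework does not include System.Threading.Tasks.Dataflow, reference the NuGet package of that name). A single TransformBlock is completed while its one result has no consumer, so its Completion task stays pending until that result is removed with Receive(). The 500 ms timeout is an arbitrary value chosen for the demonstration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// One TransformBlock with nothing linked to it, so its output is never consumed.
var block = new TransformBlock<int, int>(x => x * 2);
block.Post(4);
block.Complete();

// Whether the result (8) is still being computed or already sits in the
// output queue, Completion cannot finish until that item is consumed,
// so this wait times out and returns false.
bool completedWhileFull = block.Completion.Wait(TimeSpan.FromMilliseconds(500));
Console.WriteLine($"Completed with an item in the output queue: {completedWhileFull}");

// Taking the item out empties the output queue, which lets the block complete.
int item = block.Receive();
await block.Completion;
Console.WriteLine($"Received {item}; completed: {block.Completion.IsCompleted}");
```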

To solve this, you can either change divBlock to be an ActionBlock, or you can link it to a DataflowBlock.NullTarget<int>().
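A sketch of the NullTarget option applied to the question's pipeline, assuming C# 9 top-level statements. Stage and printed are hypothetical helpers added only so the printed values can be checked afterwards; the arithmetic, link options and block order follow the question. Only addBlock is completed, and divBlock is awaited because it is the last block to finish.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical: records each printed value so the run can be checked afterwards.
var printed = new List<int>();

// Hypothetical helper: builds a stage that applies op, then prints and records the result.
TransformBlock<int, int> Stage(Func<int, int> op) =>
    new TransformBlock<int, int>(x =>
    {
        var result = op(x);
        Console.WriteLine(result);
        lock (printed) { printed.Add(result); }
        return result;
    });

var addBlock = Stage(x => x + 2);
var mulBlock = Stage(x => x * 2);
var subBlock = Stage(x => x - 2);
var divBlock = Stage(x => x / 2);

var flowOptions = new DataflowLinkOptions { PropagateCompletion = true };
addBlock.LinkTo(mulBlock, flowOptions);
mulBlock.LinkTo(subBlock, flowOptions);
subBlock.LinkTo(divBlock, flowOptions);
// Discard divBlock's results so its output queue can empty and it can complete.
divBlock.LinkTo(DataflowBlock.NullTarget<int>());

addBlock.Post(4);
// Complete only the head; PropagateCompletion carries it down the chain.
addBlock.Complete();
// divBlock completes only after every upstream block has, so all stages have printed.
await divBlock.Completion;
```

The other option from the answer is to make divBlock an ActionBlock<int> that does the printing: an ActionBlock has no output queue, so there is nothing left to drain.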

svick
  • That worked. However, why is it that if I await `Completion` on `addBlock`, everything works fine in my code? Additionally, why does the program hang if I await `Completion` on either `mulBlock` or `subBlock`? They both empty their queues into the next block, don't they? It seems they should behave the same as `addBlock`, yet in my code only awaiting `Completion` on `addBlock` worked. Lastly, with your suggestion the code works no matter which block's `Completion` I await, not just `divBlock`'s. Why is that? – B.K. Aug 07 '16 at 17:03
  • Hmm, interesting. If I await `Completion` on any block other than `divBlock`, occasionally not all results get printed. So there's some sort of race condition for the results to get printed before the program terminates, and it works 99.9% of the time. I guess that makes sense. However, I'm still confused about the difference in behavior between `addBlock` and `mulBlock`/`subBlock` in my code prior to your suggestion. – B.K. Aug 07 '16 at 17:22
  • @B.K. Yeah, if you `await` `Completion`, the code does not continue the instant the task completes; there is some scheduling delay, which causes that race condition. And I don't see a hang when I `await` `mulBlock.Completion`. – svick Aug 07 '16 at 17:49
  • Hmm, it appears the hang is intermittent as well, as I can't reproduce it again. What's more, all my results now get printed when awaiting any block, unlike what I was experiencing yesterday. I had some load running in the background on the system yesterday (updating some stuff), so perhaps I was hitting the bad side of the race condition. *That's with the old code, without your modification. So the behavior is unreliable, and the proper thing is to stick with the new code, which has your suggestions implemented. – B.K. Aug 07 '16 at 18:11
  • It looks like calling `Receive()` on `divBlock` before awaiting its `Completion` works as well. – B.K. Aug 07 '16 at 22:29
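The Receive() approach from the last comment can be sketched like this, assuming C# 9 top-level statements. It uses ReceiveAsync, the awaitable counterpart of Receive(), and the per-block printing is left out for brevity. It relies on knowing that exactly one item comes out of divBlock; with an unknown number of items you would need a loop that keeps receiving until the block completes.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// The question's four stages, without the Console output inside each delegate.
var addBlock = new TransformBlock<int, int>(x => x + 2);
var mulBlock = new TransformBlock<int, int>(x => x * 2);
var subBlock = new TransformBlock<int, int>(x => x - 2);
var divBlock = new TransformBlock<int, int>(x => x / 2);

var flowOptions = new DataflowLinkOptions { PropagateCompletion = true };
addBlock.LinkTo(mulBlock, flowOptions);
mulBlock.LinkTo(subBlock, flowOptions);
subBlock.LinkTo(divBlock, flowOptions);

addBlock.Post(4);
addBlock.Complete();

// Exactly one item was posted, so exactly one result is taken from divBlock.
// Removing it empties divBlock's output queue, which lets the block complete.
int result = await divBlock.ReceiveAsync();
Console.WriteLine(result);
await divBlock.Completion;
```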

You should consume the output with lastnode.ReceiveAllAsync(), where lastnode is the final block of the pipeline (divBlock here).
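A sketch of that suggestion, assuming .NET 5 or later (where DataflowBlock.ReceiveAllAsync is available) and C# 9 top-level statements. The await foreach loop removes every item from divBlock's output queue and ends once the block has completed, so the pipeline finishes without knowing the item count up front.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var addBlock = new TransformBlock<int, int>(x => x + 2);
var mulBlock = new TransformBlock<int, int>(x => x * 2);
var subBlock = new TransformBlock<int, int>(x => x - 2);
var divBlock = new TransformBlock<int, int>(x => x / 2);

var flowOptions = new DataflowLinkOptions { PropagateCompletion = true };
addBlock.LinkTo(mulBlock, flowOptions);
mulBlock.LinkTo(subBlock, flowOptions);
subBlock.LinkTo(divBlock, flowOptions);

addBlock.Post(4);
addBlock.Complete();

var results = new List<int>();
// ReceiveAllAsync yields items as divBlock produces them and finishes once
// divBlock has completed, so its output queue is always drained.
await foreach (var value in divBlock.ReceiveAllAsync())
{
    Console.WriteLine(value);
    results.Add(value);
}
// Observe the final status (this rethrows if the pipeline faulted).
await divBlock.Completion;
```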