I have a TPL Dataflow pipeline where a source block is linked to two propagator blocks, which are both linked to a single target block. All links use PropagateCompletion = true. The first propagator block is linked with a filter that accepts only even numbers; the second accepts all remaining messages.

After posting the last message, I complete the first block. The final block sometimes handles all of the values, but sometimes only the values accepted by the first propagator block and only some of the values accepted by the second.

This looks like a race condition, but I have no clue how to tell the final target block that everything is completed only after both propagator blocks linked to it have forwarded all of their messages.

Here's my code stripped down to a simple example:

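    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    internal static class Program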
    {
        public static async Task Main(string[] args)
        {
            var linkOptions = new DataflowLinkOptions
            {
                PropagateCompletion = true
            };
            var bufferBlock = new BufferBlock<int>();
            var fork1 = new TransformBlock<int, int>(n => n);
            var fork2 = new TransformBlock<int, int>(n =>
            {
                Thread.Sleep(100);
                return n;
            });
            var printBlock = new ActionBlock<int>(Console.WriteLine);

            bufferBlock.LinkTo(fork1, linkOptions, n => n % 2 == 0);
            bufferBlock.LinkTo(fork2, linkOptions, n => n % 2 != 0);
            
            fork1.LinkTo(printBlock, linkOptions);
            fork2.LinkTo(printBlock, linkOptions);
            
            for (var n = 1; n <= 10; ++n)
            {
                bufferBlock.Post(n);
            }
            bufferBlock.Complete();

            await printBlock.Completion;
        }
    }

This outputs:

    2
    4
    6
    8
    10

And I want it to output:

    2
    4
    6
    8
    10
    1
    3
    5
    7
    9

Tohnmeister

3 Answers

Your dataflow graph contains a diamond. Completion propagates down both branches, and whichever branch finishes first completes the final block prematurely, while the other branch may still have messages to deliver.

The completion of the last block can be customized using a task continuation:

            // ...
            var printBlock = new ActionBlock<int>(Console.WriteLine);

            bufferBlock.LinkTo(fork1, linkOptions, n => n % 2 == 0);
            bufferBlock.LinkTo(fork2, linkOptions, n => n % 2 != 0);
            
            fork1.LinkTo(printBlock); // no completion propagation
            fork2.LinkTo(printBlock);
           
            Task.WhenAll(fork1.Completion, fork2.Completion)
               .ContinueWith(t => printBlock.Complete(), 
                   CancellationToken.None, 
                   TaskContinuationOptions.ExecuteSynchronously, 
                   TaskScheduler.Default);

            for (var n = 1; n <= 10; ++n)
            {
                bufferBlock.Post(n);
            }

            bufferBlock.Complete();

            await printBlock.Completion;

alexm
  • Thanks! This is exactly it. I had already found it in a Gist and traced that Gist back to a similar, differently phrased question. Nevertheless, I will mark your answer as the solution. – Tohnmeister Jul 14 '22 at 09:23
  • One small niggle: You should assign the result of the `Task.WhenAll()` to a task that you can `await` at the end of the method, to ensure that any exceptions are observed. – Matthew Watson Jul 14 '22 at 09:42
  • @MatthewWatson in this particular case there is not much value in keeping a reference to the `ContinueWith` task. If this task fails, the end of the method will never be reached, because `printBlock.Completion` will never complete. That's why I prefer to handle these critical completion propagations inside `async void` methods: in the (extremely unlikely) case of failure I prefer my application to crash instead of hang. – Theodor Zoulias Jul 14 '22 at 11:31
  • alexm this usage of `ContinueWith` violates this guideline: [Do not create tasks without passing a TaskScheduler](https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/ca2008). – Theodor Zoulias Jul 14 '22 at 11:35
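
The points raised in the comments above (observing exceptions, and not leaving `printBlock.Completion` hanging) can be illustrated with a small helper. This is only a sketch under assumptions: `DataflowCompletion` and `CompleteWhenAllAsync` are hypothetical names, not part of TPL Dataflow, and the choice to fault the target when any source fails is one possible policy rather than the answer's own.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowCompletion
{
    // Hypothetical helper (not a library API): completes `target` only after
    // every block in `sources` has completed.
    public static async Task CompleteWhenAllAsync(
        IDataflowBlock target, params IDataflowBlock[] sources)
    {
        try
        {
            // Completes only when all source blocks have completed.
            await Task.WhenAll(sources.Select(s => s.Completion)).ConfigureAwait(false);
            target.Complete();
        }
        catch (Exception ex)
        {
            // Assumption: a faulted or canceled source should fail the target too,
            // so that awaiting target.Completion surfaces the error instead of hanging.
            target.Fault(ex);
        }
    }
}
```

With the blocks from the answer, the call would be `var wiring = DataflowCompletion.CompleteWhenAllAsync(printBlock, fork1, fork2);`, followed by `await printBlock.Completion;` and `await wiring;` at the end of the method.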

Answering myself because I found the answer in another question here.

The problem is that fork1 and fork2 are both linked to printBlock with PropagateCompletion = true. As soon as fork1 has processed its share of the messages, it propagates completion to printBlock, which then stops accepting input before fork2 has forwarded some or all of its messages.

The solution is to replace

    fork1.LinkTo(printBlock, linkOptions);
    fork2.LinkTo(printBlock, linkOptions);

with

    fork1.LinkTo(printBlock);
    fork2.LinkTo(printBlock);

    // Complete printBlock only after both forks have completed.
    Task.WhenAll(fork1.Completion, fork2.Completion)
        .ContinueWith(_ => printBlock.Complete(), TaskScheduler.Default);
Tohnmeister
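
To check that the replacement described above really delivers every message, the question's pipeline can be wrapped in a method that returns what the final block received. This is a minimal sketch: `DiamondPipeline.RunAsync` is a hypothetical wrapper, the final block collects values into a list instead of printing them, and the delay in the slow branch is shortened.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DiamondPipeline
{
    // Hypothetical wrapper around the question's pipeline with the fix applied.
    public static async Task<List<int>> RunAsync(int count)
    {
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        var received = new List<int>();

        var bufferBlock = new BufferBlock<int>();
        var fork1 = new TransformBlock<int, int>(n => n);
        var fork2 = new TransformBlock<int, int>(n =>
        {
            Thread.Sleep(10); // slow branch, as in the question
            return n;
        });
        // ActionBlock processes one message at a time by default,
        // so the list is never written to concurrently.
        var printBlock = new ActionBlock<int>(n => received.Add(n));

        bufferBlock.LinkTo(fork1, propagate, n => n % 2 == 0);
        bufferBlock.LinkTo(fork2, propagate, n => n % 2 != 0);

        // No completion propagation on the converging links.
        fork1.LinkTo(printBlock);
        fork2.LinkTo(printBlock);

        var completionWiring = Task.WhenAll(fork1.Completion, fork2.Completion)
            .ContinueWith(_ => printBlock.Complete(), TaskScheduler.Default);

        for (var n = 1; n <= count; ++n)
        {
            bufferBlock.Post(n);
        }
        bufferBlock.Complete();

        await printBlock.Completion;
        await completionWiring; // observe any exception from the continuation
        return received;
    }
}
```

Calling `await DiamondPipeline.RunAsync(10)` should return all ten values. The relative order of even and odd values depends on timing and is not guaranteed.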

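Changing PropagateCompletion to false will fix it.

    var linkOptions = new DataflowLinkOptions
    {
        PropagateCompletion = false
    };

This way fork1 completing won't complete printBlock, and you can complete it manually at the end, once both forks have finished. Note that if the same linkOptions is also used for the links out of bufferBlock, completion no longer reaches fork1 and fork2 automatically, so they have to be completed manually as well.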

YungDeiza
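
The manual completion suggested in this answer is not spelled out, so here is one way it might look. This is a sketch under assumptions: `ManualCompletion.RunAsync` is a hypothetical name, propagation is disabled on every link, and each stage is completed by hand only after the previous stage has finished.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ManualCompletion
{
    // Hypothetical example: no link propagates completion; stages are completed in order.
    public static async Task<List<int>> RunAsync(int count)
    {
        var noPropagation = new DataflowLinkOptions { PropagateCompletion = false };
        var received = new List<int>();

        var bufferBlock = new BufferBlock<int>();
        var fork1 = new TransformBlock<int, int>(n => n);
        var fork2 = new TransformBlock<int, int>(n => n);
        var printBlock = new ActionBlock<int>(n => received.Add(n));

        bufferBlock.LinkTo(fork1, noPropagation, n => n % 2 == 0);
        bufferBlock.LinkTo(fork2, noPropagation, n => n % 2 != 0);
        fork1.LinkTo(printBlock, noPropagation);
        fork2.LinkTo(printBlock, noPropagation);

        for (var n = 1; n <= count; ++n)
        {
            bufferBlock.Post(n);
        }

        // Stage 1: the buffer completes once every message has been handed to a fork.
        bufferBlock.Complete();
        await bufferBlock.Completion;

        // Stage 2: the forks complete once they have forwarded all of their output.
        fork1.Complete();
        fork2.Complete();
        await Task.WhenAll(fork1.Completion, fork2.Completion);

        // Stage 3: only now is it safe to complete the final block.
        printBlock.Complete();
        await printBlock.Completion;
        return received;
    }
}
```

Compared with the `Task.WhenAll(...).ContinueWith(...)` approach in the other answers, this keeps all completion logic in one visible sequence, at the cost of having to repeat it for every stage of the pipeline.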