I have a sequential TPL Dataflow pipeline that consists of two steps (simplified example below).
The first step adds 1000 to the input number. The second step displays the result.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Step 1: unbounded, so it accepts every posted item.
var transformBlock = new TransformBlock<int, long>(StepOne, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1,
    BoundedCapacity = DataflowBlockOptions.Unbounded,
});

// Step 2: holds at most 2 items (queued or in progress).
var actionBlock = new ActionBlock<long>(StepTwo, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1,
    BoundedCapacity = 2,
});

transformBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
    PropagateCompletion = true
});

for (int i = 0; i < 100; i++)
{
    transformBlock.Post(i);
}

// Signal the end of input and wait for the pipeline to drain.
transformBlock.Complete();
await actionBlock.Completion;

static async Task<long> StepOne(int item)
{
    await Task.Delay(500);
    Console.WriteLine("Transforming: " + item);
    return (long)item + 1000;
}

static async Task StepTwo(long item)
{
    await Task.Delay(1000);
    Console.WriteLine("Final product: " + item);
}
Since step 2 takes longer than step 1, I would expect step 1 to be throttled after a while, because it cannot push its results into the bounded buffer of step 2.
Expected output:
Transforming: 0
Transforming: 1
Final product: 1000
Transforming: 2
Final product: 1001
Transforming: 3
Final product: 1002
Transforming: 4
Final product: 1003
...
Actual output:
Transforming: 0
Transforming: 1
Final product: 1000
Transforming: 2
Transforming: 3
Final product: 1001
Transforming: 4
Transforming: 5
Final product: 1002
Transforming: 6
Transforming: 7
Final product: 1003
...
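
One possible reading of the actual output, offered as a sketch rather than a confirmed diagnosis: a TransformBlock keeps its results in its own output buffer, and that buffer is governed by the block's own BoundedCapacity, which is Unbounded above. Step 1 can therefore keep transforming every 500 ms while step 2 drains one item per 1000 ms, which would give the two-to-one pattern shown. The variant below assumes the first block is bounded as well (BoundedCapacity = 1 is an arbitrary choice), feeds it with SendAsync because Post returns false when a bounded block is full, and shortens the delays to 50 ms and 100 ms with only 10 items. The counters transformed, finalized and maxLead are hypothetical names added purely to measure how far step 1 runs ahead of step 2.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical counters, used only to observe how far step 1 gets ahead of step 2.
int transformed = 0, finalized = 0, maxLead = 0;

// Assumption: step 1 is bounded too, so it holds at most one item at a time
// (waiting, in progress, or finished but not yet accepted by step 2).
var transformBlock = new TransformBlock<int, long>(async item =>
{
    await Task.Delay(50); // shortened from 500 ms
    int lead = Interlocked.Increment(ref transformed) - Volatile.Read(ref finalized);
    maxLead = Math.Max(maxLead, lead);
    Console.WriteLine("Transforming: " + item);
    return (long)item + 1000;
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1,
    BoundedCapacity = 1,
});

// Step 2 is configured as in the question: at most 2 items queued or in progress.
var actionBlock = new ActionBlock<long>(async item =>
{
    await Task.Delay(100); // shortened from 1000 ms
    Console.WriteLine("Final product: " + item);
    Interlocked.Increment(ref finalized);
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1,
    BoundedCapacity = 2,
});

transformBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
    PropagateCompletion = true
});

for (int i = 0; i < 10; i++)
{
    // SendAsync waits until the bounded block has room for the item.
    await transformBlock.SendAsync(i);
}

// Signal the end of input and wait for both steps to finish.
transformBlock.Complete();
await actionBlock.Completion;

Console.WriteLine("Largest lead of step 1 over step 2: " + maxLead);
```

With these settings the first block can hold one item and the second block two, so step 1 should never be more than three finished items ahead of step 2. The exact interleaving of the two messages still depends on timing.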