
I have a sequential pipeline that consists of two steps.

(simplified example)

The first step simply adds 1000 to the input number. The second step simply displays the number.

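```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var transformBlock = new TransformBlock<int, long>(StepOne, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1,
    BoundedCapacity = DataflowBlockOptions.Unbounded, // step 1: no limit on its queues
});
var actionBlock = new ActionBlock<long>(StepTwo, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1,
    BoundedCapacity = 2, // step 2: holds at most two items at a time
});
transformBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
    PropagateCompletion = true
});

for (int i = 0; i < 100; i++)
{
    transformBlock.Post(i);
}

// Signal that no more input is coming and wait for the pipeline to drain;
// otherwise the program can exit before anything is printed.
transformBlock.Complete();
await actionBlock.Completion;

// Step 1: add 1000 to the input number.
static async Task<long> StepOne(int item)
{
    await Task.Delay(500);
    Console.WriteLine("Transforming: " + item);
    return (long)item + 1000;
}

// Step 2: display the number.
static async Task StepTwo(long item)
{
    await Task.Delay(1000);
    Console.WriteLine("Final product: " + item);
}
```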

Since step 2 takes longer than step 1, I would expect step 1 to be throttled after a while, because it cannot send its results to the bounded buffer of step 2.

Expected output:
```
Transforming: 0
Transforming: 1
Final product: 1000
Transforming: 2
Final product: 1001
Transforming: 3
Final product: 1002
Transforming: 4
Final product: 1003
...
```

Actual output:
```
Transforming: 0
Transforming: 1
Final product: 1000
Transforming: 2
Transforming: 3
Final product: 1001
Transforming: 4
Transforming: 5
Final product: 1002
Transforming: 6
Transforming: 7
Final product: 1003
...
```

1 Answer

A `TransformBlock` maintains two queues internally, an input queue and an output queue. The size of each can be monitored at any moment through the `InputCount` and `OutputCount` properties. The combined size of these two queues is limited by the `BoundedCapacity` option, so the sum `InputCount + OutputCount` is always less than or equal to the `BoundedCapacity` value. In your case the `BoundedCapacity` of the block is `Unbounded`, so nothing limits how large these two queues can become (other than some hard limit, probably `Int32.MaxValue`).

The fact that the linked `ActionBlock` has a limited bounded capacity is mostly irrelevant. Its only consequence is that it delays the transfer of the transformed values from the output queue of the `TransformBlock` to the input queue of the `ActionBlock`, and this is only observable if you monitor the `OutputCount` property of the source block and the `InputCount` property of the target block. It wouldn't even matter if the `TransformBlock` were not linked to any target block: it would happily keep crunching numbers by itself until some hard limit was hit or the machine ran out of memory.

Theodor Zoulias
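To make the two queues described above visible, here is a minimal sketch (not code from the thread) that rebuilds the question's pipeline and periodically prints `InputCount` and `OutputCount`. The delays are shortened to 50 ms and 100 ms instead of 500 ms and 1000 ms, an assumption made only to keep the run short, and it assumes a C# 9+ top-level-statements project with `System.Threading.Tasks.Dataflow` referenced (via the framework or the NuGet package of the same name). If the pipeline behaves as the answer describes, the `OutputCount` of the unbounded `TransformBlock` should climb well past the `ActionBlock`'s `BoundedCapacity` of 2.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Same shape as the question's pipeline; the delays are shortened (assumption)
// so that the demo finishes in a few seconds.
var transformBlock = new TransformBlock<int, long>(async item =>
{
    await Task.Delay(50);
    return (long)item + 1000;
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1,
    BoundedCapacity = DataflowBlockOptions.Unbounded, // no limit on InputCount + OutputCount
});

var actionBlock = new ActionBlock<long>(async item =>
{
    await Task.Delay(100); // the slower step
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 1,
    BoundedCapacity = 2,
});

transformBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 40; i++)
{
    transformBlock.Post(i); // always accepted: the TransformBlock is unbounded
}
transformBlock.Complete();

// Sample the queue sizes while the pipeline drains.
int maxOutputCount = 0;
while (!actionBlock.Completion.IsCompleted)
{
    int output = transformBlock.OutputCount;
    maxOutputCount = Math.Max(maxOutputCount, output);
    Console.WriteLine(
        $"transform in={transformBlock.InputCount} out={output} | action in={actionBlock.InputCount}");
    await Task.Delay(200);
}
await actionBlock.Completion;
Console.WriteLine($"Largest OutputCount observed: {maxOutputCount}");
```

The exact counts depend on timer resolution, so treat the printed numbers as indicative; the point is that the backlog accumulates in the `TransformBlock`'s output queue rather than being prevented.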
  • That makes a lot of sense. Would you happen to know what blocks I have to chain in order to achieve my desired output? – user3357878 Jul 10 '21 at 19:10
  • @user3357878 my understanding is that it only makes sense to configure all blocks in a pipeline with `BoundedCapacity`, or none of them. If all blocks are bounded then [backpressure](https://stackoverflow.com/questions/40340274/tpl-dataflow-very-fast-producer-not-so-fast-consumers-outofmemory-exception) may be introduced, which means that the feeder of the pipeline may be blocked. The `transformBlock.Post(i);` is no longer guaranteed to return `true`, which usually means that you have to switch to the asynchronous `await transformBlock.SendAsync(i);`. – Theodor Zoulias Jul 10 '21 at 19:22
  • Limiting every block to the same capacity and using `SendAsync` (in order not to throw away items) would indeed solve this. However, imagine that my first step enqueues some lightweight items (e.g. an ID), while the second step downloads an image. In that case I wouldn't need backpressure for the several thousand integers enqueued in the first step. Wouldn't that make it a reasonable use case? – user3357878 Jul 10 '21 at 19:33
  • @user3357878 yep, this is certainly an interesting use case. Essentially you need a `TransformBlock` with a separate `BoundedCapacity` configuration for each of its input and output queues. AFAIK there is no such built-in block, but you could consider adding a `BufferBlock` with unlimited capacity at the start of the pipeline, and limiting the capacity of all linked blocks downstream. – Theodor Zoulias Jul 10 '21 at 20:30
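The suggestion to bound every block and feed the pipeline with `SendAsync` can be sketched as follows. This is an illustrative example rather than code from the thread: the capacity of 2, the shortened delays and the `results` list are assumptions, and it assumes a C# 9+ top-level-statements project with `System.Threading.Tasks.Dataflow` referenced. It shows that `Post` on a full bounded block returns `false` immediately (the item is simply not accepted), whereas `await SendAsync` waits until there is room, so every item eventually reaches the end of the pipeline.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Capacity of 2 and the delays below are illustrative choices (assumptions).
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 2 };

var transformBlock = new TransformBlock<int, long>(async item =>
{
    await Task.Delay(100);
    return (long)item + 1000;
}, options);

var results = new List<long>(); // only touched by the ActionBlock, which runs one item at a time
var actionBlock = new ActionBlock<long>(async item =>
{
    await Task.Delay(200);
    results.Add(item);
}, options);

transformBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

// Post never waits: once the bounded block is full it declines the item and returns false.
bool firstPost = transformBlock.Post(0);
bool secondPost = transformBlock.Post(1);
bool thirdPost = transformBlock.Post(2); // both slots are already taken by items 0 and 1
Console.WriteLine($"Post results: {firstPost}, {secondPost}, {thirdPost}");

// SendAsync waits asynchronously until the block has room, so nothing is dropped.
for (int i = 2; i < 10; i++)
{
    bool accepted = await transformBlock.SendAsync(i);
    Console.WriteLine($"SendAsync({i}) accepted: {accepted}");
}
transformBlock.Complete();
await actionBlock.Completion;
Console.WriteLine($"Final products: {results.Count}");
```

Item 2 is lost by `Post` and then resent with `SendAsync`, which is why the loop starts at 2; in a real feeder you would use `SendAsync` for every item.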
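The `BufferBlock` idea from the last comment might look like the sketch below. The counters `transformed`, `finished` and `maxLead` are hypothetical bookkeeping added only to measure how far step 1 runs ahead of step 2, and the delays and the capacity of 1 are illustrative assumptions; it also assumes a C# 9+ top-level-statements project with `System.Threading.Tasks.Dataflow` referenced. The unbounded `BufferBlock` accepts every cheap ID immediately, so the feeder can keep using `Post`, while the bounded `TransformBlock` can only take a new item once the `ActionBlock` has made room. With these settings step 1 should stay at most a couple of items ahead of step 2, which resembles the interleaving the question expected.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

int transformed = 0; // hypothetical counter: items that have finished step 1
int finished = 0;    // hypothetical counter: items that have finished step 2
int maxLead = 0;     // largest observed gap between the two counters

// Unbounded entry point: lightweight items (e.g. IDs) can pile up here cheaply.
var bufferBlock = new BufferBlock<int>();

// Step 1, bounded: BoundedCapacity = 1 limits how many items the block holds at once.
var transformBlock = new TransformBlock<int, long>(async item =>
{
    await Task.Delay(50); // illustrative delay (assumption)
    int lead = Interlocked.Increment(ref transformed) - Volatile.Read(ref finished);
    maxLead = Math.Max(maxLead, lead);
    Console.WriteLine("Transforming: " + item);
    return (long)item + 1000;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });

// Step 2, bounded and slower than step 1.
var actionBlock = new ActionBlock<long>(async item =>
{
    await Task.Delay(100); // illustrative delay (assumption)
    Interlocked.Increment(ref finished);
    Console.WriteLine("Final product: " + item);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
bufferBlock.LinkTo(transformBlock, linkOptions);
transformBlock.LinkTo(actionBlock, linkOptions);

for (int i = 0; i < 20; i++)
{
    bufferBlock.Post(i); // the BufferBlock is unbounded, so Post does not reject items here
}
bufferBlock.Complete();
await actionBlock.Completion;
Console.WriteLine($"Largest lead of step 1 over step 2: {maxLead}");
```

The design choice here is to keep the memory-hungry or slow stages bounded while letting only the cheap inputs queue up without limit, which is the "separate capacity for input and output" behavior the comment describes, approximated with built-in blocks.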