I'm using TPL Dataflow with a `BatchBlock`. When fewer items than the `BatchSize` are currently queued or postponed, I trigger the batch automatically after a timeout, as shown below:

```csharp
// Timer that forces the BatchBlock to emit whatever it currently holds.
Timer triggerBatchTimer = new(_ => _batchBlock.TriggerBatch());

// Pass-through block that restarts the timer every time an item flows through.
TransformBlock<string, string> timeoutTransformBlock = new((value) =>
{
    triggerBatchTimer.Change(_options.Value.TriggerBatchTime, Timeout.Infinite);
    return value;
});

// Consumes each emitted batch.
var actionBlock = new ActionBlock<IEnumerable<string>>(action =>
{
    GenerateFile(action);
});

_buffer.LinkTo(timeoutTransformBlock);
timeoutTransformBlock.LinkTo(_batchBlock);
_batchBlock.LinkTo(actionBlock);
```

For example, with a maximum batch size of 4 and a timeout of 10 seconds:

Current behavior:

```
BatchBlock (items): +---------1------------2------3---------------------------------|
Timeout (sec)     : +-10--9--10--9--8--7--10--9--10--9--8--7--6--5--4--3--2--1--0---|
ActionBlock       : +----------------------------------------------------------Call-|
```

Expected:

```
BatchBlock (items): +--------1-----------2-----3---------|
Timeout (sec)     : +-10--9--8--7--6--5--4--3--2--1--0---|
ActionBlock       : +-------------------------------Call-|
```

My problem is: how can I avoid the timeout being reset to its full value each time the block receives a new item?

Shipra Sarkar
Julien Martin
  • Related: [Batching on duration or threshold using TPL Dataflow](https://stackoverflow.com/questions/52633346/batching-on-duration-or-threshold-using-tpl-dataflow) – Theodor Zoulias Nov 18 '22 at 08:41
  • *"My problem is how to avoid timeout to be reset to 0 each time the block receive a new item."* -- So at what point and conditions do you want to reset the `Timer`? Or, if this question is unclear, what exactly is the behavior that you want to enforce? – Theodor Zoulias Nov 18 '22 at 08:47
  • Once the `Timer` has been started, receiving an item should not change its start time; the timer should only restart when `GenerateFile` is called. For example: the timeout starts at 30 sec, the block receives its first item, and the timeout does not restart at 30 sec. I don't know if I'm being clear... – Julien Martin Nov 18 '22 at 09:26
  • Julien, to be honest your explanation is not clear to me. Maybe adding a [marble diagram](https://stackoverflow.com/questions/64841312/how-to-merge-multiple-observables-with-order-preservation-and-maximum-concurrenc) to the question would help clarify the desirable behavior. – Theodor Zoulias Nov 18 '22 at 09:46
  • Do you get the desirable behavior if, instead of calling `triggerBatchTimer.Change`, you just initialize the timer with the same value for both `dueTime` and `period`? This way batches will be emitted periodically, or faster if the `batchSize` has been reached. – Theodor Zoulias Nov 18 '22 at 10:10
  • So just initialize the timer like this: `Timer triggerBatchTimer = new(_ => _batchBlock.TriggerBatch(), null, 10000, 10000);` and remove `triggerBatchTimer.Change`? – Julien Martin Nov 18 '22 at 11:07
  • Yep. Do you get the desirable behavior? – Theodor Zoulias Nov 18 '22 at 11:09
  • Yes, I have implemented one test for the moment and it's OK. I will run several more tests and come back to you with the results. – Julien Martin Nov 18 '22 at 11:15
  • In case you have found the solution, you could consider posting it as a [self-answer](https://stackoverflow.com/help/self-answer). To be honest the marble diagram inside the question, the one titled "Expected", doesn't have enough length to make the desirable pattern unambiguous, so anyone else trying to answer the question will be a hit-or-miss. – Theodor Zoulias Nov 18 '22 at 12:23

1 Answer

Thanks to @Theodor Zoulias for the help. Here is the solution:

Initialize the timer with the same value for both `dueTime` and `period`, and remove the `triggerBatchTimer.Change` call from the `TransformBlock`. The `BatchBlock` then emits a batch every X seconds, or sooner when the `batchSize` has been reached, and the timer is no longer reset for each new item that enters the `BatchBlock`.

```csharp
// Periodic timer: first tick after TriggerBatchTime, then again at the same interval.
Timer triggerBatchTimer = new(_ => _batchBlock.TriggerBatch(), null, options.Value.TriggerBatchTime, options.Value.TriggerBatchTime);

// Now a plain pass-through: it no longer touches the timer.
TransformBlock<string, string> timeoutTransformBlock = new((value) =>
{
    return value;
});

// Consumes each emitted batch.
var actionBlock = new ActionBlock<IEnumerable<string>>(action =>
{
    GenerateFile(action);
});

_buffer.LinkTo(timeoutTransformBlock);
timeoutTransformBlock.LinkTo(_batchBlock);
_batchBlock.LinkTo(actionBlock);
```
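
For anyone who wants to try the periodic-timer approach in isolation, here is a minimal self-contained sketch. The class `PeriodicBatchDemo`, the method `RunAsync`, and its parameters are hypothetical names used only for illustration; they are not part of my real code, and collecting the batches into a list stands in for `GenerateFile`. It assumes the `System.Threading.Tasks.Dataflow` package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PeriodicBatchDemo
{
    // Hypothetical helper: posts the items into a BatchBlock that is flushed
    // by a periodic timer, and returns the batches that were emitted.
    public static async Task<List<string[]>> RunAsync(
        IEnumerable<string> items, int batchSize, TimeSpan period)
    {
        var batches = new List<string[]>();
        var batchBlock = new BatchBlock<string>(batchSize);

        // ActionBlock processes one batch at a time by default,
        // so adding to the list here needs no extra locking.
        var actionBlock = new ActionBlock<string[]>(batch => batches.Add(batch));
        batchBlock.LinkTo(actionBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        // Same value for dueTime and period: the timer ticks at a fixed
        // interval and is never re-armed when an item arrives.
        using (new Timer(_ => batchBlock.TriggerBatch(), null, period, period))
        {
            foreach (var item in items)
            {
                batchBlock.Post(item);
            }

            // Leave enough time for at least one timer tick.
            await Task.Delay(period + period);
        }

        // Completing the block also flushes anything still buffered.
        batchBlock.Complete();
        await actionBlock.Completion;
        return batches;
    }
}
```

Called with five items, a batch size of 4, and a short period, this should yield one full batch of four items followed by a smaller batch containing the remaining item. Note that the timer keeps ticking even after a batch has been emitted because the size limit was reached, so a partial batch can be emitted sooner than one full period after its first item arrived.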
Julien Martin