21

Using Dataflow CTP (in the TPL)

Is there a way to call BatchBlock.TriggerBatch automatically if the number of currently queued or postponed items is less than the BatchSize, after a timeout?

And better: this timeout should be reset to 0 each time the block receives a new item.

Theodor Zoulias
Softlion

5 Answers

28

Yes, you can accomplish this rather elegantly by chaining blocks together. In this case you want to set up a TransformBlock which you link "before" the BatchBlock. That would look something like this:

Timer triggerBatchTimer = new Timer(_ => yourBatchBlock.TriggerBatch());

TransformBlock<T, T> timeoutTransformBlock = new TransformBlock<T, T>((value) =>
{
    triggerBatchTimer.Change(5000, Timeout.Infinite);

    return value; 
});

timeoutTransformBlock.LinkTo(yourBatchBlock);

yourBufferBlock.LinkTo(timeoutTransformBlock);
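
For reference, here is a self-contained sketch of the same idea (the element type, the 10-item batch size, the 5-second timeout, and the downstream ActionBlock are illustrative assumptions, not part of the original answer):

// Requires the System.Threading.Tasks.Dataflow NuGet package.
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

var batchBlock = new BatchBlock<int>(10);

// The TimerCallback delegate takes a state argument, hence the discard.
var triggerBatchTimer = new Timer(_ => batchBlock.TriggerBatch());

var timeoutTransformBlock = new TransformBlock<int, int>(value =>
{
    // Every incoming item pushes the timeout 5 seconds into the future.
    triggerBatchTimer.Change(5000, Timeout.Infinite);
    return value;
});
timeoutTransformBlock.LinkTo(batchBlock);

var printBlock = new ActionBlock<int[]>(
    batch => Console.WriteLine($"Batch of {batch.Length} items"));
batchBlock.LinkTo(printBlock);

// Two full batches of 10 are emitted immediately; the remaining 5 items
// are emitted ~5 seconds after the last Post.
for (int i = 0; i < 25; i++) timeoutTransformBlock.Post(i);
Console.ReadLine(); // keep the process alive so the timer can fire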
Drew Marsh
8

Here is a polished version of Drew Marsh's idea. This implementation uses the DataflowBlock.Encapsulate method to create a dataflow block that encapsulates the timer+batch functionality. Beyond the new timeout argument, the CreateBatchBlock method also supports all options available to the normal BatchBlock constructor.

public static IPropagatorBlock<T, T[]> CreateBatchBlock<T>(int batchSize,
    int timeout, GroupingDataflowBlockOptions dataflowBlockOptions = null)
{
    dataflowBlockOptions = dataflowBlockOptions ?? new GroupingDataflowBlockOptions();
    var batchBlock = new BatchBlock<T>(batchSize, dataflowBlockOptions);
    var timer = new System.Threading.Timer(_ => batchBlock.TriggerBatch());
    var transformBlock = new TransformBlock<T, T>((T value) =>
    {
        timer.Change(timeout, Timeout.Infinite);
        return value;
    }, new ExecutionDataflowBlockOptions()
    {
        BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
        CancellationToken = dataflowBlockOptions.CancellationToken,
        EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
        MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
        NameFormat = dataflowBlockOptions.NameFormat,
        TaskScheduler = dataflowBlockOptions.TaskScheduler
    });
    transformBlock.LinkTo(batchBlock, new DataflowLinkOptions()
    {
        PropagateCompletion = true
    });
    return DataflowBlock.Encapsulate(transformBlock, batchBlock);
}
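
A minimal usage sketch (the element type, the sizes, and the downstream ActionBlock below are illustrative assumptions):

var batchBlock = CreateBatchBlock<string>(batchSize: 100, timeout: 5000);
var actionBlock = new ActionBlock<string[]>(batch =>
    Console.WriteLine($"Received a batch of {batch.Length} items"));
batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

batchBlock.Post("item-1"); // each accepted item resets the 5-second inactivity timer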

Caution: As pointed out by @Jeff in a comment, there is a race condition with this approach. If the timeout is very small (in the range of milliseconds), the transformBlock will be racing with the timer to pass the data onto the batchBlock, and the timer may fire before the batchBlock has anything in it yet. Worst case scenario, we hang indefinitely: no more messages end up in the queue because they are waiting on a handful of previous ones to complete, but there is one straggler sitting in the latest buffer that will never trigger.


Alternative: below is a BatchUntilInactiveBlock<T> class that offers the whole range of the BatchBlock<T> functionality. This implementation is a thin wrapper around a BatchBlock<T> instance. It has less overhead than the CreateBatchBlock implementation above, while behaving similarly. It's not affected by the race condition mentioned earlier.

/// <summary>
/// Provides a dataflow block that batches inputs into arrays.
/// A batch is produced when the number of currently queued items becomes equal
/// to BatchSize, or when a Timeout period has elapsed after receiving the last item.
/// </summary>
public class BatchUntilInactiveBlock<T> : IPropagatorBlock<T, T[]>,
    IReceivableSourceBlock<T[]>
{
    private readonly BatchBlock<T> _source;
    private readonly Timer _timer;
    private readonly TimeSpan _timeout;

    public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout,
        GroupingDataflowBlockOptions dataflowBlockOptions)
    {
        _source = new BatchBlock<T>(batchSize, dataflowBlockOptions);
        _timer = new Timer(_ => _source.TriggerBatch());
        _timeout = timeout;
    }

    public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout) : this(batchSize,
        timeout, new GroupingDataflowBlockOptions())
    { }

    public int BatchSize => _source.BatchSize;
    public TimeSpan Timeout => _timeout;
    public Task Completion => _source.Completion;
    public int OutputCount => _source.OutputCount;

    public void Complete() => _source.Complete();

    void IDataflowBlock.Fault(Exception exception)
        => ((IDataflowBlock)_source).Fault(exception);

    public IDisposable LinkTo(ITargetBlock<T[]> target,
        DataflowLinkOptions linkOptions)
            => _source.LinkTo(target, linkOptions);

    public void TriggerBatch() => _source.TriggerBatch();

    public bool TryReceive(Predicate<T[]> filter, out T[] item)
        => _source.TryReceive(filter, out item);

    public bool TryReceiveAll(out IList<T[]> items)
        => _source.TryReceiveAll(out items);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
        bool consumeToAccept)
    {
        var offerResult = ((ITargetBlock<T>)_source).OfferMessage(messageHeader,
            messageValue, source, consumeToAccept);
        if (offerResult == DataflowMessageStatus.Accepted)
            _timer.Change(_timeout, System.Threading.Timeout.InfiniteTimeSpan);
        return offerResult;
    }

    T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target, out bool messageConsumed)
            => ((ISourceBlock<T[]>)_source).ConsumeMessage(messageHeader,
                target, out messageConsumed);

    bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
            => ((ISourceBlock<T[]>)_source).ReserveMessage(messageHeader, target);

    void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
            => ((ISourceBlock<T[]>)_source).ReleaseReservation(messageHeader, target);
}

The _timer is scheduled immediately after the BatchBlock<T> has been offered and has accepted a message. There is no window of time between scheduling the timer and offering the message, so there is no race.
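Usage is the same as with a regular BatchBlock<T>. A quick sketch (the sizes and the timeout are arbitrary):

var block = new BatchUntilInactiveBlock<int>(batchSize: 10,
    timeout: TimeSpan.FromMilliseconds(500));
block.LinkTo(new ActionBlock<int[]>(batch =>
    Console.WriteLine($"Batch: [{string.Join(", ", batch)}]")));

block.Post(1); block.Post(2); block.Post(3);
// After 500 ms without new items, the three items above are emitted as one short batch.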


Disclaimer: The behavior of the above implementations is not ideal, in that they produce short batches even in cases where they shouldn't. The ideal behavior would be to produce a short batch only when the batch can be propagated instantly to a consumer downstream; producing short batches and just storing them in the block's output buffer doesn't make much sense. This deviation from the ideal behavior can only be observed if the CreateBatchBlock<T>/BatchUntilInactiveBlock<T> block is not rigorously pumped, for example if the linked block downstream is bounded and has reached its maximum capacity.

Theodor Zoulias
  • Amazing of you to come and add the alternative implementation 18 months later! I'm guessing this means you have probably been using this yourself for a while... do you also happen to have already written one that uses the "release messages after at most x time" semantics, rather than the "reset if more messages arrive within x time" semantics? I'm not confident I understand the internal plumbing of the interfaces well enough to dare try it myself with your wrapper approach rather than the encapsulate approach (which is what I am using now). – allmhuran Sep 11 '21 at 06:25
  • @allmhuran actually someone upvoted my answer, so I gave it a fresh look searching for improvements. :-) Regarding a custom `BatchBlock` that resets its timer when the **first** item in a batch is received, I found this question: [TPL DataFlow - Batching on duration or threshold](https://stackoverflow.com/questions/52633346/tpl-dataflow-batching-on-duration-or-threshold). It has some answers by knowledgeable StackOverflow members. I may try to improve on these answers when I have time. – Theodor Zoulias Sep 11 '21 at 08:27
  • Haha yes, that was me. I have indeed seen that thread, but the completely self-contained, "dataflow-pure" solutions you have here are the best on the net (IMHO). The encapsulation approach works nicely and I am entirely happy with it; whatever additional overhead it has is not even close to being a bottleneck for my particular use. In fact aesthetically I still prefer it, the lego approach is the natural fit. – allmhuran Sep 11 '21 at 14:37
  • @allmhuran yes, the `DataflowBlock.Encapsulate` approach is convenient and usually much shorter than implementing the `IPropagatorBlock` interface directly. On the other hand, including additional functionality beyond what the `IPropagatorBlock` interface offers becomes very awkward (you must use `out Action` parameters or something). Also having the `IReceivableSourceBlock` interface available is handy sometimes. For example it allows converting a dataflow block to an `IAsyncEnumerable` sequence, as shown [here](https://stackoverflow.com/a/62410007/11178549). – Theodor Zoulias Sep 11 '21 at 18:19
  • @allmhuran I changed the name of the class to `BatchUntilInactiveBlock`, because the original name (`TimeoutBatchBlock`) is more suitable IMHO for the [other behavior](https://stackoverflow.com/questions/52633346/tpl-dataflow-batching-on-duration-or-threshold/69148085#69148085) (activating the timer when the first item arrives, not the last). – Theodor Zoulias Sep 12 '21 at 03:06
  • I am pleased with my decision not to attempt that myself. Yikes! I knew there'd be something tricky ;) – allmhuran Sep 12 '21 at 03:10
  • @allmhuran yeap, the TPL Dataflow is not easily extensible. It needs a lot of code to do simple stuff. That's the main weakness of this library IMHO. – Theodor Zoulias Sep 12 '21 at 03:13
  • Hi @allmhuran. A correction. As I learned today, `DataflowBlock.Encapsulate` returns an `IPropagatorBlock` implementation that also implements the `IReceivableSourceBlock` interface ([source code](https://source.dot.net/System.Threading.Tasks.Dataflow/Base/DataflowBlock.cs.html#a07470af1dcf41bc)). It's not obvious, but if you cast it `((IReceivableSourceBlock)encapsulatedBlock)` the cast will succeed. This makes the `DataflowBlock.Encapsulate` approach a bit more appealing, since it eliminates one of its presumed disadvantages. – Theodor Zoulias Sep 27 '21 at 19:57
  • Very admirable of you to come back and keep improving the information on this (and other) topics! This is the only time I have ever looked at using `Encapsulate` for anything, and so, having not really explored it thoroughly at all, I absolutely did not realise that. Useful for sure. – allmhuran Sep 27 '21 at 20:38
  • Won't the "easier" approach be susceptible to a race condition where a batch never gets processed after timeout? The TransformBlock will be racing with the timer to pass the data onto the BufferBlock, and the Timer may elapse first while the BufferBlock has nothing in it yet. – Jeff Apr 30 '22 at 17:11
  • @Jeff you mean if the `timeout` is very small, in the range of milliseconds? Yea, there might be a race condition in this case. Worst case scenario: the timeout policy will be violated. Eventually all messages will be processed though, when the block completes. My opinion is, for timeout greater than one second, both implementations are fine. For timeout less than one second, prefer the second implementation to be on the safe side. – Theodor Zoulias Apr 30 '22 at 17:58
  • worst case actually, we hang indefinitely... no more messages end up in the queue because they are waiting on a handful of previous ones to complete, but there is one straggler sitting in the latest buffer that will never trigger. – Jeff Apr 30 '22 at 18:07
  • But yes, in the range of 30 milliseconds. The scenario is an automatic (read: middleware) HTTP request batching implementation for MS Graph API calls that uses their $batch endpoint to batch HTTP requests. A delegating handler adds to the buffer block and then batches any messages (up to 15, or within 50ms of adding to the block) to the $batch endpoint, then returns the HTTP responses to the original delegating handlers. I never want to incur more than 30 milliseconds of performance hit to the request to reap the benefit of batching. The second implementation is safe because it offers first – Jeff Apr 30 '22 at 18:09
4

Thanks to Drew Marsh for the idea of using a TransformBlock, which greatly helped me with a recent solution. However, I believe that the timer needs to be reset AFTER the batch block (i.e. after a batch has been emitted, either because the batch size was reached OR because the TriggerBatch method was explicitly called within the timer callback). If you reset the timer every time you get a single item, it can potentially keep resetting several times without actually triggering a batch at all (constantly pushing the "dueTime" on the Timer further away).

This would make the code snippet look like the following:

Timer triggerBatchTimer = new Timer(_ => yourBatchBlock.TriggerBatch(), null, 5000, Timeout.Infinite);

TransformBlock<T[], T[]> timeoutTransformBlock = new TransformBlock<T[], T[]>((value) =>
{
    triggerBatchTimer.Change(5000, Timeout.Infinite);

    return value; 
});

yourBufferBlock.LinkTo(yourBatchBlock);
yourBatchBlock.LinkTo(timeoutTransformBlock);
timeoutTransformBlock.LinkTo(yourActionBlock);

// Start the producer which is populating the BufferBlock etc.
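
For clarity, here is a self-contained version of this arrangement (the element type, the block definitions, and the final ActionBlock are assumptions filled in for illustration):

var bufferBlock = new BufferBlock<int>();
var batchBlock = new BatchBlock<int>(10);
var triggerBatchTimer = new Timer(_ => batchBlock.TriggerBatch(), null, 5000, Timeout.Infinite);

var timeoutTransformBlock = new TransformBlock<int[], int[]>(batch =>
{
    // Reset the timer only after a batch has actually been emitted.
    triggerBatchTimer.Change(5000, Timeout.Infinite);
    return batch;
});

var actionBlock = new ActionBlock<int[]>(
    batch => Console.WriteLine($"Processing {batch.Length} items"));

bufferBlock.LinkTo(batchBlock);
batchBlock.LinkTo(timeoutTransformBlock);
timeoutTransformBlock.LinkTo(actionBlock);

for (int i = 0; i < 25; i++) bufferBlock.Post(i);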
1

Here is a solution which builds upon previous answers. This method encapsulates an existing BatchBlock with one that pushes out batches at least as often as the timeout.

The other answers don't handle the case where there are no items in the batch block when the timer fires; in that case those solutions wait until the batch is full. We had that issue in our non-production environments, which made testing harder. This solution makes sure that after an item is posted to the BatchBlock, it is propagated after at most timeout seconds.

public static IPropagatorBlock<T, T[]> CreateTimeoutBatchBlock<T>(BatchBlock<T> batchBlock, int timeout)
{
    var timespan = TimeSpan.FromSeconds(timeout);
    var timer = new Timer(
        _ => batchBlock.TriggerBatch(),
        null,
        timespan,
        timespan);
    var transformBlock = new TransformBlock<T[], T[]>(
        value =>
        {
            // Reset the timer when a batch has been triggered
            timer.Change(timespan, timespan);
            return value;
        });
    batchBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
    return DataflowBlock.Encapsulate(batchBlock, transformBlock);
}
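
A usage sketch (the inner BatchBlock, the 5-second timeout, and the consumer are illustrative assumptions; note that items are posted to the returned encapsulated block, which forwards them to the inner BatchBlock):

var block = CreateTimeoutBatchBlock(new BatchBlock<int>(10), timeout: 5);
block.LinkTo(new ActionBlock<int[]>(batch =>
        Console.WriteLine($"Got {batch.Length} items")),
    new DataflowLinkOptions { PropagateCompletion = true });

block.Post(42); // emitted within at most ~5 seconds, even if the batch never fills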
wanton
  • This is indeed safe from race conditions, but it has the downside that the timer is firing repeatedly, even when the pipeline is idle. To be honest I have started questioning if the TPL Dataflow is a robust tool for this kind of work (or any kind of work for that matter). Its components are too loosely connected with each other. – Theodor Zoulias Sep 01 '23 at 13:38
-1

You can use link options

_transformManyBlock.LinkTo(_batchBlock, new DataflowLinkOptions {PropagateCompletion = true});