4

Given a BroadcastBlock with a message in the buffer, is it possible to prevent that message from being sent to newly linked targets? For example:

static void Main(string[] args)
{
    var myBroadcastBlock = new BroadcastBlock<string>(msg => msg);
    var myActionBlock = new ActionBlock<string>(msg => Console.WriteLine(msg));

    myBroadcastBlock.Post("Hello World!"); // No linked targets here.

    myBroadcastBlock.LinkTo(myActionBlock); // Link a target.

    // etc.
}

This code will print "Hello World". Basically, the BroadcastBlock will still send the buffered message to the ActionBlock on .LinkTo, despite the message having been posted prior to the link being established.

Is there a built-in way to prevent this behavior? I only want messages to be sent to current links, not future ones.

I am using System.Threading.Tasks.Dataflow 4.11.1

Peter Csala
  • 17,736
  • 16
  • 35
  • 75
Jacob Shanley
  • 893
  • 1
  • 8
  • 19
  • 1
    According to my understanding each `DataFlowBlock` maintains an queue, which size can be controlled by the `BoundedCapacity`. The only exception that I'm aware of is the `WriteOnceBlock`. You can't control this behaviour even through a `DataflowLinkOptions` instance during linking. – Peter Csala Jul 30 '20 at 07:07
  • 2
    The `LinkTo()` method appears to call `OfferCurrentMessageToNewTarget(target)` internally. So I don't think you can prevent this behavior. You would need to rebuild your pipeline. Use the Complete method on the existing BroadcastBlock (and wait for completion) then re-instantiate it as new, and then link all required blocks. – SeanOB Jul 30 '20 at 07:41
  • @SeanOB Yeah this is what I was afraid of :-/ – Jacob Shanley Jul 30 '20 at 12:27

1 Answers1

1

This behavior is not possible using the built-in BroadcastBlock class. Its behavior is not configurable. If you desperately need this behavior, you could try the implementation below. It uses an internal BroadcastBlock<(T, long)> with an index that is incremented with each new message, so that during linking the currently active message can be filtered out.

There is quite a lot of indirection inside the BroadcastBlockNewOnly class, because of the need to translate from T to (T, long) and back to T. This makes the class hard to maintain, and also not very efficient. On every received message a new object is allocated, creating more work for the garbage collector, so use this class with caution.

public class BroadcastBlockNewOnly<T> : ITargetBlock<T>, ISourceBlock<T>
{
    private readonly IPropagatorBlock<(T, long), (T, long)> _broadcastBlock;
    private long _index;

    public BroadcastBlockNewOnly(Func<T, T> cloningFunction,
        DataflowBlockOptions dataflowBlockOptions = null)
    {
        if (cloningFunction == null)
            throw new ArgumentNullException(nameof(cloningFunction));
        _broadcastBlock = new BroadcastBlock<(T, long)>(entry =>
        {
            var (value, index) = entry;
            return (cloningFunction(value), index);
        }, dataflowBlockOptions ?? new DataflowBlockOptions());
    }

    public Task Completion => _broadcastBlock.Completion;
    public void Complete() => _broadcastBlock.Complete();
    void IDataflowBlock.Fault(Exception ex) => _broadcastBlock.Fault(ex);

    public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
    {
        if (target == null) throw new ArgumentNullException(nameof(target));
        var currentIndex = Interlocked.CompareExchange(ref _index, 0, 0);
        var linkedTargetProxy = new LinkedTargetProxy(target, this, currentIndex);
        return _broadcastBlock.LinkTo(linkedTargetProxy, linkOptions);
    }

    private long GetNewIndex() => Interlocked.Increment(ref _index);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader header,
        T value, ISourceBlock<T> source, bool consumeToAccept)
    {
        var sourceProxy = source != null ?
            new SourceProxy(source, this, GetNewIndex) : null;
        return _broadcastBlock.OfferMessage(header, (value, GetNewIndex()),
            sourceProxy, consumeToAccept);
    }

    T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader header,
        ITargetBlock<T> target, out bool messageConsumed)
    {
        var targetProxy = target != null ? new TargetProxy(target, this) : null;
        var (value, index) = _broadcastBlock.ConsumeMessage(header, targetProxy,
            out messageConsumed);
        return value;
    }

    bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader header,
        ITargetBlock<T> target)
    {
        var targetProxy = target != null ? new TargetProxy(target, this) : null;
        return _broadcastBlock.ReserveMessage(header, targetProxy);
    }

    void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader header,
        ITargetBlock<T> target)
    {
        var targetProxy = target != null ? new TargetProxy(target, this) : null;
        _broadcastBlock.ReleaseReservation(header, targetProxy);
    }

    private class LinkedTargetProxy : ITargetBlock<(T, long)>
    {
        private readonly ITargetBlock<T> _realTarget;
        private readonly ISourceBlock<T> _realSource;
        private readonly long _indexLimit;

        public LinkedTargetProxy(ITargetBlock<T> realTarget, ISourceBlock<T> realSource,
            long indexLimit)
        {
            _realTarget = realTarget;
            _realSource = realSource;
            _indexLimit = indexLimit;
        }

        DataflowMessageStatus ITargetBlock<(T, long)>.OfferMessage(
            DataflowMessageHeader header, (T, long) messageValue,
            ISourceBlock<(T, long)> source, bool consumeToAccept)
        {
            var (value, index) = messageValue;
            if (index <= _indexLimit) return DataflowMessageStatus.Declined;
            return _realTarget.OfferMessage(header, value, _realSource, consumeToAccept);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => _realTarget.Complete();
        void IDataflowBlock.Fault(Exception ex) => _realTarget.Fault(ex);
    }

    private class SourceProxy : ISourceBlock<(T, long)>
    {
        private readonly ISourceBlock<T> _realSource;
        private readonly ITargetBlock<T> _realTarget;
        private readonly Func<long> _getNewIndex;

        public SourceProxy(ISourceBlock<T> realSource, ITargetBlock<T> realTarget,
            Func<long> getNewIndex)
        {
            _realSource = realSource;
            _realTarget = realTarget;
            _getNewIndex = getNewIndex;
        }

        (T, long) ISourceBlock<(T, long)>.ConsumeMessage(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target, out bool messageConsumed)
        {
            var value = _realSource.ConsumeMessage(header, _realTarget,
                out messageConsumed);
            var newIndex = _getNewIndex();
            return (value, newIndex);
        }

        bool ISourceBlock<(T, long)>.ReserveMessage(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target)
        {
            return _realSource.ReserveMessage(header, _realTarget);
        }

        void ISourceBlock<(T, long)>.ReleaseReservation(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target)
        {
            _realSource.ReleaseReservation(header, _realTarget);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => throw new NotSupportedException();
        void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
        IDisposable ISourceBlock<(T, long)>.LinkTo(ITargetBlock<(T, long)> target,
            DataflowLinkOptions linkOptions) => throw new NotSupportedException();
    }

    private class TargetProxy : ITargetBlock<(T, long)>
    {
        private readonly ITargetBlock<T> _realTarget;
        private readonly ISourceBlock<T> _realSource;

        public TargetProxy(ITargetBlock<T> realTarget, ISourceBlock<T> realSource)
        {
            _realTarget = realTarget;
            _realSource = realSource;
        }

        DataflowMessageStatus ITargetBlock<(T, long)>.OfferMessage(
            DataflowMessageHeader header, (T, long) messageValue,
            ISourceBlock<(T, long)> source, bool consumeToAccept)
        {
            var (value, index) = messageValue;
            return _realTarget.OfferMessage(header, value, _realSource, consumeToAccept);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => throw new NotSupportedException();
        void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
    }

}
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • This is a smart solution and has the behavior I am looking for—thanks! Given the warranted cautionary tale about the allocation of new objects and general class maintenance, I'm wondering if there is a more streamlined workaround out there? I'm optimistic, but seems unlikely, given all the hoops you had to go through to create a new type of object with the desired behavior. – Jacob Shanley Jul 30 '20 at 13:00
  • 1
    @JacobShanley this is the simplest solution I could think of. You could probably make it less allocatey by maintaining internally a cache of `SourceProxy` and `TargetProxy` instances, but this would complicate the class even further. I think that if you don't pass through the pipeline thousands of messages per second, and don't have a specific reason to keep the garbage collector idle (like programming a game), the current implementation should be good enough. – Theodor Zoulias Jul 30 '20 at 13:15
  • 1
    @JacobShanley Sorry to say, but this seems to me a bit overkill. It looks like you want to use the wrong tool for this problem. An Observable might be a better fit for this. You can convert any DataFlowObject to Observable by calling the `AsObservable` ([1](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblock.asobservable?view=netcore-3.1)) – Peter Csala Jul 30 '20 at 15:05
  • @PeterCsala Yeah, Observable is a good call. Thanks! – Jacob Shanley Jul 30 '20 at 16:19
  • 2
    @PeterCsala the Rx equivalent of the `BroadcastBlock` is the [`BehaviorSubject`](https://learn.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh211949(v=vs.103)), which has a similar behavior when subscribed (it propagates the last value it received). The behavior requested by the OP is probably matched by a simple [`Subject`](https://learn.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh229173(v=vs.103)). – Theodor Zoulias Jul 30 '20 at 16:47