
I have a stream of data that I process in several different ways, so I would like to send a copy of each message to multiple targets that can execute in parallel. However, I need to set BoundedCapacity on my blocks, because the data streams in far faster than my targets can handle it and there is a ton of data. Without BoundedCapacity I would quickly run out of memory.

The problem is that BroadcastBlock will drop a message if a target cannot accept it (because of the BoundedCapacity).
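A minimal sketch of the dropping behavior described above, assuming the System.Threading.Tasks.Dataflow package is referenced. The class and method names (`BroadcastDropDemo`, `CountDeliveredAsync`) are hypothetical, and the 20 ms delay is an arbitrary stand-in for slow processing:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BroadcastDropDemo
{
    // Posts messageCount items through a BroadcastBlock into one slow,
    // bounded target and returns how many items the target actually handled.
    public static async Task<int> CountDeliveredAsync(int messageCount)
    {
        int delivered = 0;

        // Slow consumer that can hold only one message at a time.
        // The default MaxDegreeOfParallelism is 1, so the counter is
        // never incremented concurrently.
        var slowTarget = new ActionBlock<int>(async _ =>
        {
            await Task.Delay(20); // simulated work (arbitrary value)
            delivered++;
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        var broadcast = new BroadcastBlock<int>(x => x);
        broadcast.LinkTo(slowTarget,
            new DataflowLinkOptions { PropagateCompletion = true });

        // The BroadcastBlock keeps only its latest message, so items that
        // arrive while the target is full are overwritten.
        for (int i = 0; i < messageCount; i++)
        {
            broadcast.Post(i);
        }

        broadcast.Complete();
        await slowTarget.Completion;
        return delivered;
    }
}
```

Calling `await BroadcastDropDemo.CountDeliveredAsync(100)` should return far fewer than 100, since the block overwrites messages the full target could not take. The exact count depends on timing.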

What I need is a BroadcastBlock that will not drop messages, but will essentially refuse additional input until it can deliver messages to each target and then is ready for more.

Is there something like this, or has anybody written a custom block that behaves in this manner?

Theodor Zoulias
Brian Rice

2 Answers


It is fairly simple to build what you're asking for using ActionBlock and SendAsync(), something like:

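public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
    IEnumerable<ITargetBlock<T>> targets)
{
    // Snapshot the targets so later changes to the caller's collection are ignored
    var targetsList = targets.ToList();

    return new ActionBlock<T>(
        async item =>
        {
            foreach (var target in targetsList)
            {
                // Waits until this target has room, so nothing is dropped
                await target.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
        // BoundedCapacity = 1 makes this block refuse input while it is busy
}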

This is the most basic version, but extending it to support a mutable list of targets, completion propagation, or a cloning function should be easy.
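As one possible illustration of such an extension, here is a sketch that adds completion propagation and exercises the block with two bounded targets. The `GuaranteedBroadcastDemo` class, the `RunAsync` method, the capacity of 2 and the 1 ms delay are all assumptions made for the example, not part of the original answer:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class GuaranteedBroadcastDemo
{
    public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
        IEnumerable<ITargetBlock<T>> targets)
    {
        var targetsList = targets.ToList();

        var block = new ActionBlock<T>(
            async item =>
            {
                foreach (var target in targetsList)
                {
                    await target.SendAsync(item);
                }
            }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // Assumed extension: forward completion (or a fault) to every target.
        block.Completion.ContinueWith(t =>
        {
            foreach (var target in targetsList)
            {
                if (t.IsFaulted) target.Fault(t.Exception);
                else target.Complete();
            }
        }, TaskScheduler.Default);

        return block;
    }

    // Sends messageCount items and reports how many each target received.
    public static async Task<(int First, int Second)> RunAsync(int messageCount)
    {
        var receivedBySlow = new List<int>();
        var receivedByFast = new List<int>();
        var options = new ExecutionDataflowBlockOptions { BoundedCapacity = 2 };

        // Each block processes one message at a time, so each list has a single writer.
        var slow = new ActionBlock<int>(async x =>
        {
            await Task.Delay(1); // simulated slow consumer
            receivedBySlow.Add(x);
        }, options);
        var fast = new ActionBlock<int>(x => receivedByFast.Add(x), options);

        var broadcast = CreateGuaranteedBroadcastBlock(
            new ITargetBlock<int>[] { slow, fast });

        for (int i = 0; i < messageCount; i++)
        {
            // SendAsync (unlike Post) waits while the bounded block is full.
            await broadcast.SendAsync(i);
        }

        broadcast.Complete();
        await Task.WhenAll(slow.Completion, fast.Completion);
        return (receivedBySlow.Count, receivedByFast.Count);
    }
}
```

In this sketch the producer is throttled by the slowest target, because the central block does not accept a new item until every target has taken the previous one. Both counts should equal `messageCount`.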

svick
  • If I set a BoundedCapacity larger than one, I lose messages. Do I have to negotiate with the sender (or maybe the target) so this doesn't happen? – Brian Rice Mar 02 '14 at 19:12
  • I don't see how this could be losing messages, assuming you don't use a normal `BroadcastBlock` anywhere. Could you post sample code that demonstrates the issue somewhere? – svick Mar 02 '14 at 19:15
  • My fault. I had to add Completion.ContinueWith between the source and the ActionBlock, and between the ActionBlock and each target. I was still doing this between the source and the target, skipping over the ActionBlock, and probably leaving things in the ActionBlock's buffer. – Brian Rice Mar 02 '14 at 22:47
  • So I wrote code that essentially replaces a TransformBlock with an ActionBlock and SendAsync (as suggested). When I run the code with BufferBlock-TransformBlock-ActionBlock it takes 7 seconds, and when I run it with BufferBlock-ActionBlock/SendAsync-ActionBlock it takes 10 seconds: a bit of a performance hit. I wish I had the experience to write a true GuaranteedBroadcastBlock. – Brian Rice Mar 03 '14 at 23:27
  • I've uploaded a VS 2012 project to http://www.brianrice.com/downloads/permanent/test2.zip if anybody wants to take a peek at my results or pitch in to improve it! :) (see Form1.cs for comments) – Brian Rice Mar 03 '14 at 23:56
  • This performance hit is probably caused by internal tasks that `SendAsync` creates when it can't deliver the message on the first attempt. Maybe you should adjust the settings for your blocks. – VMAtm Nov 16 '16 at 17:27
  • This question and both answers are a great demonstration of how Dataflow simplifies programming and how essential it is to understand CSP/Dataflow to avoid overengineering. – Panagiotis Kanavos Nov 23 '20 at 07:54

Here is a polished version of svick's idea. The GuaranteedDeliveryBroadcastBlock class below is an (almost) complete substitute for the built-in BroadcastBlock. Linking and unlinking targets at any moment is supported.

public class GuaranteedDeliveryBroadcastBlock<T> :
    ITargetBlock<T>, ISourceBlock<T>, IPropagatorBlock<T, T>
{
    private class Subscription
    {
        public readonly ITargetBlock<T> Target;
        public readonly bool PropagateCompletion;
        public readonly CancellationTokenSource CancellationSource;

        public Subscription(ITargetBlock<T> target,
            bool propagateCompletion,
            CancellationTokenSource cancellationSource)
        {
            Target = target;
            PropagateCompletion = propagateCompletion;
            CancellationSource = cancellationSource;
        }
    }

    private readonly object _locker = new object();
    private readonly Func<T, T> _cloningFunction;
    private readonly CancellationToken _cancellationToken;
    private readonly ITargetBlock<T> _actionBlock;
    private readonly List<Subscription> _subscriptions = new List<Subscription>();
    private readonly Task _completion;
    private CancellationTokenSource _faultCTS
        = new CancellationTokenSource(); // Is nullified on completion

    public GuaranteedDeliveryBroadcastBlock(Func<T, T> cloningFunction,
        DataflowBlockOptions dataflowBlockOptions = null)
    {
        _cloningFunction = cloningFunction
            ?? throw new ArgumentNullException(nameof(cloningFunction));
        dataflowBlockOptions ??= new DataflowBlockOptions();
        _cancellationToken = dataflowBlockOptions.CancellationToken;

        _actionBlock = new ActionBlock<T>(async item =>
        {
            Task sendAsyncToAll;
            lock (_locker)
            {
                var allSendAsyncTasks = _subscriptions
                    .Select(sub => sub.Target.SendAsync(
                        _cloningFunction(item), sub.CancellationSource.Token));
                sendAsyncToAll = Task.WhenAll(allSendAsyncTasks);
            }
            await sendAsyncToAll;
        }, new ExecutionDataflowBlockOptions()
        {
            CancellationToken = dataflowBlockOptions.CancellationToken,
            BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
            MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
            TaskScheduler = dataflowBlockOptions.TaskScheduler,
        });

        var afterCompletion = _actionBlock.Completion.ContinueWith(t =>
        {
            lock (_locker)
            {
                // PropagateCompletion
                foreach (var subscription in _subscriptions)
                {
                    if (subscription.PropagateCompletion)
                    {
                        if (t.IsFaulted)
                            subscription.Target.Fault(t.Exception);
                        else
                            subscription.Target.Complete();
                    }
                }
                // Cleanup
                foreach (var subscription in _subscriptions)
                {
                    subscription.CancellationSource.Dispose();
                }
                _subscriptions.Clear();
                _faultCTS.Dispose();
                _faultCTS = null; // Prevent future subscriptions to occur
            }
        }, TaskScheduler.Default);

        // Ensure that any exception in the continuation will be surfaced
        _completion = Task.WhenAll(_actionBlock.Completion, afterCompletion);
    }

    public Task Completion => _completion;

    public void Complete() => _actionBlock.Complete();

    void IDataflowBlock.Fault(Exception ex)
    {
        _actionBlock.Fault(ex);
        lock (_locker) _faultCTS?.Cancel();
    }

    public IDisposable LinkTo(ITargetBlock<T> target,
        DataflowLinkOptions linkOptions)
    {
        if (linkOptions.MaxMessages != DataflowBlockOptions.Unbounded)
            throw new NotSupportedException();
        Subscription subscription;
        lock (_locker)
        {
            if (_faultCTS == null) return new Unlinker(null); // Has completed
            var cancellationSource = CancellationTokenSource
                .CreateLinkedTokenSource(_cancellationToken, _faultCTS.Token);
            subscription = new Subscription(target,
                linkOptions.PropagateCompletion, cancellationSource);
            _subscriptions.Add(subscription);
        }
        return new Unlinker(() =>
        {
            lock (_locker)
            {
                // The subscription may have already been removed
                if (_subscriptions.Remove(subscription))
                {
                    subscription.CancellationSource.Cancel();
                    subscription.CancellationSource.Dispose();
                }
            }
        });
    }

    private class Unlinker : IDisposable
    {
        private readonly Action _action;
        public Unlinker(Action disposeAction) => _action = disposeAction;
        void IDisposable.Dispose() => _action?.Invoke();
    }

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue,
        ISourceBlock<T> source, bool consumeToAccept)
    {
        return _actionBlock.OfferMessage(messageHeader, messageValue, source,
            consumeToAccept);
    }

    T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T> target, out bool messageConsumed)
            => throw new NotSupportedException();

    bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T> target)
            => throw new NotSupportedException();

    void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T> target)
            => throw new NotSupportedException();
}

Missing features: the IReceivableSourceBlock<T> interface is not implemented, and linking with the MaxMessages option is not supported.

This class is thread-safe.

Theodor Zoulias
  • This isn't what svick posted at all. This is far, far too complicated for a simple clone operation. Even the lock is unnecessary in the simple cloning loop: there's only a single worker task, so why would a lock be needed? To handle DOP>1? Why is DOP>1 needed? And even if it is, why use locks instead of e.g. Channels? The very reason Dataflow is used is to avoid locks. – Panagiotis Kanavos Nov 23 '20 at 07:52
  • @PanagiotisKanavos the `GuaranteedDeliveryBroadcastBlock` is based on svick's idea of having a central `ActionBlock` that distributes messages to various targets. I just packaged this functionality in a nice `IPropagatorBlock` implementation, so that it can be used more easily. The `lock` is required in order to allow concurrent linking and unlinking of targets. I couldn't claim that *"This class is thread-safe."* at the end, without some sort of synchronization in place. And a lock is the simplest, and easiest to understand, tool for this job. – Theodor Zoulias Nov 23 '20 at 15:11
  • @PanagiotisKanavos btw the lock is used just to ensure the thread-safety of the class. It's not a contested lock, so it won't generate any contention. Its overhead should be negligible. The built-in TPL Dataflow blocks are also making use of locks internally, when they receive messages. – Theodor Zoulias Nov 23 '20 at 15:20