I'm passing instances of some class to an ActionBlock. If I call

cancellationSource.Cancel();

then processing will stop. But some instances can remain in the input queue of the ActionBlock. I need access to those remaining instances in order to release some resources.

How can I achieve this?

FunctorPrototype
  • Cancelling or faulting a block discards (dequeues) all messages. If the only reference to these messages existed in the block, the messages will be collected once the GC runs. What do you mean by "release some resources"? Are you trying to simply clear the queue or do the messages contain references to unmanaged resources? Are you storing large amounts of data in them perhaps? – Panagiotis Kanavos Jan 14 '16 at 17:36
  • I have some objects there (in the action) that I want to return to a pool without reallocation. So now I think that is impossible – FunctorPrototype Jan 16 '16 at 15:27
  • That's a completely different issue that *isn't* related to whether the input queue is visible or not. Besides, treating messages as something more is a strong smell - why pool the *messages* instead of, e.g., the buffers or connections they contain? If you did, disposing of a message would also release the buffers – Panagiotis Kanavos Jan 18 '16 at 15:54
  • This is a case of [the XY problem](http://meta.stackexchange.com/questions/66377/what-is-the-xy-problem): having an issue with X, assuming Y is the solution, and asking about Y instead of X when trouble arises. What *is* the original problem here? Reusing resources *or* gracefully and *cooperatively* discarding in-flight messages? In this case one answer would be to redirect all objects to a block that releases/disposes them – Panagiotis Kanavos Jan 18 '16 at 15:59
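
The cooperative approach suggested in the last comment could look roughly like the sketch below. It is only an illustration under stated assumptions: `PooledItem`, the `ConcurrentBag` standing in for a real pool, and the class name `CooperativeDrainDemo` are all hypothetical and do not come from the question. The idea is to keep the token out of the block options, so the block never discards its queue, and to let the delegate decide whether to do the work or just hand the item back.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CooperativeDrainDemo
{
    // Hypothetical pooled message type, used only for illustration.
    public sealed class PooledItem { public int Id; }

    public static async Task<(int processed, int returned)> RunAsync()
    {
        // Stand-in for a real object pool (assumption).
        var pool = new ConcurrentBag<PooledItem>();
        using var cts = new CancellationTokenSource();
        int processed = 0; // safe without locking: default MaxDegreeOfParallelism is 1

        // The token is NOT passed through ExecutionDataflowBlockOptions,
        // so cancelling it does not make the block drop its input queue.
        var block = new ActionBlock<PooledItem>(async item =>
        {
            try
            {
                if (!cts.IsCancellationRequested)
                {
                    await Task.Delay(10); // simulated work
                    processed++;
                }
            }
            finally
            {
                pool.Add(item); // every item is returned, processed or not
            }
        });

        for (int i = 0; i < 20; i++) block.Post(new PooledItem { Id = i });

        cts.Cancel();     // request cooperative shutdown
        block.Complete(); // no more input; queued items still reach the delegate
        await block.Completion;

        return (processed, pool.Count);
    }
}
```

With this shape the block completes normally rather than in a canceled state, and the number of items actually processed depends on timing, while every posted item should end up back in the pool.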

1 Answer

If you desperately need an ActionBlock with an exposed input buffer, you could try the custom implementation below. It supports all the built-in functionality of an ActionBlock, and adds a custom IEnumerable<T> InputQueue property (plus an InputCount property). The input buffer is not emptied when the ActionBlockEx completes in a faulted or canceled state.

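using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class ActionBlockEx<T> : ITargetBlock<T>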
{
    private readonly ITargetBlock<object> _actionBlock;
    private readonly Queue<T> _queue;

    public ActionBlockEx(Func<T, Task> action,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
    {
        if (action == null) throw new ArgumentNullException(nameof(action));
        _actionBlock = new ActionBlock<object>(_ =>
        {
            T item; lock (_queue) item = _queue.Dequeue();
            return action(item);
        }, dataflowBlockOptions ?? new ExecutionDataflowBlockOptions());
        _queue = new Queue<T>();
    }

    public ActionBlockEx(Action<T> action,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null) : this(
            item => { action(item); return Task.CompletedTask; }, dataflowBlockOptions)
    {
        if (action == null) throw new ArgumentNullException(nameof(action));
    }

    public int InputCount { get { lock (_queue) return _queue.Count; } }
    public IEnumerable<T> InputQueue { get { lock (_queue) return _queue.ToList(); } }

    public Task Completion => _actionBlock.Completion;
    public void Complete() => _actionBlock.Complete();
    void IDataflowBlock.Fault(Exception ex) => _actionBlock.Fault(ex);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader header,
        T item, ISourceBlock<T> source, bool consumeToAccept)
    {
        var sourceProxy = source != null ? new SourceProxy(source, this) : null;
        lock (_queue)
        {
            var offerResult = _actionBlock.OfferMessage(header, null, sourceProxy,
                consumeToAccept);
            if (offerResult == DataflowMessageStatus.Accepted
                && (sourceProxy == null || !sourceProxy.ConsumeMessageInvoked))
            {
                _queue.Enqueue(item);
            }
            return offerResult;
        }
    }

    private class SourceProxy : ISourceBlock<object>
    {
        private readonly ISourceBlock<T> _realSource;
        private readonly ActionBlockEx<T> _realTarget;

        public bool ConsumeMessageInvoked { get; private set; }

        public SourceProxy(ISourceBlock<T> realSource, ActionBlockEx<T> realTarget)
        {
            _realSource = realSource;
            _realTarget = realTarget;
        }

        object ISourceBlock<object>.ConsumeMessage(DataflowMessageHeader header,
            ITargetBlock<object> target, out bool messageConsumed)
        {
            this.ConsumeMessageInvoked = true;
            lock (_realTarget._queue)
            {
                var item = _realSource.ConsumeMessage(header, _realTarget,
                    out messageConsumed);
                if (messageConsumed) _realTarget._queue.Enqueue(item);
            }
            return null;
        }

        bool ISourceBlock<object>.ReserveMessage(DataflowMessageHeader header,
            ITargetBlock<object> target)
        {
            return _realSource.ReserveMessage(header, _realTarget);
        }

        void ISourceBlock<object>.ReleaseReservation(DataflowMessageHeader header,
            ITargetBlock<object> target)
        {
            _realSource.ReleaseReservation(header, _realTarget);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => throw new NotSupportedException();
        void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
        IDisposable ISourceBlock<object>.LinkTo(ITargetBlock<object> target,
            DataflowLinkOptions linkOptions) => throw new NotSupportedException();
    }

}

This implementation is based on an internal ActionBlock<object> that is supplied with dummy null messages. Its communication with the linked ISourceBlock is intercepted so that the actual messages are acquired and stored in an internal Queue<T>. This indirection adds some overhead (an object allocation occurs on every message received), so use this class with caution!
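
A usage sketch for the scenario in the question might look like this. It assumes the `ActionBlockEx<T>` class above is compiled in the same project; the class name `ActionBlockExUsage`, the `int` payload, and the delay are made up for illustration. After cancellation, the items that were never handed to the action should still be readable through `InputQueue`, which returns a snapshot copy.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockExUsage
{
    public static async Task<List<int>> RunAsync()
    {
        using var cts = new CancellationTokenSource();

        // Requires the ActionBlockEx<T> class from this answer.
        var block = new ActionBlockEx<int>(async item =>
        {
            await Task.Delay(50); // simulated work
        }, new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });

        // Post is the standard extension method on ITargetBlock<T>.
        for (int i = 0; i < 10; i++) block.Post(i);

        cts.Cancel();
        try
        {
            await block.Completion; // completes as canceled
        }
        catch (OperationCanceledException)
        {
            // Expected: the block was canceled through its token.
        }

        // Snapshot of the items that never reached the action; this is
        // where they could be returned to a pool or disposed.
        return block.InputQueue.ToList();
    }
}
```

How many items remain depends on timing, so the sketch makes no claim about the exact count.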

Theodor Zoulias