2

I'm trying to create some sort of queue that will process the N latest messages received. Right now I have this:

/// <summary>
/// Builds the messaging pipeline: a <see cref="BroadcastBlock{T}"/> feeding a slow, bounded
/// <see cref="ActionBlock{TInput}"/>. As described in the question, posting 1, 2, 3, 4, 5 in quick
/// succession prints 1, 2, 5 instead of the desired 1, 4, 5.
/// </summary>
private static void SetupMessaging()
{
    // msg => msg is the cloning function the broadcast block applies when handing a message to a target
    // (identity here).
    // NOTE(review): BroadcastBlock's constructor takes DataflowBlockOptions, so execution-only settings
    // such as MaxDegreeOfParallelism are presumably not consulted by this block — confirm in the API docs.
    _messagingBroadcastBlock = new BroadcastBlock<string>(msg => msg, new ExecutionDataflowBlockOptions
    {
        //BoundedCapacity = 1,
        EnsureOrdered = true,
        MaxDegreeOfParallelism = 1,
        MaxMessagesPerTask = 1
    });

    // Consumer: prints each message, then blocks its thread for 5 seconds to simulate slow work.
    // BoundedCapacity = 2 limits how many messages this block will take; once full it stops accepting
    // offers. NOTE(review): per the TPL Dataflow docs the bound presumably includes the message being
    // processed — confirm. While it is full, 3 and 4 are presumably overwritten inside the broadcast
    // block (which keeps only its latest message), which is why 1, 2, 5 is observed.
    _messagingActionBlock = new ActionBlock<string>(msg =>
    {
        Console.WriteLine(msg);
        Thread.Sleep(5000);
    }, new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 2,
        EnsureOrdered = true,
        MaxDegreeOfParallelism = 1,
        MaxMessagesPerTask = 1    
    });

    // Completion of the broadcast block is forwarded to the action block.
    _messagingBroadcastBlock.LinkTo(_messagingActionBlock, new DataflowLinkOptions { PropagateCompletion = true });
    // Discards messages offered to this link.
    // NOTE(review): a BroadcastBlock offers every message to all linked targets rather than only to the
    // first that accepts, so this NullTarget link likely has no effect — confirm.
    _messagingBroadcastBlock.LinkTo(DataflowBlock.NullTarget<string>());
}

The problem is that if I post 1, 2, 3, 4, 5 to it, I get 1, 2, 5, but I'd like to get 1, 4, 5. Any suggestions are welcome.
Update 1
I was able to make the following solution work:

/// <summary>
/// Wraps an <see cref="ActionBlock{TInput}"/> with a "drop oldest" overflow policy. When more than
/// <see cref="DataflowBlockOptions.BoundedCapacity"/> messages are running or waiting, the oldest
/// messages that have not started yet are skipped, so the most recently posted messages get processed.
/// The message that was just posted is never the one dropped, so a capacity of 1 means "process the
/// latest message next". <see cref="DataflowBlockOptions.Unbounded"/> disables dropping.
/// </summary>
/// <typeparam name="T">Type of the processed messages.</typeparam>
class FixedCapacityActionBlock<T>
{
    private readonly ActionBlock<CancellableMessage<T>> _actionBlock;

    // Messages accepted by Post whose action has not started yet, oldest first. Guarded by _syncRoot.
    private readonly System.Collections.Generic.LinkedList<CancellableMessage<T>> _pending =
        new System.Collections.Generic.LinkedList<CancellableMessage<T>>();

    // Upper bound on running + pending messages (the caller's BoundedCapacity).
    private readonly int _maxQueueSize;

    // Number of messages whose action is currently executing. Guarded by _syncRoot.
    private int _running;

    private readonly object _syncRoot = new object();

    /// <summary>
    /// Creates the block.
    /// </summary>
    /// <param name="act">Action invoked for every message that is not dropped.</param>
    /// <param name="opt">
    /// Options for the inner block. <see cref="DataflowBlockOptions.BoundedCapacity"/> is used as the
    /// drop-oldest limit instead of being passed on, so the inner block stays unbounded.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="act"/> or <paramref name="opt"/> is null.</exception>
    public FixedCapacityActionBlock(Action<T> act, ExecutionDataflowBlockOptions opt)
    {
        if (act == null) throw new ArgumentNullException(nameof(act));
        if (opt == null) throw new ArgumentNullException(nameof(opt));

        var options = new ExecutionDataflowBlockOptions
        {
            EnsureOrdered = opt.EnsureOrdered,
            CancellationToken = opt.CancellationToken,
            MaxDegreeOfParallelism = opt.MaxDegreeOfParallelism,
            MaxMessagesPerTask = opt.MaxMessagesPerTask,
            NameFormat = opt.NameFormat,
            SingleProducerConstrained = opt.SingleProducerConstrained,
            TaskScheduler = opt.TaskScheduler,
            // BoundedCapacity is intentionally not copied: the capacity is enforced by Post,
            // which cancels the oldest pending messages instead of declining new ones.
        };

        _actionBlock = new ActionBlock<CancellableMessage<T>>(cmsg =>
        {
            try
            {
                lock (_syncRoot)
                {
                    // Dropped messages were cancelled and removed from _pending by Post.
                    if (cmsg.CancellationTokenSource.IsCancellationRequested)
                    {
                        return;
                    }

                    // From here on Post can no longer drop this message.
                    _pending.Remove(cmsg);
                    _running++;
                }

                try
                {
                    act(cmsg.Message);
                }
                finally
                {
                    lock (_syncRoot)
                    {
                        _running--;
                    }
                }
            }
            finally
            {
                // The message is no longer in _pending, so nothing will call Cancel on it anymore.
                cmsg.Dispose();
            }
        }, options);

        _maxQueueSize = opt.BoundedCapacity;
    }

    /// <summary>
    /// Queues <paramref name="msg"/> for processing. If the running plus waiting messages then exceed
    /// the capacity, the oldest waiting messages are dropped (never <paramref name="msg"/> itself).
    /// </summary>
    /// <param name="msg">The message to process.</param>
    /// <returns>
    /// <c>true</c> if the inner block accepted the message; <c>false</c> if it no longer accepts
    /// messages (e.g. it was completed, faulted or cancelled).
    /// </returns>
    public bool Post(T msg)
    {
        var fullMsg = new CancellableMessage<T>(msg);

        lock (_syncRoot)
        {
            // The action delegate needs _syncRoot before touching _pending, so it cannot observe
            // fullMsg before it has been added below.
            if (!_actionBlock.Post(fullMsg))
            {
                fullMsg.Dispose();
                return false;
            }

            _pending.AddLast(fullMsg);

            if (_maxQueueSize != DataflowBlockOptions.Unbounded)
            {
                while (_pending.Count > 1 && _running + _pending.Count > _maxQueueSize)
                {
                    var oldest = _pending.First.Value;
                    _pending.RemoveFirst();
                    oldest.CancellationTokenSource.Cancel();
                }
            }

            return true;
        }
    }
}

And

/// <summary>
/// A message paired with its own <see cref="System.Threading.CancellationTokenSource"/>,
/// so this single message can be flagged as cancelled independently of others.
/// </summary>
/// <typeparam name="T">Type of the wrapped message.</typeparam>
class CancellableMessage<T> : IDisposable
{
    /// <summary>Wraps <paramref name="msg"/> together with a fresh token source.</summary>
    /// <param name="msg">The payload to wrap.</param>
    public CancellableMessage(T msg)
    {
        CancellationTokenSource = new CancellationTokenSource();
        Message = msg;
    }

    /// <summary>Token source whose cancellation marks this message as cancelled.</summary>
    public CancellationTokenSource CancellationTokenSource { get; set; }

    /// <summary>The wrapped payload.</summary>
    public T Message { get; set; }

    /// <summary>Disposes the token source, if one is set.</summary>
    public void Dispose() => CancellationTokenSource?.Dispose();
}

While this works and does the job, the implementation looks dirty and is possibly not thread-safe.

VMAtm
  • 27,943
  • 17
  • 79
  • 125
HardLuck
  • 1,497
  • 1
  • 22
  • 43

2 Answers

2

Here is a TransformBlock and ActionBlock implementation that drops the oldest messages in its queue, whenever newer messages are received and the BoundedCapacity limit has been reached. It behaves quite similar to a Channel configured with BoundedChannelFullMode.DropOldest.

/// <summary>
/// Creates a transform block with a "drop oldest" overflow policy. Once
/// <see cref="DataflowBlockOptions.BoundedCapacity"/> items are waiting, the oldest waiting item is
/// dropped to make room for each newly received one, instead of postponing the producer.
/// </summary>
/// <typeparam name="TInput">Type of the received items.</typeparam>
/// <typeparam name="TOutput">Type of the produced items.</typeparam>
/// <param name="transform">Asynchronous transformation applied to every item that is not dropped.</param>
/// <param name="dataflowBlockOptions">
/// Block options, or <c>null</c> for defaults. The instance is only read, never modified.
/// </param>
/// <param name="droppedMessages">Optional sink that is notified with every dropped item.</param>
/// <returns>A block encapsulating the input stage and the output buffer.</returns>
/// <exception cref="ArgumentNullException"><paramref name="transform"/> is <c>null</c>.</exception>
/// <exception cref="OverflowException">BoundedCapacity is too large to be doubled.</exception>
public static IPropagatorBlock<TInput, TOutput>
    CreateTransformBlockDropOldest<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null,
    IProgress<TInput> droppedMessages = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();

    var boundedCapacity = dataflowBlockOptions.BoundedCapacity;
    var cancellationToken = dataflowBlockOptions.CancellationToken;

    // Items received but not yet transformed, oldest first. Every access locks on the queue itself.
    var queue = new Queue<TInput>(Math.Max(0, boundedCapacity));

    var outputBlock = new BufferBlock<TOutput>(new DataflowBlockOptions()
    {
        BoundedCapacity = boundedCapacity,
        CancellationToken = cancellationToken
    });

    // The worker gets one dummy (null) signal per received item and pulls the real item from the queue.
    // After testing, at least boundedCapacity + 1 slots are required; make it double to be sure that all
    // non-dropped items will be processed. The enlarged capacity is set on a private copy, so the
    // caller's options object is never mutated (it may be shared with other threads or blocks).
    var workerOptions = CopyOptions(dataflowBlockOptions);
    if (boundedCapacity != DataflowBlockOptions.Unbounded)
        workerOptions.BoundedCapacity = checked(boundedCapacity * 2);

    var transformBlock = new ActionBlock<object>(async _ =>
    {
        TInput item;
        lock (queue)
        {
            // Once items have been dropped there can be more signals than queued items.
            if (queue.Count == 0) return;
            item = queue.Dequeue();
        }
        var result = await transform(item).ConfigureAwait(false);
        await outputBlock.SendAsync(result, cancellationToken).ConfigureAwait(false);
    }, workerOptions);

    var inputBlock = new ActionBlock<TInput>(item =>
    {
        var droppedEntry = (Exists: false, Item: (TInput)default);
        lock (queue)
        {
            // Signal and enqueue under the same lock, so the worker cannot dequeue before the item is queued.
            transformBlock.Post(null);
            // A full queue loses its oldest item to make room for the new one.
            if (queue.Count == boundedCapacity) droppedEntry = (true, queue.Dequeue());
            queue.Enqueue(item);
        }
        // Notify outside the lock to keep the critical section short.
        if (droppedEntry.Exists) droppedMessages?.Report(droppedEntry.Item);
    }, new ExecutionDataflowBlockOptions()
    {
        CancellationToken = cancellationToken
    });

    PropagateCompletion(inputBlock, transformBlock);
    PropagateFailure(transformBlock, inputBlock);
    PropagateCompletion(transformBlock, outputBlock);
    _ = transformBlock.Completion.ContinueWith(_ => { lock (queue) queue.Clear(); },
        TaskScheduler.Default);

    return DataflowBlock.Encapsulate(inputBlock, outputBlock);

    // Completes the target after the source completes, or faults it if the source faulted.
    // The awaited exception is swallowed here because the outcome is read from source.Completion.
    async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        var exception = source.Completion.IsFaulted ? source.Completion.Exception : null;
        if (exception != null) target.Fault(exception); else target.Complete();
    }

    // Faults the target if the source faults; a non-faulted completion is not forwarded.
    async void PropagateFailure(IDataflowBlock source, IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        if (source.Completion.IsFaulted) target.Fault(source.Completion.Exception);
    }

    // Field-by-field copy of the options, so the worker can be configured without touching the original.
    ExecutionDataflowBlockOptions CopyOptions(ExecutionDataflowBlockOptions source)
    {
        return new ExecutionDataflowBlockOptions
        {
            TaskScheduler = source.TaskScheduler,
            CancellationToken = source.CancellationToken,
            MaxMessagesPerTask = source.MaxMessagesPerTask,
            BoundedCapacity = source.BoundedCapacity,
            NameFormat = source.NameFormat,
            EnsureOrdered = source.EnsureOrdered,
            MaxDegreeOfParallelism = source.MaxDegreeOfParallelism,
            SingleProducerConstrained = source.SingleProducerConstrained,
        };
    }
}

// Overload with synchronous lambda
/// <summary>
/// Synchronous-delegate overload of the drop-oldest transform block: wraps <paramref name="transform"/>
/// in a completed task and delegates to the asynchronous overload.
/// </summary>
/// <param name="transform">Synchronous transformation applied to every item that is not dropped.</param>
/// <param name="dataflowBlockOptions">Block options, or <c>null</c> for defaults.</param>
/// <param name="droppedMessages">Optional sink that is notified with every dropped item.</param>
/// <returns>The drop-oldest transform block.</returns>
/// <exception cref="ArgumentNullException"><paramref name="transform"/> is <c>null</c>.</exception>
public static IPropagatorBlock<TInput, TOutput>
    CreateTransformBlockDropOldest<TInput, TOutput>(
    Func<TInput, TOutput> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null,
    IProgress<TInput> droppedMessages = null)
{
    // Validate eagerly, like the asynchronous overload. The wrapper lambda below is never null,
    // so without this check a null delegate would only fail once the first item is processed.
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    return CreateTransformBlockDropOldest(item => Task.FromResult(transform(item)),
        dataflowBlockOptions, droppedMessages);
}

// ActionBlock equivalent
/// <summary>
/// Creates an action block with a "drop oldest" overflow policy. It is built on the drop-oldest
/// transform block, whose placeholder outputs are discarded.
/// </summary>
/// <param name="action">Asynchronous action invoked for every item that is not dropped.</param>
/// <param name="dataflowBlockOptions">Block options, or <c>null</c> for defaults.</param>
/// <param name="droppedMessages">Optional sink that is notified with every dropped item.</param>
/// <returns>The target side of the pipeline.</returns>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
public static ITargetBlock<TInput>
    CreateActionBlockDropOldest<TInput>(
    Func<TInput, Task> action,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null,
    IProgress<TInput> droppedMessages = null)
{
    if (action == null)
    {
        throw new ArgumentNullException(nameof(action));
    }

    // Adapt the action into a transform that yields a null placeholder once the action has finished.
    var propagator = CreateTransformBlockDropOldest<TInput, object>(
        async input =>
        {
            await action(input).ConfigureAwait(false);
            return null;
        },
        dataflowBlockOptions,
        droppedMessages);

    // Drain the placeholders so the bounded output buffer does not fill up.
    propagator.LinkTo(DataflowBlock.NullTarget<object>());

    return propagator;
}

// ActionBlock equivalent with synchronous lambda
/// <summary>
/// Synchronous-delegate overload of the drop-oldest action block: wraps <paramref name="action"/>
/// in a task-returning delegate and delegates to the asynchronous overload.
/// </summary>
/// <param name="action">Synchronous action invoked for every item that is not dropped.</param>
/// <param name="dataflowBlockOptions">Block options, or <c>null</c> for defaults.</param>
/// <param name="droppedMessages">Optional sink that is notified with every dropped item.</param>
/// <returns>The target side of the pipeline.</returns>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
public static ITargetBlock<TInput>
    CreateActionBlockDropOldest<TInput>(
    Action<TInput> action,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null,
    IProgress<TInput> droppedMessages = null)
{
    // Validate eagerly. The wrapper lambda below is never null, so the other overload's null check
    // cannot catch a null action; it would only fail once the first item is processed.
    if (action == null) throw new ArgumentNullException(nameof(action));
    return CreateActionBlockDropOldest(
        item => { action(item); return Task.CompletedTask; },
        dataflowBlockOptions, droppedMessages);
}

The idea is to store the queued items in an auxiliary Queue, and pass dummy (null) values to an internal ActionBlock<object>. The block ignores the items passed as arguments, and instead takes an item from the queue, if there is any. A lock is used to ensure that all non-dropped items in the queue will eventually be processed (unless, of course, an exception occurs).

There is also an extra feature: an optional IProgress<TInput> droppedMessages argument lets the caller receive a notification every time a message is dropped.

Usage example:

// Keep at most two waiting messages; older ones are dropped and reported.
var dropOldestOptions = new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 2,
};
var dropReporter = new Progress<string>(msg =>
{
    Console.WriteLine($"Message dropped: {msg}");
});
_messagingActionBlock = CreateActionBlockDropOldest<string>(
    msg =>
    {
        Console.WriteLine($"Processing: {msg}");
        Thread.Sleep(5000);
    },
    dropOldestOptions,
    dropReporter);
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
1

TPL Dataflow doesn't fit the "last N messages" scenario well, as it's meant to be a queue or pipeline (FIFO), not a stack (LIFO). Do you really need to do this with a dataflow library?

It's much easier with ConcurrentStack<T>: you introduce one producer task, which pushes to the stack, and one consumer task, which pops messages from the stack while the number of handled ones is less than N (more about Producer-Consumer).

If you need TPL Dataflow, you can use it in the consumer task to start handling the last messages, but not in the producer, as that's really not the way it was meant to be used. Moreover, there are other libraries with an event-based architecture that may fit your problem more naturally.

VMAtm
  • 27,943
  • 17
  • 79
  • 125