I'm trying to create some sort of queue that will process the N latest messages received. Right now I have this:
private static void SetupMessaging()
{
_messagingBroadcastBlock = new BroadcastBlock<string>(msg => msg, new ExecutionDataflowBlockOptions
{
//BoundedCapacity = 1,
EnsureOrdered = true,
MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = 1
});
_messagingActionBlock = new ActionBlock<string>(msg =>
{
Console.WriteLine(msg);
Thread.Sleep(5000);
}, new ExecutionDataflowBlockOptions
{
BoundedCapacity = 2,
EnsureOrdered = true,
MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = 1
});
_messagingBroadcastBlock.LinkTo(_messagingActionBlock, new DataflowLinkOptions { PropagateCompletion = true });
_messagingBroadcastBlock.LinkTo(DataflowBlock.NullTarget<string>());
}
The problem is if I post 1,2,3,4,5 to it I will get 1,2,5 but i'd like it to be 1,4,5. Any suggestions are welcome.
UPD 1
I was able to make the following solution work
class FixedCapacityActionBlock<T>
{
private readonly ActionBlock<CancellableMessage<T>> _actionBlock;
private readonly ConcurrentQueue<CancellableMessage<T>> _inputCollection = new ConcurrentQueue<CancellableMessage<T>>();
private readonly int _maxQueueSize;
private readonly object _syncRoot = new object();
public FixedCapacityActionBlock(Action<T> act, ExecutionDataflowBlockOptions opt)
{
var options = new ExecutionDataflowBlockOptions
{
EnsureOrdered = opt.EnsureOrdered,
CancellationToken = opt.CancellationToken,
MaxDegreeOfParallelism = opt.MaxDegreeOfParallelism,
MaxMessagesPerTask = opt.MaxMessagesPerTask,
NameFormat = opt.NameFormat,
SingleProducerConstrained = opt.SingleProducerConstrained,
TaskScheduler = opt.TaskScheduler,
//we intentionally ignore this value
//BoundedCapacity = opt.BoundedCapacity
};
_actionBlock = new ActionBlock<CancellableMessage<T>>(cmsg =>
{
if (cmsg.CancellationTokenSource.IsCancellationRequested)
{
return;
}
act(cmsg.Message);
}, options);
_maxQueueSize = opt.BoundedCapacity;
}
public bool Post(T msg)
{
var fullMsg = new CancellableMessage<T>(msg);
//what if next task starts here?
lock (_syncRoot)
{
_inputCollection.Enqueue(fullMsg);
var itemsToDrop = _inputCollection.Skip(1).Except(_inputCollection.Skip(_inputCollection.Count - _maxQueueSize + 1));
foreach (var item in itemsToDrop)
{
item.CancellationTokenSource.Cancel();
CancellableMessage<T> temp;
_inputCollection.TryDequeue(out temp);
}
return _actionBlock.Post(fullMsg);
}
}
}
And
class CancellableMessage<T> : IDisposable
{
public CancellationTokenSource CancellationTokenSource { get; set; }
public T Message { get; set; }
public CancellableMessage(T msg)
{
CancellationTokenSource = new CancellationTokenSource();
Message = msg;
}
public void Dispose()
{
CancellationTokenSource?.Dispose();
}
}
While this works and actually does the job this implementation looks dirty, also possibly not thread safe.