I need to implement the Producer-Consumer pattern with priorities, and I'm looking for the best option in terms of robustness and performance.
These are my requirements:
- Multiple Threads can enqueue elements
- One Thread reading elements
- Maintain ordering
- Limited number of priorities
I have written an implementation using BlockingCollection.
I have also discovered the TPL Dataflow library
and written a similar implementation with it.
I have been benchmarking both implementations with BenchmarkDotNet,
but the TPL Dataflow implementation is very slow, so I don't think it is an option.
Do you have any suggestions for implementing Producer-Consumer with a priority queue? One important thing is that I only need a limited number of priorities.
I know that .NET Core has a new PriorityQueue, but I'm using .NET Framework 4.8.
BlockingCollection implementation
/// <summary>
/// Producer/consumer queue with three fixed priority levels. Each level is backed by its
/// own FIFO <see cref="BlockingCollection{T}"/>; a single consumer task takes from the
/// high, normal and low queues (in that array order) and invokes each action.
/// </summary>
public class PriorityQueue : IProducerConsumer
{
    private readonly BlockingCollection<Action> _lowQueue;
    private readonly BlockingCollection<Action> _normalQueue;
    private readonly BlockingCollection<Action> _highQueue;
    private readonly CancellationToken _cancelToken;

    // Signalled by Stop() so a consumer blocked inside TakeFromAny wakes up deterministically
    // instead of relying on how TakeFromAny reacts to completed collections.
    private readonly CancellationTokenSource _stopSource = new CancellationTokenSource();

    private Task _dequeTask;

    /// <summary>Creates the three empty priority queues.</summary>
    /// <param name="cancelToken">Token that aborts the consumer loop when cancelled.</param>
    public PriorityQueue(CancellationToken cancelToken)
    {
        // The token must be stored: without this assignment the field stays
        // CancellationToken.None and cancellation requests are never observed.
        _cancelToken = cancelToken;
        _lowQueue = new BlockingCollection<Action>(new ConcurrentQueue<Action>());
        _normalQueue = new BlockingCollection<Action>(new ConcurrentQueue<Action>());
        _highQueue = new BlockingCollection<Action>(new ConcurrentQueue<Action>());
    }

    /// <summary>
    /// Adds <paramref name="item"/> to the queue matching <paramref name="priority"/>.
    /// Unrecognised priorities go to the low queue. Items offered after <see cref="Stop"/>
    /// are silently dropped.
    /// </summary>
    /// <param name="item">Action to run on the consumer task.</param>
    /// <param name="priority">Priority level selecting the target queue.</param>
    public void Enqueue(Action item, QueuePriority priority = QueuePriority.Default)
    {
        BlockingCollection<Action> queue;
        switch (priority)
        {
            case QueuePriority.Normal:
                queue = _normalQueue;
                break;
            case QueuePriority.Hight:
                queue = _highQueue;
                break;
            case QueuePriority.Low:
            default:
                queue = _lowQueue;
                break;
        }

        if (queue.IsAddingCompleted)
        {
            return;
        }

        try
        {
            queue.Add(item);
        }
        catch (InvalidOperationException)
        {
            // Stop() completed the queue between the check above and Add();
            // drop the item, which is what the IsAddingCompleted check already does.
        }
    }

    /// <summary>
    /// Marks all queues as complete for adding and signals the consumer to process
    /// whatever is still queued and then exit.
    /// </summary>
    public void Stop()
    {
        _lowQueue.CompleteAdding();
        _normalQueue.CompleteAdding();
        _highQueue.CompleteAdding();
        _stopSource.Cancel();
    }

    /// <summary>Runs <see cref="Stop"/> on a thread-pool thread.</summary>
    public async Task StopAsync()
    {
        await Task.Run(() => Stop());
    }

    /// <summary>Starts the single long-running consumer task.</summary>
    public void StartDequeing()
    {
        _dequeTask = Task.Factory.StartNew(DequeueTask, _cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }

    /// <summary>
    /// Consumer loop: takes the next action and invokes it until the external token is
    /// cancelled or <see cref="Stop"/> is called. Exceptions thrown by an action are not
    /// caught here and end the loop.
    /// </summary>
    private void DequeueTask()
    {
        var queues = new[] { _highQueue, _normalQueue, _lowQueue };

        using (var takeSource = CancellationTokenSource.CreateLinkedTokenSource(_cancelToken, _stopSource.Token))
        {
            while (!_cancelToken.IsCancellationRequested)
            {
                Action item;
                try
                {
                    BlockingCollection<Action>.TakeFromAny(queues, out item, takeSource.Token);
                }
                catch (OperationCanceledException)
                {
                    // Either the external token or Stop() fired.
                    break;
                }
                catch (ArgumentException)
                {
                    // TakeFromAny reports completed collections this way.
                    break;
                }

                // Invoked outside the try block so exceptions from the action itself
                // are never mistaken for a stop signal.
                item?.Invoke();
            }
        }

        DrainRemaining(queues);
    }

    /// <summary>
    /// Runs the actions still queued after a stop, visiting the queues in the given order,
    /// unless the external token has been cancelled.
    /// </summary>
    private void DrainRemaining(BlockingCollection<Action>[] queues)
    {
        foreach (var queue in queues)
        {
            while (!_cancelToken.IsCancellationRequested && queue.TryTake(out Action pending))
            {
                pending?.Invoke();
            }
        }
    }
}
TPL DataFlow with BufferBlock
/// <summary>
/// Producer/consumer queue with three fixed priority levels built on TPL Dataflow.
/// Producers post into one <see cref="BufferBlock{T}"/> per priority; a forwarding task
/// moves items (high before normal before low) into a single source buffer that the
/// consumer task reads and invokes.
/// </summary>
/// <remarks>
/// NOTE(review): the source buffer is unbounded, so priority is only applied at the moment
/// an item is forwarded; items already forwarded are consumed in FIFO order. Confirm this
/// is acceptable, or bound the source buffer.
/// </remarks>
public class PriorityFlowQueue : IProducerConsumer
{
    private readonly BufferBlock<Action> _lowBuffer;
    private readonly BufferBlock<Action> _normalBuffer;
    private readonly BufferBlock<Action> _highBuffer;
    private readonly BufferBlock<Action> _sourceBuffer;
    private readonly CancellationToken _cancelToken;
    private Task _dequeTask;

    /// <summary>Creates the buffers and starts the forwarding task.</summary>
    /// <param name="cancelToken">Token that stops forwarding and consuming when cancelled.</param>
    public PriorityFlowQueue(CancellationToken cancelToken)
    {
        _cancelToken = cancelToken;
        var options = new DataflowBlockOptions() { EnsureOrdered = true };
        _lowBuffer = new BufferBlock<Action>(options);
        _normalBuffer = new BufferBlock<Action>(options);
        _highBuffer = new BufferBlock<Action>(options);
        _sourceBuffer = new BufferBlock<Action>(options);
        Task.Run(() => ForwardToSource(), _cancelToken);
    }

    /// <summary>Posts <paramref name="item"/> to the buffer for <paramref name="priority"/>.</summary>
    public void Enqueue(Action item, QueuePriority priority = QueuePriority.Default)
    {
        var buffer = GetBufferBlockByPriority(priority);
        buffer.Post(item);
    }

    /// <summary>Asynchronously sends <paramref name="item"/> to the buffer for <paramref name="priority"/>.</summary>
    public async Task EnqueueAsync(Action item, QueuePriority priority = QueuePriority.Default)
    {
        var buffer = GetBufferBlockByPriority(priority);
        await buffer.SendAsync(item);
    }

    /// <summary>Starts the consumer task.</summary>
    public void StartDequeing()
    {
        _dequeTask = Task.Run(() => DequeueTask(), _cancelToken);
    }

    /// <summary>
    /// Completes the three priority buffers and waits until each reports completion.
    /// </summary>
    public async Task StopAsync()
    {
        _highBuffer.Complete();
        _normalBuffer.Complete();
        _lowBuffer.Complete();
        await Task.WhenAll(_highBuffer.Completion, _normalBuffer.Completion, _lowBuffer.Completion);
    }

    /// <summary>
    /// Moves items from the priority buffers into the source buffer until cancellation is
    /// requested or all priority buffers have completed, then completes the source buffer
    /// so the consumer loop can end.
    /// </summary>
    private async Task ForwardToSource()
    {
        var buffers = new[] { _highBuffer, _normalBuffer, _lowBuffer };
        try
        {
            while (!_cancelToken.IsCancellationRequested)
            {
                // Forward only what was actually received. Sending unconditionally would
                // flood the source buffer with nulls once the inputs are completed.
                if (TryReceiveByPriority(buffers, out Action item))
                {
                    await _sourceBuffer.SendAsync(item, _cancelToken).ConfigureAwait(false);
                    continue;
                }

                if (!await WaitForOutputAsync(buffers).ConfigureAwait(false))
                {
                    break;
                }
            }
        }
        catch (OperationCanceledException) when (_cancelToken.IsCancellationRequested)
        {
            // Cancellation requested while sending; fall through to complete the source.
        }
        finally
        {
            _sourceBuffer.Complete();
        }
    }

    /// <summary>
    /// Tries each buffer in array order and returns the first item found.
    /// </summary>
    private static bool TryReceiveByPriority(BufferBlock<Action>[] buffers, out Action item)
    {
        foreach (var buffer in buffers)
        {
            if (buffer.TryReceive(out item))
            {
                return true;
            }
        }

        item = null;
        return false;
    }

    /// <summary>
    /// Waits until one of the not-yet-completed buffers signals (output available,
    /// completion, or cancellation).
    /// </summary>
    /// <returns><c>false</c> when every buffer has completed, so nothing more can arrive.</returns>
    private async Task<bool> WaitForOutputAsync(BufferBlock<Action>[] buffers)
    {
        var waits = new System.Collections.Generic.List<Task>(buffers.Length);
        using (var waitSource = CancellationTokenSource.CreateLinkedTokenSource(_cancelToken))
        {
            foreach (var buffer in buffers)
            {
                // A completed buffer answers "no output" immediately, which would turn
                // the wait into a busy loop, so leave it out.
                if (!buffer.Completion.IsCompleted)
                {
                    waits.Add(buffer.OutputAvailableAsync(waitSource.Token));
                }
            }

            if (waits.Count == 0)
            {
                return false;
            }

            await Task.WhenAny(waits).ConfigureAwait(false);

            // Release the waits that did not win so they do not pile up on idle buffers.
            waitSource.Cancel();
        }

        return true;
    }

    /// <summary>
    /// Consumer loop: invokes items from the source buffer until it completes or
    /// cancellation is requested. Exceptions thrown by an item are not caught here.
    /// </summary>
    private async Task DequeueTask()
    {
        while (!_cancelToken.IsCancellationRequested)
        {
            bool hasOutput;
            try
            {
                hasOutput = await _sourceBuffer.OutputAvailableAsync(_cancelToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                return;
            }

            if (!hasOutput)
            {
                return;
            }

            while (!_cancelToken.IsCancellationRequested && _sourceBuffer.TryReceive(out Action item))
            {
                item?.Invoke();
            }
        }
    }

    /// <summary>
    /// Maps a priority to its buffer; unrecognised values map to the low buffer.
    /// </summary>
    private BufferBlock<Action> GetBufferBlockByPriority(QueuePriority priority)
    {
        switch (priority)
        {
            case QueuePriority.Normal:
                return _normalBuffer;
            case QueuePriority.Hight:
                return _highBuffer;
            case QueuePriority.Low:
            default:
                return _lowBuffer;
        }
    }
}