0

I need to implement the producer–consumer pattern with priorities, and I'm looking for the best option in terms of robustness and performance.

These are my requirements

  • Multiple Threads can enqueue elements
  • One Thread reading elements
  • Maintain ordering
  • Limited number of priorities

I have written an implementation using `BlockingCollection`. I also discovered the TPL Dataflow library and wrote a similar implementation with it. I benchmarked both with BenchmarkDotNet, but the TPL Dataflow implementation is very slow, so I don't think it is an option.

Do you have any suggestions for implementing producer–consumer with a priority queue? One important thing is that I only need a limited number of priorities.

I know that .NET Core has a new `PriorityQueue`, but I'm using .NET Framework 4.8.

BlockingCollection implementation

/// <summary>
/// Producer/consumer queue with three fixed priority levels (high, normal, low).
/// Any number of threads may call <see cref="Enqueue"/>; a single long-running task
/// started by <see cref="StartDequeing"/> invokes the queued actions. Items of the
/// same priority are stored in a <see cref="ConcurrentQueue{T}"/> and therefore
/// leave in the order they were added.
/// </summary>
public class PriorityQueue : IProducerConsumer
{
    private readonly BlockingCollection<Action> _lowQueue;
    private readonly BlockingCollection<Action> _normalQueue;
    private readonly BlockingCollection<Action> _highQueue;
    private readonly CancellationToken _cancelToken;

    private Task _dequeTask;

    /// <summary>Creates the three empty priority queues.</summary>
    /// <param name="cancelToken">Token that stops the consumer loop when cancelled.</param>
    public PriorityQueue(CancellationToken cancelToken)
    {
        // The token must be stored: the consumer loop and the blocking take both read it.
        _cancelToken = cancelToken;
        _lowQueue = new BlockingCollection<Action>(new ConcurrentQueue<Action>());
        _normalQueue = new BlockingCollection<Action>(new ConcurrentQueue<Action>());
        _highQueue = new BlockingCollection<Action>(new ConcurrentQueue<Action>());
    }

    /// <summary>
    /// Adds <paramref name="item"/> to the queue that matches <paramref name="priority"/>.
    /// Items offered after <see cref="Stop"/> has been called are silently dropped.
    /// </summary>
    /// <param name="item">Action to run on the consumer task.</param>
    /// <param name="priority">Priority level; unrecognised values go to the low queue.</param>
    public void Enqueue(Action item, QueuePriority priority = QueuePriority.Default)
    {
        BlockingCollection<Action> queue = GetQueueByPriority(priority);
        if (queue.IsAddingCompleted)
        {
            return;
        }

        try
        {
            queue.Add(item);
        }
        catch (InvalidOperationException)
        {
            // Stop() completed the queue between the check above and Add().
            // Drop the item, exactly as the check would have done.
        }
    }

    /// <summary>Marks all three queues as complete for adding; items already queued can still be taken.</summary>
    public void Stop()
    {
        _lowQueue.CompleteAdding();
        _normalQueue.CompleteAdding();
        _highQueue.CompleteAdding();
    }

    /// <summary>Runs <see cref="Stop"/> on a thread-pool thread.</summary>
    public async Task StopAsync()
    {
        await Task.Run(() => Stop());
    }

    /// <summary>Starts the consumer loop on a dedicated long-running task.</summary>
    public void StartDequeing()
    {
        _dequeTask = Task.Factory.StartNew(DequeueTask, _cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }

    /// <summary>
    /// Consumer loop. Takes one action at a time, listing the queues in high, normal,
    /// low order, and invokes it. The loop ends when the token is cancelled or when
    /// every queue has been completed and drained.
    /// </summary>
    private void DequeueTask()
    {
        var queues = new[] { _highQueue, _normalQueue, _lowQueue };
        while (!_cancelToken.IsCancellationRequested && !AllCompleted(queues))
        {
            Action item;
            try
            {
                // TryTakeFromAny (rather than TakeFromAny) is used so that running out of
                // non-completed queues after Stop() is reported as -1 instead of an exception.
                int index = BlockingCollection<Action>.TryTakeFromAny(queues, out item, Timeout.Infinite, _cancelToken);
                if (index < 0)
                {
                    // Nothing was taken; the loop condition decides whether to keep waiting.
                    continue;
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation arrived while blocked waiting for an item: shut down quietly.
                break;
            }

            // Deliberately outside the try block so exceptions thrown by the action itself
            // are never mistaken for queue shutdown.
            item?.Invoke();
        }
    }

    /// <summary>Maps a priority value to its backing queue.</summary>
    private BlockingCollection<Action> GetQueueByPriority(QueuePriority priority)
    {
        switch (priority)
        {
            case QueuePriority.Hight:
                return _highQueue;
            case QueuePriority.Normal:
                return _normalQueue;
            case QueuePriority.Low:
            default:
                return _lowQueue;
        }
    }

    /// <summary>Returns true when every queue is both completed for adding and empty.</summary>
    private static bool AllCompleted(BlockingCollection<Action>[] queues)
    {
        foreach (BlockingCollection<Action> queue in queues)
        {
            if (!queue.IsCompleted)
            {
                return false;
            }
        }

        return true;
    }
}

TPL DataFlow with BufferBlock

/// <summary>
/// Producer/consumer queue with three fixed priority levels built on TPL Dataflow
/// <see cref="BufferBlock{T}"/>s. Producers post into one buffer per priority; a
/// forwarder task moves items (high first, then normal, then low) into a single
/// source buffer, from which the consumer started by <see cref="StartDequeing"/>
/// invokes them.
/// </summary>
/// <remarks>
/// NOTE(review): the source buffer is unbounded, so priority is applied only at the
/// moment an item is forwarded. Items already forwarded are not overtaken by
/// higher-priority items that arrive later. Bounding the source buffer would tighten
/// this, but it changes when <see cref="StopAsync"/> completes if the consumer has
/// not been started, so it is left as-is here.
/// </remarks>
public class PriorityFlowQueue : IProducerConsumer
{
    private readonly BufferBlock<Action> _lowBuffer;
    private readonly BufferBlock<Action> _normalBuffer;
    private readonly BufferBlock<Action> _highBuffer;
    private readonly BufferBlock<Action> _sourceBuffer;
    private readonly CancellationToken _cancelToken;
    private readonly Task _forwardTask;
    private Task _dequeTask;

    /// <summary>Creates the buffers and starts the forwarder task.</summary>
    /// <param name="cancelToken">Token that stops the forwarder when cancelled.</param>
    public PriorityFlowQueue(CancellationToken cancelToken)
    {
        _cancelToken = cancelToken;
        var options = new DataflowBlockOptions() { EnsureOrdered = true };
        _lowBuffer = new BufferBlock<Action>(options);
        _normalBuffer = new BufferBlock<Action>(options);
        _highBuffer = new BufferBlock<Action>(options);
        _sourceBuffer = new BufferBlock<Action>(options);

        // Started without the token on purpose: the forwarder checks the token itself and
        // must always run far enough to complete the source buffer, even if the token is
        // already cancelled.
        _forwardTask = Task.Run(() => ForwardToSource());
    }

    /// <summary>Posts <paramref name="item"/> to the buffer matching <paramref name="priority"/>.</summary>
    /// <param name="item">Action to run on the consumer task.</param>
    /// <param name="priority">Priority level; unrecognised values go to the low buffer.</param>
    public void Enqueue(Action item, QueuePriority priority = QueuePriority.Default)
    {
        var buffer = GetBufferBlockByPriority(priority);
        buffer.Post(item);
    }

    /// <summary>Asynchronously offers <paramref name="item"/> to the buffer matching <paramref name="priority"/>.</summary>
    /// <param name="item">Action to run on the consumer task.</param>
    /// <param name="priority">Priority level; unrecognised values go to the low buffer.</param>
    public async Task EnqueueAsync(Action item, QueuePriority priority = QueuePriority.Default)
    {
        var buffer = GetBufferBlockByPriority(priority);
        await buffer.SendAsync(item);
    }

    /// <summary>Starts the consumer loop on the thread pool.</summary>
    public void StartDequeing()
    {
        _dequeTask = Task.Run(DequeueTask, _cancelToken);
    }

    /// <summary>
    /// Completes the three priority buffers and waits until each of them reports completion.
    /// </summary>
    public async Task StopAsync()
    {
        _highBuffer.Complete();
        _normalBuffer.Complete();
        _lowBuffer.Complete();
        await Task.WhenAll(_highBuffer.Completion, _normalBuffer.Completion, _lowBuffer.Completion);
    }

    /// <summary>
    /// Forwarder loop: moves items from the priority buffers into the source buffer,
    /// always preferring the highest priority that currently has an item. Ends when the
    /// token is cancelled or all three priority buffers are completed and drained, and
    /// completes the source buffer on the way out so the consumer loop can finish.
    /// </summary>
    private async Task ForwardToSource()
    {
        var buffers = new[] { _highBuffer, _normalBuffer, _lowBuffer };

        // One outstanding "output available" wait per buffer, reused across iterations.
        // Creating three fresh waits on every pass would leave the losing ones registered
        // on their buffers indefinitely.
        var pending = new Task<bool>[buffers.Length];

        try
        {
            while (!_cancelToken.IsCancellationRequested)
            {
                if (TryReceiveByPriority(buffers, out Action item))
                {
                    await _sourceBuffer.SendAsync(item, _cancelToken).ConfigureAwait(false);
                    continue;
                }

                var waits = new Task[buffers.Length];
                var waitCount = 0;
                for (var i = 0; i < buffers.Length; i++)
                {
                    if (pending[i] == null || pending[i].IsCompleted)
                    {
                        pending[i] = buffers[i].OutputAvailableAsync(_cancelToken);
                    }

                    // A wait that has already finished with 'false' means that buffer will
                    // never produce again; waiting on it would make WhenAny return at once.
                    var exhausted = pending[i].Status == TaskStatus.RanToCompletion && !pending[i].Result;
                    if (!exhausted)
                    {
                        waits[waitCount++] = pending[i];
                    }
                }

                if (waitCount == 0)
                {
                    // Every priority buffer is completed and empty: nothing left to forward.
                    break;
                }

                Array.Resize(ref waits, waitCount);
                await Task.WhenAny(waits).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation while handing an item to the source buffer: stop forwarding.
        }
        finally
        {
            _sourceBuffer.Complete();
        }
    }

    /// <summary>
    /// Consumer loop: invokes every action that reaches the source buffer until that
    /// buffer is completed and empty.
    /// </summary>
    private async Task DequeueTask()
    {
        while (await _sourceBuffer.OutputAvailableAsync().ConfigureAwait(false))
        {
            // Drain everything currently buffered before awaiting again.
            while (_sourceBuffer.TryReceive(out Action item))
            {
                item?.Invoke();
            }
        }
    }

    /// <summary>Tries the buffers in array order and returns the first item found.</summary>
    private static bool TryReceiveByPriority(BufferBlock<Action>[] buffers, out Action item)
    {
        foreach (BufferBlock<Action> buffer in buffers)
        {
            if (buffer.TryReceive(out item))
            {
                return true;
            }
        }

        item = null;
        return false;
    }

    /// <summary>Maps a priority value to its backing buffer.</summary>
    private BufferBlock<Action> GetBufferBlockByPriority(QueuePriority priority)
    {
        switch (priority)
        {
            case QueuePriority.Hight:
                return _highBuffer;
            case QueuePriority.Normal:
                return _normalBuffer;
            case QueuePriority.Low:
            default:
                return _lowBuffer;
        }
    }
}
JuanDYB
  • 590
  • 3
  • 9
  • 23
  • Possibly related: [Concurrent collection with priority](https://stackoverflow.com/questions/23470196/concurrent-collection-with-priority) – Theodor Zoulias May 28 '23 at 22:28
  • 1
    @TheodorZoulias I'm not using NetCore, I have been reading about recently added Priority Queue but I'm using NetStandard 2.0 and Net Fw 4.8 – JuanDYB May 29 '23 at 06:18
  • 1
    For .NET Framework you could substitute the `PriorityQueue` with a `SortedSet`. See [this answer](https://stackoverflow.com/questions/65569452/data-structure-to-store-sorted-items-based-on-a-cost-field/65590586#65590586 "Data structure to store sorted items based on a cost field") for an example. – Theodor Zoulias May 29 '23 at 16:06

0 Answers0