
I'm building a producer/consumer queue in C#, and I have been researching the best approach in terms of robustness and performance. For years I always used BlockingCollection, but I have recently discovered TPL Dataflow and Channels.

I have been doing some benchmarking, and I have seen that both TPL Dataflow and Channels are much faster at dequeuing elements.

My requirements are:

  • Queue behaviour (maintain item ordering)
  • Multiple threads can Enqueue elements
  • One thread reading elements (to maintain order)
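A quick way to check these requirements is a small sketch with System.Threading.Channels. It assumes that "maintain item ordering" means the order in which each individual producer enqueues: items from concurrent producers interleave, but each producer's own items should come out in the order it wrote them. `MultiProducerOrderingDemo` and `RunAsync` are hypothetical names, not part of any library.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class MultiProducerOrderingDemo
{
    // Hypothetical helper: several producers write (Producer, Seq) pairs into one
    // unbounded channel that is read by a single consumer.
    public static async Task<List<(int Producer, int Seq)>> RunAsync(int producers, int itemsPerProducer)
    {
        var channel = Channel.CreateUnbounded<(int Producer, int Seq)>(
            new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });

        // The single consumer drains the channel until the writer is completed.
        var consumer = Task.Run(async () =>
        {
            var received = new List<(int Producer, int Seq)>();
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                received.Add(item);
            }
            return received;
        });

        // Each producer runs on its own task and writes its items in increasing order.
        var producerTasks = Enumerable.Range(0, producers).Select(p => Task.Run(() =>
        {
            for (int i = 0; i < itemsPerProducer; i++)
            {
                channel.Writer.TryWrite((p, i));
            }
        })).ToArray();

        await Task.WhenAll(producerTasks);
        channel.Writer.Complete(); // lets ReadAllAsync finish once the channel is empty
        return await consumer;
    }
}
```

Because there is only one reader, filtering the received list by producer should always yield that producer's sequence numbers in order, even though the global interleaving changes from run to run.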

IProducerConsumer interface

public interface IProducerConsumer
{
    void Enqueue(Action item);
    void Stop();
    void StartDequeing();
}

Blocking Collection Implementation

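using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class BlockingCollectionQueue : IProducerConsumer
{
    private readonly BlockingCollection<Action> _defaultQueue;
    private Task _dequeTask;

    public BlockingCollectionQueue()
    {
        _defaultQueue = new BlockingCollection<Action>(new ConcurrentQueue<Action>());
    }

    public void Enqueue(Action item)
    {
        // Add() throws InvalidOperationException once CompleteAdding() has been called,
        // and Stop() can run between an IsAddingCompleted check and Add(), so catch it.
        try
        {
            _defaultQueue.Add(item);
        }
        catch (InvalidOperationException)
        {
            // The queue has been stopped: drop the item.
        }
    }

    public void Stop()
    {
        _defaultQueue.CompleteAdding();
    }

    public void StartDequeing()
    {
        // Keep a reference to the consumer task so it can be observed or awaited later.
        _dequeTask = Task.Run(DequeueTask);
    }

    private void DequeueTask()
    {
        // Blocks the consumer thread until an item arrives or CompleteAdding() is called.
        foreach (var item in _defaultQueue.GetConsumingEnumerable())
        {
            item?.Invoke();
        }
    }
}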

Channel Implementation

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public class ChannelQueue : IProducerConsumer
{
    private readonly Channel<Action> _channel;
    private readonly ChannelWriter<Action> _channelWriter;
    private readonly ChannelReader<Action> _channelReader;
    private Task _dequeTask;

    public ChannelQueue()
    {
        // SingleReader = true lets the channel use a cheaper single-consumer implementation.
        _channel = Channel.CreateUnbounded<Action>(new UnboundedChannelOptions() { SingleReader = true });
        _channelWriter = _channel.Writer;
        _channelReader = _channel.Reader;
    }

    public void Enqueue(Action item)
    {
        // On an unbounded channel TryWrite only fails after Complete() has been called.
        _channelWriter.TryWrite(item);
    }

    public void StartDequeing()
    {
        _dequeTask = Task.Run(DequeueTask);
    }

    private async Task DequeueTask()
    {
        // Wait asynchronously for data, then drain everything currently buffered.
        while (await _channelReader.WaitToReadAsync())
        {
            while (_channelReader.TryRead(out var job))
            {
                job?.Invoke();
            }
        }
    }

    public void Stop()
    {
        _channelWriter.Complete();
    }
}

TPL Dataflow implementation using BufferBlock

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class DataFlowQueue : IProducerConsumer
{
    private readonly BufferBlock<Action> _bufferBlock;
    private Task _dequeTask;

    public DataFlowQueue()
    {
        var dataflowOptions = new DataflowBlockOptions() { EnsureOrdered = true };
        _bufferBlock = new BufferBlock<Action>(dataflowOptions);
    }

    public void Enqueue(Action item)
    {
        _bufferBlock.Post(item);
    }

    public void StartDequeing()
    {
        _dequeTask = Task.Run(DequeueTask);
    }

    private async Task DequeueTask()
    {
        while (await _bufferBlock.OutputAvailableAsync())
        {
            while(_bufferBlock.TryReceive(out var item))
            {
                item?.Invoke();
            }
        }
    }

    public void Stop()
    {
        _bufferBlock.Complete();
    }
}

TPL Dataflow implementation using ActionBlock

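using System;
using System.Threading.Tasks.Dataflow;

public class ActionBlockQueue : IProducerConsumer
{
    private readonly ActionBlock<Action> _actionBlock;

    public ActionBlockQueue()
    {
        // MaxDegreeOfParallelism = 1 (also the default) processes one item at a time, in FIFO order.
        var dataflowOptions = new ExecutionDataflowBlockOptions() { EnsureOrdered = true, MaxDegreeOfParallelism = 1 };
        // The block starts consuming as soon as it is constructed.
        _actionBlock = new ActionBlock<Action>(item => item?.Invoke(), dataflowOptions);
    }

    // The signature must match IProducerConsumer.Enqueue(Action) exactly to implement the interface.
    public void Enqueue(Action item)
    {
        _actionBlock.Post(item);
    }

    public void StartDequeing()
    {
        // Nothing to do: the ActionBlock began processing when it was created.
    }

    public void Stop()
    {
        _actionBlock.Complete();
    }
}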

Benchmark using BenchmarkDotNet

As you can see, all of the implementations store Action delegates in their queues, and I'm using an AutoResetEvent to signal when the last element has been dequeued and executed.

using System.Threading;
using BenchmarkDotNet.Attributes;

public class MultipleJobBenchMark
{
    private AutoResetEvent _autoResetEvent;

    public MultipleJobBenchMark()
    {
        _autoResetEvent = new AutoResetEvent(false);
    }

    [Benchmark]
    public void BlockingCollectionQueue()
    {
        DoMultipleJobs(new BlockingCollectionQueue());
    }

    [Benchmark]
    public void DataFlowQueue()
    {
        DoMultipleJobs(new DataFlowQueue());
    }

    [Benchmark]
    public void ActionBlockQueue()
    {
        DoMultipleJobs(new ActionBlockQueue());
    }

    [Benchmark]
    public void ChannelQueue()
    {
        DoMultipleJobs(new ChannelQueue());
    }

    private void DoMultipleJobs(IProducerConsumer producerConsumerQueue)
    {
        producerConsumerQueue.StartDequeing();
        int jobs = 100000;
        for (int i = 0; i < jobs - 1; i++)
        {
            producerConsumerQueue.Enqueue(() => { });
        }
        producerConsumerQueue.Enqueue(() => _autoResetEvent.Set());
        _autoResetEvent.WaitOne();
        producerConsumerQueue.Stop();
    }
}
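The AutoResetEvent in the benchmark detects when the last Action has run. When the goal is instead to know at shutdown that every queued item has been processed (the "waits" discussed in the comments), one option is to complete the writer and then await the consumer task. The sketch below assumes a hypothetical `StopAsync()` method that is not part of the question's `IProducerConsumer` interface; `AwaitableChannelQueue` is likewise a hypothetical name.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical variant of ChannelQueue: StopAsync() completes the writer and then
// awaits the consumer loop, so no AutoResetEvent is needed to detect the last item.
public class AwaitableChannelQueue
{
    private readonly Channel<Action> _channel =
        Channel.CreateUnbounded<Action>(new UnboundedChannelOptions { SingleReader = true });
    private Task _dequeueTask = Task.CompletedTask;

    public void Enqueue(Action item) => _channel.Writer.TryWrite(item);

    public void StartDequeing() => _dequeueTask = Task.Run(DequeueAsync);

    private async Task DequeueAsync()
    {
        // WaitToReadAsync returns false once the writer is completed and the channel is empty.
        while (await _channel.Reader.WaitToReadAsync())
        {
            while (_channel.Reader.TryRead(out var job))
            {
                job?.Invoke();
            }
        }
    }

    public async Task StopAsync()
    {
        _channel.Writer.Complete();
        await _dequeueTask; // finishes after every queued Action has been invoked
    }
}
```

If `StopAsync()` is called before `StartDequeing()`, it returns immediately and the queued items are never run; a production version would probably want to guard against that.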

Results

  • BlockingCollection: mean 21.5 ms
  • BufferBlock queue: mean 14.937 ms
  • ActionBlock queue: mean 6.007 ms
  • Channel: mean 4.781 ms

Questions and conclusions

From this exercise I conclude that BlockingCollection may no longer be the best option. I don't understand why there is such a big difference between BufferBlock and ActionBlock. I wrote both implementations because my interface defines a StartDequeing() method, which ActionBlock can't honor: an ActionBlock starts dequeuing as soon as it is constructed. Is my BufferBlock implementation the best option?
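If the goal is only to keep the StartDequeing() contract while still letting an ActionBlock do the consuming, one possible sketch is to post into a BufferBlock and link it to the ActionBlock only when StartDequeing() is called. `DeferredActionBlockQueue` and its `Completion` property are hypothetical; it has the same members as `IProducerConsumer`, but the interface is omitted to keep the sketch self-contained. It needs the System.Threading.Tasks.Dataflow package, which the other Dataflow classes in this question already depend on.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical variant of ActionBlockQueue that honors StartDequeing():
// items accumulate in a BufferBlock until the ActionBlock is linked to it.
public class DeferredActionBlockQueue
{
    private readonly BufferBlock<Action> _buffer = new BufferBlock<Action>();

    // MaxDegreeOfParallelism defaults to 1, so items run one at a time in FIFO order.
    private readonly ActionBlock<Action> _worker = new ActionBlock<Action>(item => item?.Invoke());

    public void Enqueue(Action item) => _buffer.Post(item);

    public void StartDequeing()
    {
        // PropagateCompletion forwards the buffer's completion to the worker,
        // so Stop() lets the worker drain the remaining items before completing.
        _buffer.LinkTo(_worker, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Stop() => _buffer.Complete();

    // Completes once every posted item has been executed (after Stop()).
    public Task Completion => _worker.Completion;
}
```

The extra hop through the BufferBlock adds some overhead, so this variant would likely not match the plain ActionBlock numbers; it only shows how the deferred start can be expressed with Dataflow primitives.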

I wanted to post my results here to find out which producer/consumer queue is the most widely accepted at the moment, and why I see such a big difference between ActionBlock and BufferBlock.

JuanDYB
  • If you want to know why there is a time difference attach a profiler and run both types and compare their profile snapshots. – Scott Chamberlain May 28 '23 at 19:45
  • @TheodorZoulias I have renamed the `NonPriorityQueue`. Regarding the CancellationToken, that's true. My code is extracted from the one I'm using in my application, but as you can see, I'm not doing any cancellation in the tests. – JuanDYB May 28 '23 at 20:29
  • @TheodorZoulias I have deleted it and run the benchmarks again. The results are very similar, with the same conclusions. Thanks also for the observation. – JuanDYB May 28 '23 at 22:14
  • @TheodorZoulias I have also removed the waits. It's true that in a real case it would be convenient to wait for them, but for this particular benchmark it may be better to avoid introducing differences between the cases. I have tested it and the results are the same. – JuanDYB May 29 '23 at 06:26
  • @JuanDYB you're comparing different things, in the wrong way. Neither is a queue to which you enqueue/dequeue items. In fact, your `ActionBlockQueue` is trying to emulate the low-level workings of any Dataflow. The Dataflow library is meant to create *dataflow pipelines*, with each block performing a job on incoming messages. The library itself handles input/output queueing, buffering and task allocation, not the application code. Channels are a *low level* construct which, underneath, just happen to use ConcurrentQueue in some cases. You could build an ActionBlock with Channels. – Panagiotis Kanavos May 29 '23 at 13:37
  • This means that the benchmark is trying to emulate the lowest level implementation details of both constructs, for a scenario they were **not** built for - job queues. There's a reason the docs refer to messages, not actions. There's a reason Channels provide different interfaces for reading and writing - you aren't supposed to use both in the same method. Just as you aren't supposed to try and access a block's Input or Output buffer in a Dataflow block. – Panagiotis Kanavos May 29 '23 at 13:40
  • @PanagiotisKanavos Thanks for your reply, but I don't understand exactly what you mean. In my real application I'm using these structures to implement the producer/consumer pattern with messages, for example queueing messages to send through a serial port or sockets, or receiving and processing them. In this example I used Actions because I wanted to signal when the last item was dequeued. I only wanted to measure the performance of the dequeuing process. – JuanDYB May 30 '23 at 20:47
  • I'm saying this is a job queue, not the producer/consumer. I'm saying you're trying to compare cars (Dataflow) with car engines (Channels) to see which spins wheels faster, to determine which can carry more people. If you really want such benchmarks, check [Performance Showdown of Producer/Consumer (Job Queues) Implementations in C# .NET](https://michaelscodingspot.com/performance-of-producer-consumer/). The benchmarks compare job queue implementations with varying job sizes – Panagiotis Kanavos May 31 '23 at 07:18
  • The benchmark's author then went on to write a series of articles on the proper use of Dataflow to build pipelines: [Pipeline Pattern implementations in C#, Part 1](https://michaelscodingspot.com/pipeline-pattern-implementations-csharp/), [Pipeline Pattern in C# (part 2) with TPL Dataflow](https://michaelscodingspot.com/pipeline-pattern-tpl-dataflow/) and [Pipeline Implementations in C#: TPL Dataflow Async steps and Disruptor-net](https://michaelscodingspot.com/pipeline-implementations-csharp-3/). – Panagiotis Kanavos May 31 '23 at 07:20
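To illustrate the comment that "you could build an ActionBlock with Channels", and that a channel's reading and writing sides are meant to be used separately, here is a minimal sketch of an ActionBlock-like worker on top of Channel<T>. `ChannelWorker<T>` is a hypothetical class, not a library type; it assumes an unbounded channel, a single consumer loop, and that an exception thrown by the handler should simply fault `Completion`.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical "ActionBlock-like" worker built on Channel<T>: producers only see the
// ChannelWriter<T>, while the reading side stays private to one consumer loop.
public sealed class ChannelWorker<T>
{
    private readonly Channel<T> _channel =
        Channel.CreateUnbounded<T>(new UnboundedChannelOptions { SingleReader = true });

    public ChannelWorker(Func<T, Task> handler)
    {
        // Start consuming immediately, like an ActionBlock with MaxDegreeOfParallelism = 1.
        Completion = Task.Run(async () =>
        {
            await foreach (var item in _channel.Reader.ReadAllAsync())
            {
                await handler(item);
            }
        });
    }

    // Producers call TryWrite/WriteAsync and finally Complete() on this writer.
    public ChannelWriter<T> Writer => _channel.Writer;

    // Completes after the writer is completed and every item has been handled.
    public Task Completion { get; }
}
```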

1 Answer


As your benchmarks reveal, the Channel<T> is a relatively more performant producer/consumer queue than the BlockingCollection<T>. That is reasonable, since the Channel<T> is a newer component (2019) and takes advantage of the ValueTask<T> technology, which did not exist when the BlockingCollection<T> was introduced (2010). For this to have any measurable effect, though, you must be passing an enormous number of items per second through the queue. In that case it might be a good idea to consider processing the items in batches/chunks, instead of passing each item individually through the queue.
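One way to process items in batches with a channel, sketched under the assumption that "batch" means "whatever is already buffered, up to a maximum size", is to drain the reader with TryRead after each WaitToReadAsync. `BatchingConsumer`, `ConsumeInBatchesAsync` and the `processBatch` callback are hypothetical names chosen for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BatchingConsumer
{
    // Hypothetical consumer: instead of handling items one by one, it collects what is
    // currently available (up to maxBatchSize items) and passes it on in a single call.
    public static async Task ConsumeInBatchesAsync<T>(
        ChannelReader<T> reader, int maxBatchSize, Action<IReadOnlyList<T>> processBatch)
    {
        var batch = new List<T>(maxBatchSize);
        while (await reader.WaitToReadAsync())
        {
            // Take everything that is already buffered, without waiting for more.
            while (batch.Count < maxBatchSize && reader.TryRead(out var item))
            {
                batch.Add(item);
            }

            if (batch.Count > 0)
            {
                processBatch(batch.ToArray()); // hand over a copy so the list can be reused
                batch.Clear();
            }
        }
    }
}
```

With 10 items already written and the writer completed, a `maxBatchSize` of 4 produces batches of 4, 4 and 2 items. Under light load the batches stay small, so this trades no latency for throughput only when the queue is actually busy.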

In general I think that the BlockingCollection<T> is still a good option when your producer/consumer system is synchronous, i.e. when the producer and the consumer are running on dedicated threads. The Channel<T> is a natural choice when you want to build an asynchronous system, i.e. you are calling asynchronous APIs and you want to make efficient use of threads. As for the components found in the TPL Dataflow library, they are a valid option when you want to build an asynchronous system that can run on older versions of .NET. There are very few reasons to prefer the older BufferBlock<T> over the newer Channel<T> when both are available. The Channel<T> has a cleaner and more expressive API and offers more options, like the ability to drop old items when new items are added and the maximum capacity has been reached.
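The "drop old items" option mentioned above corresponds to a bounded channel created with `BoundedChannelFullMode.DropOldest`. A minimal sketch, with a capacity of 3 chosen arbitrarily for illustration (`DropOldestDemo` is a hypothetical name):

```csharp
using System.Collections.Generic;
using System.Threading.Channels;

public static class DropOldestDemo
{
    // Writes 1..5 into a channel with capacity 3 that evicts the oldest item when full,
    // then returns whatever is left in the channel.
    public static List<int> Run()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
        {
            FullMode = BoundedChannelFullMode.DropOldest
        });

        for (int i = 1; i <= 5; i++)
        {
            channel.Writer.TryWrite(i); // succeeds even when full: the oldest item is dropped
        }

        channel.Writer.Complete();
        var remaining = new List<int>();
        while (channel.Reader.TryRead(out var item))
        {
            remaining.Add(item);
        }
        return remaining; // 3, 4, 5
    }
}
```

Other `FullMode` values (`Wait`, `DropNewest`, `DropWrite`) choose different trade-offs when the capacity is reached; `Wait` is the one that applies backpressure to the producers.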

A rare scenario where you may want to avoid the Channel<T> is in case your producer, or the consumer, or both, is using cancellation tokens in each and every asynchronous write/read operation, that are routinely canceled. This usage can trigger a memory leak in the Channel<T>, but not in the BufferBlock<T>. See this question for details.

Theodor Zoulias
  • Thanks for your reply. With this explanation, I think the differences between the implementations are clearer to me. – JuanDYB Jun 09 '23 at 09:39