I'm building a producer-consumer queue in C# and I have been researching which approach is best in terms of robustness and performance. For years I have always used BlockingCollection, but I recently discovered TPL Dataflow and Channels. I have done some benchmarking and found that both TPL Dataflow and Channels are much faster at dequeuing elements.
My requirements are:
- Queue behaviour (maintain item ordering)
- Multiple threads can Enqueue elements
- One thread reading elements (to maintain order)
IProducerConsumer interface
public interface IProducerConsumer
{
    void Enqueue(Action item);
    void Stop();
    void StartDequeing();
}
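For context, this is roughly how I intend to use any of the implementations (several producer threads, one internal consumer). This is just a usage sketch, not part of the benchmark, and ChannelQueue here stands in for any of the implementations below:

// Usage sketch only: several threads enqueue concurrently,
// a single internal consumer preserves FIFO order.
IProducerConsumer queue = new ChannelQueue(); // any of the implementations below
queue.StartDequeing();

Parallel.For(0, 4, _ =>
{
    for (int i = 0; i < 1000; i++)
    {
        queue.Enqueue(() => { /* work item */ });
    }
});

queue.Stop();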
BlockingCollection Implementation
public class BlockingCollectionQueue : IProducerConsumer
{
    private readonly BlockingCollection<Action> _defaultQueue;
    private Task _dequeTask;

    public BlockingCollectionQueue()
    {
        _defaultQueue = new BlockingCollection<Action>(new ConcurrentQueue<Action>());
    }

    public void Enqueue(Action item)
    {
        if (!_defaultQueue.IsAddingCompleted)
        {
            _defaultQueue.Add(item);
        }
    }

    public void Stop()
    {
        _defaultQueue.CompleteAdding();
    }

    public void StartDequeing()
    {
        _dequeTask = Task.Run(DequeueTask);
    }

    private void DequeueTask()
    {
        // Blocks until items are available or CompleteAdding() is called.
        foreach (var item in _defaultQueue.GetConsumingEnumerable())
        {
            item?.Invoke();
        }
    }
}
Channel Implementation
public class ChannelQueue : IProducerConsumer
{
    private readonly Channel<Action> _channel;
    private readonly ChannelWriter<Action> _channelWriter;
    private readonly ChannelReader<Action> _channelReader;

    public ChannelQueue()
    {
        _channel = Channel.CreateUnbounded<Action>(new UnboundedChannelOptions() { SingleReader = true });
        _channelWriter = _channel.Writer;
        _channelReader = _channel.Reader;
    }

    public void Enqueue(Action item)
    {
        _channelWriter.TryWrite(item);
    }

    public void StartDequeing()
    {
        Task.Run(DequeueTask);
    }

    private async Task DequeueTask()
    {
        while (await _channelReader.WaitToReadAsync())
        {
            while (_channelReader.TryRead(out var job))
            {
                job?.Invoke();
            }
        }
    }

    public void Stop()
    {
        _channelWriter.Complete();
    }
}
TPL Dataflow using BufferBlock Implementation
public class DataFlowQueue : IProducerConsumer
{
    private readonly BufferBlock<Action> _bufferBlock;
    private Task _dequeTask;

    public DataFlowQueue()
    {
        var dataflowOptions = new DataflowBlockOptions() { EnsureOrdered = true };
        _bufferBlock = new BufferBlock<Action>(dataflowOptions);
    }

    public void Enqueue(Action item)
    {
        _bufferBlock.Post(item);
    }

    public void StartDequeing()
    {
        _dequeTask = Task.Run(DequeueTask);
    }

    private async Task DequeueTask()
    {
        while (await _bufferBlock.OutputAvailableAsync())
        {
            while (_bufferBlock.TryReceive(out var item))
            {
                item?.Invoke();
            }
        }
    }

    public void Stop()
    {
        _bufferBlock.Complete();
    }
}
TPL Dataflow using ActionBlock
public class ActionBlockQueue : IProducerConsumer
{
    private readonly ActionBlock<Action> _actionBlock;

    public ActionBlockQueue()
    {
        var dataflowOptions = new ExecutionDataflowBlockOptions() { EnsureOrdered = true, MaxDegreeOfParallelism = 1 };
        _actionBlock = new ActionBlock<Action>(item => item?.Invoke(), dataflowOptions);
    }

    public void Enqueue(Action item)
    {
        _actionBlock.Post(item);
    }

    public void StartDequeing()
    {
        // Nothing to do: an ActionBlock starts processing as soon as it is constructed.
    }

    public void Stop()
    {
        _actionBlock.Complete();
    }
}
Benchmark using BenchmarkDotNet
As you can see, all of the implementations store Action delegates in their queues, and I use an AutoResetEvent to signal when the last element has been dequeued.
public class MultipleJobBenchMark
{
    private AutoResetEvent _autoResetEvent;

    public MultipleJobBenchMark()
    {
        _autoResetEvent = new AutoResetEvent(false);
    }

    [Benchmark]
    public void BlockingCollectionQueue()
    {
        DoMultipleJobs(new BlockingCollectionQueue());
    }

    [Benchmark]
    public void DataFlowQueue()
    {
        DoMultipleJobs(new DataFlowQueue());
    }

    [Benchmark]
    public void ActionBlockQueue()
    {
        DoMultipleJobs(new ActionBlockQueue());
    }

    [Benchmark]
    public void ChannelQueue()
    {
        DoMultipleJobs(new ChannelQueue());
    }

    private void DoMultipleJobs(IProducerConsumer producerConsumerQueue)
    {
        producerConsumerQueue.StartDequeing();
        int jobs = 100000;
        for (int i = 0; i < jobs - 1; i++)
        {
            producerConsumerQueue.Enqueue(() => { });
        }
        producerConsumerQueue.Enqueue(() => _autoResetEvent.Set());
        _autoResetEvent.WaitOne();
        producerConsumerQueue.Stop();
    }
}
Results
- BlockingCollection: Mean 21.5 ms
- BufferBlock: Mean 14.937 ms
- ActionBlock: Mean 6.007 ms
- Channel: Mean 4.781 ms
Questions and Conclusions
By doing this exercise I have seen that, at this point, BlockingCollection may no longer be the best option.
I don't understand why there is such a big difference between BufferBlock and ActionBlock. I wrote both implementations because my interface defines a StartDequeing() method, and that is not possible with ActionBlock alone, since an ActionBlock starts dequeuing as soon as it is constructed.
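If I wanted to keep StartDequeing() with ActionBlock, the only workaround I can think of (just a sketch, not benchmarked; the LinkedActionBlockQueue name is only for illustration) would be to Post into a BufferBlock and link it to the ActionBlock only when StartDequeing() is called, with PropagateCompletion so that Stop() completes both blocks:

public class LinkedActionBlockQueue : IProducerConsumer
{
    private readonly BufferBlock<Action> _bufferBlock = new BufferBlock<Action>();
    private readonly ActionBlock<Action> _actionBlock =
        new ActionBlock<Action>(item => item?.Invoke(),
            new ExecutionDataflowBlockOptions() { EnsureOrdered = true, MaxDegreeOfParallelism = 1 });

    public void Enqueue(Action item)
    {
        _bufferBlock.Post(item);
    }

    public void StartDequeing()
    {
        // Items accumulate in the BufferBlock until this link is created.
        _bufferBlock.LinkTo(_actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });
    }

    public void Stop()
    {
        _bufferBlock.Complete();
    }
}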
Is my BufferBlock implementation the best way to do this?
I wanted to post my results here to ask which producer-consumer queue is the most widely accepted at the moment, and why there is such a big difference between ActionBlock and BufferBlock.