For a personal exercise, I'm trying to do a ConcurrentPriorityQueue
class, with related Unit tests. I want the class to have the following characteristics:
- The priority is defined so that high number = high priority.
- The dequeue order within the same priority level is consistent, always FIFO.
- It's thread-safe.
- I'd like it to be generic, and with
TElement
coming beforeTPriority
.
Note that these features make it different from the .NET PriorityQueue.
Implementation
Here's my go at this, please find it also at this dotNetFiddle. In essence, it's just a collection of ConcurrentQueues, one per each priority level, stored in a ConcurrentDictionary. I'm using System.Collections.Concurrent for the underlying collections.
/// <summary>
/// Implementation of PriorityQueue that has the following characteristics:
/// - The priority is defined so that high number = high priority
/// - The dequeue order within the same priority level is consistent, always FIFO.
/// - It's thread-safe.
/// In this, it is different from the PriorityQueue introduced in .NET6.
/// </summary>
public class ConcurrentPriorityQueue<TElement, TPriority> where TPriority : IComparable<TPriority>
{
public ConcurrentDictionary<TPriority, ConcurrentQueue<TElement>> Queues { get; private set; } = new();
public bool IsEmpty { get { return !Queues.Any(q => q.Value.Any()); } }
public void Enqueue(TElement element, TPriority priority)
{
if (!Queues.TryGetValue(priority, out var queue))
{
queue = new();
Queues[priority] = queue;
}
queue.Enqueue(element);
}
public bool TryDequeue(out TElement? element, out TPriority? priority)
{
if (IsEmpty)
{
element = default(TElement);
priority = default(TPriority);
return false;
}
var highestPriority = Queues.Keys.Max();
var highestPriorityQueue = Queues[highestPriority];
highestPriorityQueue.TryDequeue(out element);
if (!highestPriorityQueue.Any())
Queues.Remove(highestPriority, out _);
priority = highestPriority;
return true;
}
public TElement Dequeue()
{
if (IsEmpty)
throw new InvalidOperationException("PriorityQueue is empty.");
TryDequeue(out var element, out _);
return element;
}
public TElement Peek()
{
if (IsEmpty)
throw new InvalidOperationException("PriorityQueue is empty.");
var highestPriority = Queues.Keys.Max();
Queues[highestPriority].TryPeek(out var peeked);
return peeked;
}
public TElement? TryPeek()
{
if (IsEmpty)
return default;
return Peek();
}
public int Count()
{
return Queues.Sum(q => q.Value.Count);
}
}
Testing
I have tested this collection in simple scenarios, which work (please find some tests in the fiddle). However, for this StackOverflow question, I would like to focus the attention on testing in concurrent scenarios. To do so, I've written the following test, which fails. The logic behind this is that I try to enqueue several items with multiple tasks and do the same for dequeuing. The dequeuing expect to be able to extract as many items as numthreads * numElements, because we enqueued as many. However, the dequeue fails with a missing key error, whose cause I fail to understand.
[Test]
public void EnqueueAndTryDequeue_ConcurrentAccess_Successful()
{
var queue = new ConcurrentPriorityQueue<int, int>();
int numElements = 100;
int numThreads = 10;
var tasks = new List<Task>();
// Enqueue tasks
for (int i = 0; i < numThreads; i++)
{
tasks.Add(Task.Run(() =>
{
for (int j = 0; j < numElements; j++)
{
queue.Enqueue(j, j);
}
}));
}
// Dequeue tasks
for (int i = 0; i < numThreads; i++)
{
tasks.Add(Task.Run(() =>
{
for (int j = 0; j < numElements; j++)
{
queue.TryDequeue(out var element, out var priority);
}
}));
}
Task.WhenAll(tasks).Wait();
queue.Count().Should().Be(0);
}
Questions
My questions are:
- Am I doing something wrong in the implementation, in the test, or in both?
- I'm sure there are many ways of testing this collection in concurrent scenarios. Is my test designed in a way that makes sense, or should I first try to test the collection in concurrent scenarios with some other strategy?
Thanks everyone for your consideration! Again, please find the fiddle here. :)