0

For a personal exercise, I'm trying to do a ConcurrentPriorityQueue class, with related Unit tests. I want the class to have the following characteristics:

  1. The priority is defined so that high number = high priority.
  2. The dequeue order within the same priority level is consistent, always FIFO.
  3. It's thread-safe.
  4. I'd like it to be generic, and with TElement coming before TPriority.

Note that these features make it different from the .NET PriorityQueue.

Implementation

Here's my go at this, please find it also at this dotNetFiddle. In essence, it's just a collection of ConcurrentQueues, one per each priority level, stored in a ConcurrentDictionary. I'm using System.Collections.Concurrent for the underlying collections.

/// <summary>
/// Implementation of PriorityQueue that has the following characteristics:
///     - The priority is defined so that high number = high priority
///     - The dequeue order within the same priority level is consistent, always FIFO.
///     - It's thread-safe.
/// In this, it is different from the PriorityQueue introduced in .NET6.
/// </summary>
public class ConcurrentPriorityQueue<TElement, TPriority> where TPriority : IComparable<TPriority>
{
    public ConcurrentDictionary<TPriority, ConcurrentQueue<TElement>> Queues { get; private set; } = new();

    public bool IsEmpty { get { return !Queues.Any(q => q.Value.Any()); } }

    public void Enqueue(TElement element, TPriority priority)
    {
        if (!Queues.TryGetValue(priority, out var queue))
        {
            queue = new();
            Queues[priority] = queue;
        }

        queue.Enqueue(element);
    }

    public bool TryDequeue(out TElement? element, out TPriority? priority)
    {
        if (IsEmpty)
        {
            element = default(TElement);
            priority = default(TPriority);
            return false;
        }

        var highestPriority = Queues.Keys.Max();
        var highestPriorityQueue = Queues[highestPriority];

        highestPriorityQueue.TryDequeue(out element);
        if (!highestPriorityQueue.Any())
            Queues.Remove(highestPriority, out _);

        priority = highestPriority;
        return true;
    }

    public TElement Dequeue()
    {
        if (IsEmpty)
            throw new InvalidOperationException("PriorityQueue is empty.");

        TryDequeue(out var element, out _);
        return element;
    }

    public TElement Peek()
    {
        if (IsEmpty)
            throw new InvalidOperationException("PriorityQueue is empty.");

        var highestPriority = Queues.Keys.Max();
        Queues[highestPriority].TryPeek(out var peeked);
        return peeked;
    }

    public TElement? TryPeek()
    {
        if (IsEmpty)
            return default;

        return Peek();
    }

    public int Count()
    {
        return Queues.Sum(q => q.Value.Count);
    }
}

Testing

I have tested this collection in simple scenarios, which work (please find some tests in the fiddle). However, for this StackOverflow question, I would like to focus the attention on testing in concurrent scenarios. To do so, I've written the following test, which fails. The logic behind this is that I try to enqueue several items with multiple tasks and do the same for dequeuing. The dequeuing expect to be able to extract as many items as numthreads * numElements, because we enqueued as many. However, the dequeue fails with a missing key error, whose cause I fail to understand.

    [Test]
    public void EnqueueAndTryDequeue_ConcurrentAccess_Successful()
    {
        var queue = new ConcurrentPriorityQueue<int, int>();
        int numElements = 100;
        int numThreads = 10;
    
        var tasks = new List<Task>();
    
        // Enqueue tasks
        for (int i = 0; i < numThreads; i++)
        {
            tasks.Add(Task.Run(() =>
            {
                for (int j = 0; j < numElements; j++)
                {
                    queue.Enqueue(j, j);
                }
            }));
        }
    
        // Dequeue tasks
        for (int i = 0; i < numThreads; i++)
        {
            tasks.Add(Task.Run(() =>
            {
                for (int j = 0; j < numElements; j++)
                {
                    queue.TryDequeue(out var element, out var priority);
                }
            }));
        }
    
        Task.WhenAll(tasks).Wait();
    
        queue.Count().Should().Be(0);
    }

Questions

My questions are:

  1. Am I doing something wrong in the implementation, in the test, or in both?
  2. I'm sure there are many ways of testing this collection in concurrent scenarios. Is my test designed in a way that makes sense, or should I first try to test the collection in concurrent scenarios with some other strategy?

Thanks everyone for your consideration! Again, please find the fiddle here. :)

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
alelom
  • 2,130
  • 3
  • 26
  • 38
  • 3
    @Dmitry you're **wrong**. [Code Review is for **working** code](https://codereview.stackexchange.com/help/on-topic). I have asked similar questions there in the past and I've been rightly addressed to StackOverflow. Please make sure you've thoroughly read the rules and the content of the question before commenting. – alelom Aug 05 '23 at 15:49
  • Before posting on Code Review please read [A guide to Code Review for Stack Overflow users](https://codereview.meta.stackexchange.com/questions/5777/a-guide-to-code-review-for-stack-overflow-users/5778#5778) and [How do I ask a good question?](https://codereview.stackexchange.com/help/how-to-ask) – pacmaninbw Aug 05 '23 at 15:51
  • 2
    @pacmaninbw thanks, please let's stop referring to Code Review, this question belongs to SO. – alelom Aug 05 '23 at 15:52
  • 1
    @Dmitry I think "However, the dequeue fails with a missing key error, whose cause I fail to understand." is a perfectly valid problem statement on Stack Overflow. – Mast Aug 05 '23 at 15:53
  • You can simplify your implementation by replacing the dictionary of queues with [SortedList](https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.sortedlist-2?view=net-7.0). The key should be a tuple of priority + timestamp. You need to pass an implementation of `IComparer` to its c'tor, which will compare first by priority **then** by the timestamp. The oldest element with the highest priority will be then the first or the last element in the list, depending on your comperer implementation. – Artur Aug 05 '23 at 20:07
  • 1
    At first glance you've created an inefficient implementation of a priority queue. The normal implementation uses a simple array to do it's work. Operations should be at least `log n`. It seems you're on at least `n`. – Enigmativity Aug 06 '23 at 02:22

2 Answers2

2

I'm not providing a complete solution here, but I'll try to help you on your way by highlighting a few issues in your current implementation. Overall, your implementation is lacking some kind of locking, whether it's optimistic (in short: "retry logic") or pessimistic (locking in advance).


Broken FIFO ordering

The backing ConcurrentDictionary uses IEqualityComparer<T> to check if two keys are the same. The generic type parameter TPriority currently also allows reference types to be used for which two individual instances are not considered to be equal by default. This means, enqueuing two elements and passing two different instances of a TPriority to Enqueue will cause the code to create two ConcurrentQueue even if the priority is the same (i.e. they're two different keys in the dictionary). When dequeuing, you expect those two items to be dequeued in the same order as they were enqueued but that's not guaranteed and depends on the implementation of ConcurrentDictionary. See a demonstration.

You could simplify you implementation if you constraint TPriority to be a value type:

public class ConcurrentPriorityQueue<TElement, TPriority>
    where TPriority : struct, IComparable<TPriority>
//                    ^

This works as long as the struct type you use as your TPriority only contain value type members, as the default Equals implementation of a struct compares all member values.

A more sophisticated solution would be to supply an IEqualityComparer<T> to the constructor of ConcurrentPriorityQueue.


Enqueuing

if (!Queues.TryGetValue(priority, out var queue))
{
    queue = new();
    Queues[priority] = queue; // <- Not thread safe
}

In the above snippet, two threads might enter the if-statement at the same time, both attempting to create and add a new inner queue to the dictionary. This is what GetOrAdd(TKey, Func<TKey,TValue>) tries to solve. Instead you could do the following:

var queue = Queues.GetOrAdd(priority, (prio) => new());
queue.Enqueue(element);

This is not completely safe, depending on your implementation of dequeuing, as queue might be removed due to some dequeuing in-between the above two lines. Thus queue.Enqueue(element); might add an element to a queue that no longer is part of Queues.


Dequeuing

In TryDequeue there are several steps which depend on the current state of Queues. In a concurrent environment, the state of Queues can change in-between each of those steps, which means any step might act based on an outdated state (see added comments in the snippet - when I write "now" it means "before the following line executes"):

if (IsEmpty)
{
    // IsEmpty might be false now
    element = default(TElement);
    priority = default(TPriority);
    return false;
}

// IsEmpty might be true now
var highestPriority = Queues.Keys.Max();
// Another thread might just have taken the last element at highestPriority now
// This is probably why you get the exception you mentioned
var highestPriorityQueue = Queues[highestPriority];

// highestPriorityQueue might be empty now
highestPriorityQueue.TryDequeue(out element);
if (!highestPriorityQueue.Any())
    // highestPriority might have already been removed now
    Queues.Remove(highestPriority, out _);

priority = highestPriority;
return true;

Similar issues are present in Dequeue, Peek and TryPeek.

Solving these issues is not simple but requires adding some locking. As shortly mentioned you can go the route of pessimistic locking by acquiring a lock before performing the operations, or optimistic locking. The latter might be a little more tricky but is likely to be more performant if that's a goal you're aiming for.

Xerillio
  • 4,855
  • 1
  • 17
  • 28
  • 1
    Thank you. This made me realise the importance and types of locking. The `GetOrAdd` enqueuing was eye-opening. The observation on the FIFO ordering is also important and interesting, and I appreciated a lot your code snippet. Together with [the other answer](https://stackoverflow.com/a/76843299/3873799) from @Theodor Zoulias, which gave a determining answer on the Testing, I learned a lot. I really appreciate it. – alelom Aug 06 '23 at 12:22
2

Am I doing something wrong in the implementation, in the test, or in both?

In both, but mainly in the implementation. The test can be improved in many ways, with the most obvious being to ensure that the number of threads available in the ThreadPool matches the number of workers that enqueue and dequeue items in the queue. By default the number of threads that the ThreadPool creates instantly on demand is Environment.ProcessorCount, which is likely to be less than the 20 (numThreads * 2). So your workers are not running all at the same time. Some have to complete before some others can start. What you should do is either to avoid the ThreadPool (use the Thread constructor or the combination Task.Factory.StartNew+TaskCreationOptions.LongRunning instead of Task.Run), or configure the ThreadPool appropriately:

ThreadPool.SetMinThreads(numThreads * 2, Environment.ProcessorCount);

Now regarding the implementation, unfortunately it's a race-conditions galore. Whenever you issue two commands to the ConcurrentDictionary<TKey, TValue> where the logical consistency of the second command depends on the outcome of the first command, you have a race condition. This is an inherent problem with the ConcurrentDictionary<TKey, TValue>, which limits its applications to simple scenarios. Your scenario is anything but simple, and my semi-expert opinion is that trying to make a priority queue out of this collection is practically impossible. Especially if you want it to perform superbly on top of functioning correctly.

My suggestion is to throw the whole idea out of the window, and start afresh with a standard PriorityQueue<TElement, TPriority> as your base, stuff the TPriority with a long incremented number to resolve ties (example), configure it with an appropriate IComparer<T>, and protect it everywhere with a lock even when you just read its Count property (example). This way you'll get a decently performing ConcurrentPriorityQueue<TElement, TPriority>, whose correctness will be easily provable. You won't even need tests to prove it, a mere visual inspection will be enough. The only tricky part of making a thread-safe collection out of a standard collection is the enumeration. Your ConcurrentPriorityQueue<TElement, TPriority> does not implement the IEnumerable<T> interface, so you don't have to worry about it.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • 2
    +1 Completely agree, for practical purposes, wrapping a standard `PriorityQueue` with locks is the sane way to go, unless you've got some crazy performance requirements. But a fun exercise in multithreading to try for a little more sophistication nonetheless. – Xerillio Aug 05 '23 at 20:14
  • 1
    Thank you. The suggestion of setting `ThreadPool.SetMinThreads(numThreads * 2, Environment.ProcessorCount)` helped with fixing a problem I had noticed in the Test, i.e. the number of queued elements was supposed to be 10 per each priority queue but wasn't consistent. Now I understand that it was due to picking threads from the limited ThreadPool. I also really appreciate your suggestion on reworking using a PriorityQueue, which I agree, it's definitely the way to go, but for this exercise I was interested in looking at ways to work with these collections. – alelom Aug 06 '23 at 11:14