
I'm writing an application that manages a collection requiring frequent enqueuing and dequeuing of items in a multithreaded environment. With a single thread, a simple List would probably be enough, but the concurrent nature of the environment poses some issues.

Here's the summary:

The structure needs to have a bool TryAdd(T) method, preferably Add(TKey, TValue);

The structure needs to have a T TryRemove() method which takes a random or, preferably, the first-added item (essentially implementing a FIFO queue);

The structure needs to have a bool TryRemove(T) method, preferably Remove(TKey);

So far I have three ideas, all with their issues:

  1. Implement a class containing a ConcurrentDictionary<TKey, TValue> and a ConcurrentQueue like this:
     internal class ConcurrentQueuedDictionary<TKey, TValue> where TKey : notnull
     {
        ConcurrentDictionary<TKey, TValue> _dictionary = new();
        ConcurrentQueue<TKey> _queue = new();
        object _locker = new();

        public bool TryAdd(TKey key, TValue value)
        {
            if (!_dictionary.TryAdd(key, value))
                return false;
            lock (_locker)
                _queue.Enqueue(key);
            return true;
        }

        public TValue TryRemove()
        {
            TKey key;
            lock (_locker) {
                if (_queue.IsEmpty)
                    return default(TValue);
                _queue.TryDequeue(out key);
            }
            TValue value;
            if (!_dictionary.TryRemove(key, out value))
                throw new Exception();
            return value;
        }

        public bool TryRemove(TKey key)
        {
            lock (_locker)
            {
                var copiedList = _queue.ToList();
                if (!copiedList.Remove(key))
                    return false;
                _queue = new(copiedList);
            }
            return _dictionary.TryRemove(key, out _);
        }
    }

but that will require a lock on Remove(T), because it demands a full deep copy of the initial queue without the removed item while disallowing reads from other threads. This means that at least Remove() will also need this lock, and removal is meant to be an operation carried out often;

  2. Implement a class containing a ConcurrentDictionary<TKey, TValue> and a ConcurrentDictionary<int order, TKey>, where order is defined on TryAdd with two counter fields _addOrder and _removeOrder like this:
       internal class ConcurrentQueuedDictionary<TKey, TValue> where TKey : notnull
       {
            ConcurrentDictionary<TKey, TValue> _dictionary;
            ConcurrentDictionary<int, TKey> _order;
            int _addOrder = 0;
            int _removeOrder = 0;
    
            public bool TryAdd(TKey key, TValue value)
            {
                if (!_dictionary.TryAdd(key, value))
                    return false;
                if (!_order.TryAdd(unchecked(Interlocked.Increment(ref _addOrder)), key))
                    throw new Exception(); //Operation faulted, mismatch of data in _order
                return true;
            }
    
            public TValue TryRemove()
            {
                TKey key;
                if (!(_order.Count > 0 && _order.TryRemove(unchecked(Interlocked.Increment(ref _removeOrder)), out key)))
                    return default(TValue);
                _dictionary.TryRemove(key, out var value);
                return value;
            }
    
            public bool TryRemove(TKey key)
            {
                if (!_order.TryRemove(_order.First(item => item.Value.Equals(key)).Key, out _))
                    return false;
                if (!_dictionary.TryRemove(key, out _))
                    throw new Exception();
                return true;
            }
       }

but I'm pretty sure just voicing this implementation has put me on a psychiatric watchlist somewhere, because it's gonna be a masochistic nightmare to make work properly;

  3. Straight up locking a List, because locks are necessary for option 1 anyway.

Any ideas? I'm kinda stumped by this issue as I don't have the best grasp of concurrent collections. Do I need a custom IProducerConsumerCollection? Is it even possible to have both random (or queued) and specific access to concurrent collection elements? Have any of you faced this before? Maybe I'm looking at the issue wrong.

Edit: typos, formatting

Johnny Cache
  • If there isn't much contention then locking a list sounds like the best solution – Charlieface May 30 '22 at 12:05
  • @Charlieface There could be, this collection is meant to be referenced nearly every second with an arbitrary amount of random elements removed each time. Adding happens less uniformly but exactly as many times as removing, so anything goes. Specific removal happens on query. Rather, my concern isn't so much contention - requesting threads don't much care for which exact item they're getting - but performance due to dozens, potentially hundreds of locks being acquired per second – Johnny Cache May 30 '22 at 12:15
  • Sounds like you need to rethink your workflow, and separate the two operations. It sounds like you should use a `ConcurrentQueue` to dequeue in FIFO order, then add it to a `ConcurrentDictionary` which represents currently worked items. Then on completion you can remove an item from the dictionary – Charlieface May 30 '22 at 12:57
  • It seems that the structure should not be able to contain duplicate entries, correct? Also how many entries do you anticipate that the structure will contain on average, under typical load? – Theodor Zoulias May 30 '22 at 14:19
  • @TheodorZoulias No, the structure is not meant to contain any duplicates. Every item is unique by definition in context. The usual load would be dozens to thousands of items but it's kind of a special case in which initially the items would be taken out very quickly and then would take a long, semi-random time to put back in. The structure is meant to track free items so that they can be taken by something else. So most of the time the structure would ideally be empty or contain very few items but I'm worried about the initial load when it contains all of them. – Johnny Cache May 30 '22 at 14:30
  • Let's say that a consumer of the structure calls the `TryRemove()` method (without key), and the result is false because the structure is empty. What will the consumer do then? Will it wait synchronously for a small time span (blocking the current thread), and then try again in a loop? – Theodor Zoulias May 30 '22 at 14:56
  • @Charlieface The thing is, the items still need to be taken out selectively to be reassigned elsewhere where they could be more useful at the time, so the queue still needs to be able to rip an item out of the middle of it, no matter if it defines the Dictionary or the other way around. – Johnny Cache May 31 '22 at 08:32
  • @TheodorZoulias The consumer will understand that the queue is empty and wait for an external stimulus where the item currently freeing up will be assigned directly to the consumer instead of ending up in this queue. – Johnny Cache May 31 '22 at 08:32
  • Perhaps leave the item in the queue and just set a flag on it so that the next consumer thread to pick it up will ignore it and de-queue the next. – Charlieface May 31 '22 at 08:35
  • Ah, so you have an external mechanism that coordinates the workers. You don't need the FIFO structure to be itself the coordinator. That's good, because otherwise I would point out to the `BlockingCollection` class, and the possibility of customizing it with a specialized underlying `IProducerConsumerCollection` implementation. Which is currently [problematic](https://stackoverflow.com/questions/10736209/how-to-wrap-concurrentdictionary-in-blockingcollection/72128246#72128246), especially if you also need the bounded capacity functionality. – Theodor Zoulias May 31 '22 at 08:56
  • @Charlieface The flag would have to be part of a tuple or a wrapper class which I think should be avoided if possible. Nothing inherently wrong with that, but the system is difficult enough to understand as it is, and I'd rather have other people looking at this queue approach it as a black box without having to wonder what the hell the wrapper class is for. – Johnny Cache May 31 '22 at 15:08

1 Answer


Creating a concurrent structure like this by combining built-in concurrent collections should be close to impossible, provided of course that correctness is paramount and race conditions are strictly forbidden. The good news is that acquiring a lock a few thousand times per second is nowhere near the limit where contention starts to become an issue, provided that the operations inside the protected region are lightweight (their duration is measured in nanoseconds).

One way to achieve O(1) complexity of operations, is to combine a LinkedList<T> and a Dictionary<K,V>:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;

/// <summary>
/// Represents a thread-safe first in-first out (FIFO) collection of key/value pairs,
/// where the key is unique.
/// </summary>
public class ConcurrentKeyedQueue<TKey, TValue>
{
    private readonly LinkedList<KeyValuePair<TKey, TValue>> _queue;
    private readonly Dictionary<TKey, LinkedListNode<KeyValuePair<TKey, TValue>>>
        _dictionary;

    public ConcurrentKeyedQueue(IEqualityComparer<TKey> comparer = default)
    {
        _queue = new();
        _dictionary = new(comparer);
    }

    public int Count { get { lock (_queue) return _queue.Count; } }

    public bool TryEnqueue(TKey key, TValue value)
    {
        lock (_queue)
        {
            ref var node = ref CollectionsMarshal
                .GetValueRefOrAddDefault(_dictionary, key, out bool exists);
            if (exists) return false;
            node = new(new(key, value));
            _queue.AddLast(node);
            Debug.Assert(_queue.Count == _dictionary.Count);
            return true;
        }
    }

    public bool TryDequeue(out TKey key, out TValue value)
    {
        lock (_queue)
        {
            if (_queue.Count == 0) { key = default; value = default; return false; }
            var node = _queue.First;
            (key, value) = node.Value;
            _queue.RemoveFirst();
            bool removed = _dictionary.Remove(key);
            Debug.Assert(removed);
            Debug.Assert(_queue.Count == _dictionary.Count);
            return true;
        }
    }

    public bool TryTake(TKey key, out TValue value)
    {
        lock (_queue)
        {
            bool removed = _dictionary.Remove(key, out var node);
            if (!removed) { value = default; return false; }
            _queue.Remove(node);
            (_, value) = node.Value;
            Debug.Assert(_queue.Count == _dictionary.Count);
            return true;
        }
    }

    public KeyValuePair<TKey, TValue>[] ToArray()
    {
        lock (_queue) return _queue.ToArray();
    }
}
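A quick usage sketch (the keys and values here are illustrative, not from the question):

```csharp
var queue = new ConcurrentKeyedQueue<string, int>();

queue.TryEnqueue("a", 1);   // maps to the question's TryAdd(TKey, TValue)
queue.TryEnqueue("b", 2);
queue.TryTake("a", out _);  // targeted removal: the question's TryRemove(TKey)
if (queue.TryDequeue(out var key, out var value)) // FIFO: parameterless TryRemove()
    Console.WriteLine($"{key} = {value}"); // prints "b = 2"
```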

This combination is also used for creating LRU caches.
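For example, a minimal (single-threaded) LRU cache built from the same pairing might look like this. This is an illustrative sketch, not part of the answer's API; wrap the public methods in a lock if thread safety is needed:

```csharp
using System.Collections.Generic;

// Sketch: LinkedList tracks recency order, Dictionary gives O(1) lookup of nodes.
public class LruCache<TKey, TValue> where TKey : notnull
{
    private readonly int _capacity;
    private readonly LinkedList<KeyValuePair<TKey, TValue>> _list = new();
    private readonly Dictionary<TKey, LinkedListNode<KeyValuePair<TKey, TValue>>> _map = new();

    public LruCache(int capacity) => _capacity = capacity;

    public bool TryGet(TKey key, out TValue value)
    {
        if (_map.TryGetValue(key, out var node))
        {
            // O(1) promotion to the most-recently-used position.
            _list.Remove(node);
            _list.AddFirst(node);
            value = node.Value.Value;
            return true;
        }
        value = default;
        return false;
    }

    public void Set(TKey key, TValue value)
    {
        if (_map.TryGetValue(key, out var node))
            _list.Remove(node);
        node = new(new(key, value));
        _list.AddFirst(node);
        _map[key] = node;
        if (_map.Count > _capacity)
        {
            // Evict the least-recently-used entry from the tail.
            _map.Remove(_list.Last!.Value.Key);
            _list.RemoveLast();
        }
    }
}
```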

You can measure the lock contention in your own environment under load, by using the Monitor.LockContentionCount property: "Gets the number of times there was contention when trying to take the monitor's lock." If you see the delta per second to be a single digit number, there is nothing to worry about.
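A possible way to sample that delta, assuming a simple once-per-second timer (the interval and message are arbitrary):

```csharp
using System;
using System.Threading;

// Monitor.LockContentionCount is available on .NET 5 and later.
long previous = Monitor.LockContentionCount;
using var timer = new Timer(_ =>
{
    long current = Monitor.LockContentionCount;
    Console.WriteLine($"Lock contentions in the last second: {current - previous}");
    previous = current;
}, null, dueTime: 1000, period: 1000);
```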

For a version that doesn't use the CollectionsMarshal.GetValueRefOrAddDefault method, and so it can be used on .NET versions older than .NET 6, see the first revision of this answer.

Theodor Zoulias
  • Good solution without too much overhead, will definitely try and see if it fits my use case. Not gonna accept yet because I'd like to hear other opinions too, but if locks operating here will indeed not affect performance meaningfully then it's a no-brainer – Johnny Cache May 31 '22 at 08:51
  • @JohnnyCache one downside of the above approach is that each enqueue operation allocates a `LinkedListNode` object. This shouldn't be a problem, unless you are writing a game or something that is affected by the garbage collector running frequently. Eliminating this allocation is not trivial though, with the built-in tools. You'll have to look at external packages, like [this](https://github.com/OndrejPetrzilka/Rock.Collections) for example, which is a hashset/dictionary that maintains order of insertion. – Theodor Zoulias May 31 '22 at 09:08
  • Thankfully, I don't think that frequent GC will cause too much issue in my case - there are few other things to be cleaned up along with it. It's time sensitive but I don't think anyone's gonna miss a few microseconds, if any at all. Valid remark for someone who will face the same problem in a game though, probably even myself in a few months lol – Johnny Cache May 31 '22 at 15:12
  • @JohnnyCache btw in case allocations were an issue, a possible solution would be to have a small pool of `LinkedListNode` objects and reuse them. I would expect that such a micromanagement would have comparable overhead with just letting the GC do its job, because the cost of creating and destructing these small objects should be trivial. – Theodor Zoulias Feb 06 '23 at 03:17