
I need a queue with these capabilities:

  • fixed-size (i.e. circular buffer)
  • queue items have ids (like a primary key), which are sequential
  • thread-safe (used from multiple ASP.NET Core requests)

To avoid locking, I tried a ConcurrentQueue but found race conditions. So I'm trying a custom approach.
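For illustration, here is a minimal sketch (not my exact code; it uses the Item type defined below) of the kind of race I mean: the id is claimed and the item enqueued in two separate steps, so competing threads can interleave them.

var queue = new ConcurrentQueue<Item>();
long counter = 0;

void Enqueue(Item item)
{
    item.Id = Interlocked.Increment(ref counter);  // step 1: claim an id
    queue.Enqueue(item);                           // step 2: enqueue (another thread can run in between,
                                                   // so items land in the queue out of id order)
    if (queue.Count > 10) queue.TryDequeue(out _); // step 3: trim to fixed size (also racy: Count is already stale)
}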

public interface IQueueItem
{
    long Id { get; set; }
}

public class CircularBuffer<T> : LinkedList<T> where T : class, IQueueItem
{
    public CircularBuffer(int capacity) => _capacity = capacity;
    private readonly int _capacity;

    private long _counter = 0;
    private readonly object _lock = new();

    public void Enqueue(T item)
    {
        lock (_lock) {         // works but feels "heavy"
            _counter++;
            item.Id = _counter;
            if (Count == _capacity) RemoveFirst();
            AddLast(item);
        }
    }
}

And to test:

public class Item : IQueueItem
{
    public long Id { get; set; }
    //...
}

public class Program
{
    public static void Main()
    {
        var q = new CircularBuffer<Item>(10);
        Parallel.For(0, 15, i => q.Enqueue(new Item()));
        Console.WriteLine(string.Join(", ", q.Select(x => x.Id)));
    }
}

Which gives the correct output (ordered even though items were enqueued by competing threads, and fixed size with the oldest items dequeued):

6, 7, 8, 9, 10, 11, 12, 13, 14, 15

In reality, I have web requests that read (i.e. enumerate) that queue.

The problem: if one thread is enumerating the queue while another thread is adding to it, I will get errors. (I could take a ToList() copy before each read, but for a large queue that would eat up the server's memory, as this could happen many times a second across multiple requests.) How can I deal with that scenario? I used a linked list, but I'm flexible to use any structure.

(Also, that seems to be a really heavy lock section; is there a more performant way?)

UPDATE
As asked in comments below: I expect the queue to have from a few hundred to a few tens of thousands of items, but the items themselves are small (just a few primitive data types). I expect an enqueue about every second. Reads from web requests are less frequent, say a few times per minute (but they can occur concurrently with the server writing to the queue).

lonix
  • Do you want the enumeration to reflect the state of the queue when the enumeration starts? If so then you'd want to do a lock and return a copy. Also since this is a fixed size you could implement it with an array. – juharr Jun 29 '22 at 11:52
  • Use producer consumer collections https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-implement-a-producer-consumer-dataflow-pattern or BlockingCollection https://learn.microsoft.com/en-us/dotnet/standard/collections/thread-safe/blockingcollection-overview, you can specify size and behavior on reaching that size. More about difference between two - https://stackoverflow.com/questions/21163487/tpl-dataflow-vs-blockingcollection – eocron Jun 29 '22 at 11:54
  • @juharr Thanks. I updated the question to note that. I could make a `ToList()` copy, but it would quickly eat up all my resources. Maybe I could lock the enumeration, but that would slow down the requests... Any other options? – lonix Jun 29 '22 at 11:57
  • @juharr By the way I chose a linked list rather than array, as I imagine it would be faster to add/remove first/last nodes in a linked list than an array. – lonix Jun 29 '22 at 12:03
  • What is the use of this queue? Why do you need to enumerate something while it's being updated? Typically with a queue you'd just put stuff in and take it out, not enumerate it. – juharr Jun 29 '22 at 12:09
  • Enumerating a queue while items are being concurrently added and removed is likely to have race conditions. For example, an item could be removed from the queue in-between the enumerator obtaining the item to return, and the consumer actually processing it. For this reason the only safe operations on a concurrent queue are enqueuing and dequeuing. Even checking the length of a queue would be thread-unsafe, since the number of items can change after the length is read by a consumer but before it is used. – Matthew Watson Jun 29 '22 at 12:14
  • As mentioned above its data is read by web requests. (Server events are written to it, and a UI shows them to the user. There is a fixed size to prevent running out of memory.) Can you suggest a better structure - it can be anything that meets the criteria above: thread safe, fixed size and a sequence for the items. – lonix Jun 29 '22 at 12:17
  • Yes, I suggest using a [`BlockingCollection`](https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.blockingcollection-1?view=net-6.0) – Matthew Watson Jun 29 '22 at 12:18
  • Something to be aware of regarding your `CircularBuffer`/`IQueueItem` design, is that the generated IDs are unique per buffer. If you create two buffers and enqueue one item to each buffer, the two items will have the same `Id`. Also if you enqueue the same item to two different buffers, its `Id` will be overwritten on the second `Enqueue`. – Theodor Zoulias Jun 29 '22 at 20:38

3 Answers


Based on the metrics that you provided in the question, you have plenty of options. The anticipated usage of the CircularBuffer<T> is not really that heavy. Wrapping a lock-protected Queue<T> should work pretty well. The cost of copying the contents of the queue into an array on each enumeration (copying 10,000 elements a few times per minute) is unlikely to be noticeable. Modern machines can do such things in the blink of an eye. You'd have to enumerate the collection hundreds of times per second for this to start (slightly) becoming an issue.
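To put a rough number on that claim, here is a quick sketch (reusing the Item class from the question; exact timings will vary by machine) that snapshots a 10,000-item Queue<T> repeatedly and reports the average cost per copy:

var queue = new Queue<Item>(Enumerable.Range(0, 10_000).Select(_ => new Item()));

const int iterations = 1_000;
var sw = Stopwatch.StartNew();
for (int i = 0; i < iterations; i++)
    _ = queue.ToArray(); // the same copy that a snapshotting enumeration would pay for
sw.Stop();
Console.WriteLine($"{sw.Elapsed.TotalMilliseconds / iterations:F4} ms per copy");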

In my original answer (revision 3) I had proposed using an ImmutableQueue<T> as the underlying storage. After closer inspection I noticed that this class is not exactly pay-for-play: the first time it is enumerated, it invokes the internal BackwardsReversed property (source code), which is quite costly. My performance tests confirmed that it's a worse solution than the simple lock-protected Queue<T> shown in lonix's answer, regarding both CPU time and allocations.

Below is a lower-level implementation of a similar idea, which exploits the fact that we need only a subset of the functionality of the ImmutableQueue<T> class. The items are stored in a singly linked list that can be enumerated without additional cost:

public class ConcurrentCircularBuffer<T> : IEnumerable<T> where T : IQueueItem
{
    private readonly object _locker = new();
    private readonly int _capacity;
    private Node _head;
    private Node _tail;
    private int _count = 0;
    private long _lastId = 0;

    private class Node
    {
        public readonly T Item;
        public Node Next;
        public Node(T item) => Item = item;
    }

    public ConcurrentCircularBuffer(int capacity)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    public int Count => Volatile.Read(ref _count);

    public void Enqueue(T item)
    {
        lock (_locker)
        {
            Node node = new(item);
            if (_head is null) _head = node;
            if (_tail is not null) _tail.Next = node;
            _tail = node;
            if (_count < _capacity) _count++; else _head = _head.Next;
            item.Id = ++_lastId;
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        Node node; int count;
        lock (_locker) { node = _head; count = _count; }
        for (int i = 0; i < count && node is not null; i++, node = node.Next)
            yield return node.Item;
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

The main advantage of this approach over the lock-protected Queue<T> is that it minimizes the contention. The work done while holding the lock is minuscule.
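For instance, a usage sketch (reusing the Item class from the question's test code) with one writer and one concurrent reader could look like this:

var buffer = new ConcurrentCircularBuffer<Item>(10_000);

var writer = Task.Run(() =>
{
    for (int i = 0; i < 100_000; i++) buffer.Enqueue(new Item());
});

var reader = Task.Run(() =>
{
    while (!writer.IsCompleted)
    {
        long last = 0;
        foreach (var item in buffer) last = item.Id; // safe to enumerate while Enqueue runs
        Console.WriteLine($"Last observed id: {last}");
    }
});

await Task.WhenAll(writer, reader);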

An alternative implementation of the ConcurrentCircularBuffer<T> class, based on two array buffers and having different pros and cons, can be found in the 5th revision of this answer.

Theodor Zoulias
  • Theodor is there anything about concurrency you don't know?! :-P Thanks a lot! By the way, you showed two ways, using `Queue` with worse perf but still good (and **MUCH** simpler so I am more comfortable with it) and `ImmutableQueue` which was very exciting until I saw the dreaded `volatile` keyword. But you also said "for the sake of variety" - so in a real world case, which approach would you personally take? (I want to take the approach you would take :)) – lonix Jun 29 '22 at 15:45
  • @lonix I am not sure what I would choose either. Probably it doesn't matter, since both are OK. If I had plenty of time I might spend some writing benchmarks, to get experimental measurements. I would expect to see a tipping point where increasing the frequency of enumerations favors the `ImmutableQueue`, and increasing the frequency of `Enqueue` favors the `Queue`. – Theodor Zoulias Jun 29 '22 at 17:19
  • Thanks. Even without benchmarking that sounds logical - I went with your ImmutableQueue idea. For the sake of completeness I added an answer with a Queue implementation below. If you have a moment please see if it looks safe to you (I used your ideas above as the reference). – lonix Jun 30 '22 at 02:12

Since ConcurrentQueue is ruled out in this question, you can try a fixed array.

IQueueItem[] items = new IQueueItem[SIZE]; // fixed-size backing store (SIZE is the capacity)
long id = 0;                               // last issued id

Enqueue is simple.

void Enqueue(IQueueItem item)
{
    // Claim the next id atomically; ids start at 1, as in the question.
    long id2 = Interlocked.Increment(ref id);
    item.Id = id2;
    // Consecutive ids map to consecutive slots, wrapping around the array.
    items[(id2 - 1) % SIZE] = item;
}

To output the data, you just need to copy the array to a new one, then sort it (of course, this can be optimized):

var arr = new IQueueItem[SIZE];
Array.Copy(items, arr, SIZE);
return arr.Where(a => a != null).OrderBy(a => a.Id);

There may be gaps in the array because of concurrent insertions, so you can take the sequence only up to the first gap:

var e = arr.Where(a => a != null).OrderBy(a => a.Id);
var firstId = e.First().Id;
return e.TakeWhile((a, index) => a.Id - index == firstId);
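
Putting the copy and the gap handling together, a hypothetical Snapshot helper (the name is mine) over the same items array could look like this; note the extra guard for the empty case, which the fragment above would throw on:

IEnumerable<IQueueItem> Snapshot()
{
    // Copy the array, drop empty slots, sort by id, then cut the
    // sequence at the first gap left by an in-flight concurrent Enqueue.
    var arr = new IQueueItem[SIZE];
    Array.Copy(items, arr, SIZE);

    var ordered = arr.Where(a => a != null).OrderBy(a => a.Id).ToList();
    if (ordered.Count == 0) return ordered;

    long firstId = ordered[0].Id;
    return ordered.TakeWhile((a, index) => a.Id - index == firstId);
}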
shingo
  • Hi shingo we meet again. This should work nicely, thanks. I am concerned it will be expensive to make copies of a large queue (my queue is quite large). But maybe this is the only way. I will run some tests to find out... – lonix Jun 29 '22 at 12:58
  • You can use double buffering to avoid the memory copy, but you need some code to handle discontinuous ids. – shingo Jun 29 '22 at 14:09
  • I marked the other as the answer as it handles better performance for large queues - but I also tried your way and it was very simple and very good. I recommend it to all future readers. Ironically, an array with locking is the simplest approach and easy to understand. Thanks! – lonix Jun 29 '22 at 15:47

Here is another implementation, using a Queue<T> with locking.

public interface IQueueItem
{
    long Id { get; set; }
}

public class CircularBuffer<T> : IEnumerable<T> where T : class, IQueueItem
{
    private readonly int _capacity;
    private readonly Queue<T> _queue;
    private long _lastId = 0;
    private readonly object _lock = new();

    public CircularBuffer(int capacity) {
        _capacity = capacity;
        _queue = new Queue<T>(capacity);
    }

    public void Enqueue(T item)
    {
        lock (_lock) {
            if (_queue.Count == _capacity)  // dequeue first, so the queue never grows past capacity
                _queue.Dequeue();
            item.Id = ++_lastId;
            _queue.Enqueue(item);
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        lock (_lock) {
            var copy = _queue.ToArray();
            return ((IEnumerable<T>)copy).GetEnumerator();
        }
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

}

And to test:

public class Item : IQueueItem
{
    private long _id;

    public long Id
    {
        get => Volatile.Read(ref _id);
        set => Volatile.Write(ref _id, value);
    }
}

public class Program
{
    public static void Main()
    {
        var q = new CircularBuffer<Item>(10);
        Parallel.For(0, 15, i => q.Enqueue(new Item()));
        Console.WriteLine(string.Join(", ", q.Select(x => x.Id)));
    }
}

Result:

6, 7, 8, 9, 10, 11, 12, 13, 14, 15

lonix
  • In fact I'm not sure using a `Queue` is better/faster/safer than a `LinkedList` as in the original question (assuming it also has locking for the enumeration). Or even an `Array`. I suppose it should be tested. But my subjective gut feeling is since we only ever queue and dequeue, and enumerate (but never seek in the middle), my original code for the linked list (in the question above) is preferable - but it needs locking around the enumeration. Maybe I'll add that as another answer. – lonix Jun 30 '22 at 02:22
  • The `CircularBuffer` implementation seems OK. The `_count` is not really needed, because the `Queue` has a `Count` property. The `Item` class doesn't seem thread-safe though, because the `Id` is not accessed with volatile semantics. So if it's changed by one thread, the change might not be visible from another thread. You would need the advice of an [expert](https://stackoverflow.com/a/66490395/11178549) to tell you if it's safe or not. If you want to be on the safe(r) side, you'd better add `Volatile` read/write in the getter and setter of the property. – Theodor Zoulias Jun 30 '22 at 02:33
  • @TheodorZoulias So what you're saying is even the concurrency guru, has his own concurrency guru? :-P It's concurrent multi-threaded turtles all the way down. – lonix Jun 30 '22 at 02:37
  • Yeah. When it comes to memory models, cache coherency protocols, CPU architectures etc, it's all Greek to me. :-) I have no ambition to learn this stuff. It's too far away from programming. – Theodor Zoulias Jun 30 '22 at 02:47
  • @TheodorZoulias Edge case here, maybe in other solutions on this page too (from something I read [here](https://stackoverflow.com/a/61492997/9971404)). Enqueue is performed before dequeue - so when reaching capacity, the internal structure is resized to n+1. If concurrency is high this could occur many times, until queue is large enough so it doesn't happen any more - so it's better to add some margin, e.g. for capacity of 100, create queue with actual capacity of 120. A better option would be to simply dequeue before enqueue. I think your solution avoids this edge case. – lonix Jun 30 '22 at 03:47
  • Yep, in the case of a `Queue` the order matters, because it has an internal buffer. In the case of an `ImmutableQueue` the order is probably irrelevant, because it is [implemented](https://github.com/dotnet/runtime/blob/0402550585f54e88707af55652111ace793ef27f/src/libraries/System.Collections.Immutable/src/System/Collections/Immutable/ImmutableQueue_1.cs) as a linked list. – Theodor Zoulias Jun 30 '22 at 05:24