166

I'm using ConcurrentQueue for a shared data structure whose purpose is to hold the last N objects passed to it (a kind of history).

Assume we have a browser and we want to keep the last 100 browsed URLs. I want a queue which automatically drops (dequeues) the oldest (first) entry upon insertion of a new entry (enqueue) when the capacity gets full (100 addresses in history).

How can I accomplish that using System.Collections?

Stephen Kennedy
  • 20,585
  • 22
  • 95
  • 108
Xaqron
  • 29,931
  • 42
  • 140
  • 205
  • 1
    http://stackoverflow.com/questions/590069/how-would-you-code-an-efficient-circular-buffer-in-java-or-c –  May 02 '11 at 01:55
  • It wasn't meant specifically for you, but for anyone who comes across this question and might find it useful. btw, it does talk about C# too. Did you manage to read _all_ the answers (in 2 minutes) and figure out that there is no C# code there? Anyway, I am not sure myself, and hence it is a comment... –  May 02 '11 at 01:57
  • You can just wrap the methods in a lock. Given that they are fast, you can just lock the whole array. This is probably a dupe though. Searching for circular buffer implementations with C# code might find you something. Anyway, good luck. –  May 02 '11 at 02:01

15 Answers

133

I would write a wrapper class that, on Enqueue, checks the Count and dequeues when the count exceeds the limit.

public class FixedSizedQueue<T>
{
    readonly ConcurrentQueue<T> q = new ConcurrentQueue<T>();
    private object lockObject = new object();

    public int Limit { get; set; }

    public void Enqueue(T obj)
    {
        q.Enqueue(obj);

        // Trim under a lock so that concurrent enqueuers don't both
        // dequeue based on the same stale Count.
        lock (lockObject)
        {
            T overflow;
            while (q.Count > Limit && q.TryDequeue(out overflow)) ;
        }
    }
}
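
As the comments below point out, this wrapper exposes no way to read items back out. A minimal sketch of pass-through read members that could be added inside the class (the member names are my own, not part of the original answer):

public bool TryDequeue(out T result)
{
    // Reads can go straight to the inner ConcurrentQueue; the lock above
    // only guards the trim-on-enqueue, not ordinary dequeues.
    return q.TryDequeue(out result);
}

public T[] ToArray()
{
    return q.ToArray();   // snapshot copy, safe to enumerate
}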
stuzor
  • 2,275
  • 1
  • 31
  • 45
Richard Schneider
  • 34,944
  • 9
  • 57
  • 73
  • Lock will not be very helpful if Dequeue is called at the same time from other threads that do not lock this... – Erwin Mayer Jul 25 '11 at 15:57
  • 4
    `q` is private to the object, so that the `lock` will prevent other threads from simultaneous access. – Richard Schneider Jul 26 '11 at 22:02
  • 24
    It's not a good idea to lock. The whole purpose of the BCL concurrent collections is to provide lock-free concurrency for performance reasons. The locking in your code compromises that benefit. In fact I don't see a reason you need to lock the deq. – KFL Jul 02 '14 at 15:00
  • 4
    @KFL, need to lock because `Count` and `TryDequeue` are two independent operations that are not synched by the BCL concurrent collections. – Richard Schneider Jul 04 '14 at 00:34
  • 14
    @RichardSchneider If you need to handle concurrency issues yourself then it would be a good idea to swap the `ConcurrentQueue` object for a `Queue` object which is more lightweight. – 0b101010 Nov 04 '14 at 13:29
  • Would it be better to use a Mutex instead of a lock? – Zorkind Dec 09 '14 at 18:54
  • How do you get items from this queue? Another simple method for dequeue? –  Mar 30 '16 at 07:51
  • This doesn't even let you get objects from the queue. – Stealth Rabbi Jan 17 '18 at 17:02
  • 1
    Wouldn't it be better if we inherit from `ConcurrentQueue` and add `new` to the `Enqueue` method? – Odys Jan 20 '18 at 10:01
  • 6
    Don't define your own queue, just use the inherited one. If you do as you do, you can actually do nothing else with the queue values, all other functions but your new `Enqueue` will still call the original queue. In other words, although this answer is marked as accepted, it's completely and utterly broken. – Gábor Apr 22 '18 at 15:42
  • @Odys @Gábor I would like to inherit from `ConcurrentQueue`, but then I can't override the `Enqueue` method. – Richard Schneider Jun 24 '18 at 08:37
  • This answer is contradicted by MS's docs: "The ConcurrentQueue and ConcurrentStack classes do not use locks at all. Instead, they rely on Interlocked operations to achieve thread-safety." https://docs.microsoft.com/en-us/dotnet/standard/collections/thread-safe/ In the source code, https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueue.cs has an object lock BUT that is on the segment. https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueueSegment.cs – jjhayter Apr 29 '20 at 01:21
  • I've added an example using ConcurrentQueue without locks. – jjhayter Apr 29 '20 at 02:02
115

I'd go for a slight variant... extend ConcurrentQueue so as to be able to use LINQ extensions on FixedSizedQueue:

public class FixedSizedQueue<T> : ConcurrentQueue<T>
{
    private readonly object syncObject = new object();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public new void Enqueue(T obj)
    {
        base.Enqueue(obj);
        lock (syncObject)
        {
            while (base.Count > Size)
            {
                T outObj;
                base.TryDequeue(out outObj);
            }
        }
    }
}
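
A short illustration of the pitfall raised in the comments below: because Enqueue is hidden with `new` rather than overridden, the size cap silently disappears when the instance is used through a base-class reference (hypothetical usage, not from the answer):

var bounded = new FixedSizedQueue<int>(2);
ConcurrentQueue<int> asBase = bounded;

bounded.Enqueue(1);   // goes through the trimming Enqueue above
asBase.Enqueue(2);    // statically binds to ConcurrentQueue<T>.Enqueue...
asBase.Enqueue(3);    // ...so the queue can now grow past Size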
Dave Lawrence
  • 3,843
  • 2
  • 21
  • 35
  • What happens when TryDequeue fails for some reason? – Richard Schneider Jun 24 '14 at 08:41
  • 4
    What happens when someone statically knows the instance as a ConcurrentQueue? They've just circumvented your 'new' keyword. – mhand Dec 03 '14 at 02:05
  • 9
    @mhand If 'someone' wanted to do that; then they would have chosen to use a ConcurrentQueue object to begin with... This is a custom storage class. Nobody is seeking for this to be submitted to the .NET framework. You've sought out to create a problem for the sake of it. – Dave Lawrence Dec 03 '14 at 09:52
  • @RichardSchneider I presume in that case the TryDequeue would be retried... but I can't see why it would fail. Only one thread could be dequeuing at any one time. I'd have concerns about its behaviour if multiple threads were enqueuing while one thread was dequeuing, as the count would be in a period of flux. However, as a lightweight approach to a fixed size concurrent queue, it's not a bad approximation. – Dave Lawrence Dec 04 '14 at 12:03
  • 19
    My point is that instead of subclassing maybe you should just wrap the queue... this enforces the desired behavior in all cases. Also, since it is a custom storage class, let's make it completely custom and only expose the operations we need; subclassing is the wrong tool here IMHO. – mhand Dec 04 '14 at 18:10
  • 3
    @mhand Yeah I get what you're saying.. I could wrap a queue and expose the queue's enumerator so as to make use of Linq extensions. – Dave Lawrence Dec 08 '14 at 17:06
  • 4
    I agree with @mhand: you shouldn't inherit ConcurrentQueue because the Enqueue method is not virtual. You should proxy the queue and implement the whole interface if desired. – Chris Marisic Oct 06 '16 at 20:38
  • 1
    This calls base.Enqueue(obj) outside of the lock; if some thread reads the values between the enqueue and the dequeue, it can see more than Size items. – Mubashar Aug 29 '18 at 00:15
  • 1
    This is not good design. I never allow this use of 'new' in the codebase I'm responsible for. The problem: There are now actually two completely different methods with the name Enqueue(). If someone calls the base class (using the most generic variant of an object is good design) they get a completely different version of the method. I have seen a number of subtle bugs caused by this use of 'new', it should never be used. – Nils Lande Sep 28 '20 at 07:28
  • 1
    Nearly ten years later and possibly a bit wiser - I'd downvote this answer – Dave Lawrence Mar 14 '22 at 17:10
34

For anyone who finds it useful, here is some working code based on Richard Schneider's answer above:

public class FixedSizedQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        queue.Enqueue(obj);

        while (queue.Count > Size)
        {
            T outObj;
            queue.TryDequeue(out outObj);
        }
    }
}
Tod Thomson
  • 4,773
  • 2
  • 33
  • 33
  • 2
    Voting down for the reasons mentioned (locking when using a ConcurrentQueue is bad) in addition to not implementing any of the requisite interfaces for this to be a true collection. – Josh Apr 13 '18 at 15:44
16

For what it's worth, here's a lightweight circular buffer with some methods marked for safe and unsafe use.

public class CircularBuffer<T> : IEnumerable<T>
{
    readonly int size;
    readonly object locker;

    int count;
    int head;
    int rear;
    T[] values;

    public CircularBuffer(int max)
    {
        this.size = max;
        locker = new object();
        count = 0;
        head = 0;
        rear = 0;
        values = new T[size];
    }

    static int Incr(int index, int size)
    {
        return (index + 1) % size;
    }

    private void UnsafeEnsureQueueNotEmpty()
    {
        if (count == 0)
            throw new Exception("Empty queue");
    }

    public int Size { get { return size; } }
    public object SyncRoot { get { return locker; } }

    #region Count

    public int Count { get { return UnsafeCount; } }
    public int SafeCount { get { lock (locker) { return UnsafeCount; } } }
    public int UnsafeCount { get { return count; } }

    #endregion

    #region Enqueue

    public void Enqueue(T obj)
    {
        UnsafeEnqueue(obj);
    }

    public void SafeEnqueue(T obj)
    {
        lock (locker) { UnsafeEnqueue(obj); }
    }

    public void UnsafeEnqueue(T obj)
    {
        values[rear] = obj;

        if (Count == Size)
            head = Incr(head, Size);
        rear = Incr(rear, Size);
        count = Math.Min(count + 1, Size);
    }

    #endregion

    #region Dequeue

    public T Dequeue()
    {
        return UnsafeDequeue();
    }

    public T SafeDequeue()
    {
        lock (locker) { return UnsafeDequeue(); }
    }

    public T UnsafeDequeue()
    {
        UnsafeEnsureQueueNotEmpty();

        T res = values[head];
        values[head] = default(T);
        head = Incr(head, Size);
        count--;

        return res;
    }

    #endregion

    #region Peek

    public T Peek()
    {
        return UnsafePeek();
    }

    public T SafePeek()
    {
        lock (locker) { return UnsafePeek(); }
    }

    public T UnsafePeek()
    {
        UnsafeEnsureQueueNotEmpty();

        return values[head];
    }

    #endregion


    #region GetEnumerator

    public IEnumerator<T> GetEnumerator()
    {
        return UnsafeGetEnumerator();
    }

    public IEnumerator<T> SafeGetEnumerator()
    {
        lock (locker)
        {
            List<T> res = new List<T>(count);
            var enumerator = UnsafeGetEnumerator();
            while (enumerator.MoveNext())
                res.Add(enumerator.Current);
            return res.GetEnumerator();
        }
    }

    public IEnumerator<T> UnsafeGetEnumerator()
    {
        int index = head;
        for (int i = 0; i < count; i++)
        {
            yield return values[index];
            index = Incr(index, size);
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    #endregion
}

I like to use the Foo()/SafeFoo()/UnsafeFoo() convention:

  • Foo methods call UnsafeFoo as a default.
  • UnsafeFoo methods modify state freely without a lock, they should only call other unsafe methods.
  • SafeFoo methods call UnsafeFoo methods inside a lock.

It's a little verbose, but it makes obvious errors, such as calling unsafe methods outside a lock in a method which is supposed to be thread-safe, more apparent.
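
A short usage sketch under this convention (my own example, assuming the class above):

var history = new CircularBuffer<string>(3);
history.SafeEnqueue("a");
history.SafeEnqueue("b");
history.SafeEnqueue("c");
history.SafeEnqueue("d");          // overwrites "a", the oldest entry

lock (history.SyncRoot)
{
    foreach (var item in history)  // unsafe enumerator, guarded by the caller
        Console.WriteLine(item);   // prints b, c, d
}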

Juliet
  • 80,494
  • 45
  • 196
  • 228
11

My version is just a subclass of the normal Queue... nothing special, but since everyone is participating and it still fits the topic title, I might as well put it here. It also returns the dequeued item, just in case.

public sealed class SizedQueue<T> : Queue<T>
{
    public int FixedCapacity { get; }
    public SizedQueue(int fixedCapacity)
    {
        this.FixedCapacity = fixedCapacity;
    }

    /// <summary>
    /// If the total number of items exceeds the capacity, the oldest one is automatically dequeued.
    /// </summary>
    /// <returns>The dequeued value, if any.</returns>
    public new T Enqueue(T item)
    {
        base.Enqueue(item);
        if (base.Count > FixedCapacity)
        {
            return base.Dequeue();
        }
        return default;
    }
}
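
A quick sketch of how the return value behaves (my own example):

var q = new SizedQueue<int>(2);
q.Enqueue(1);                  // capacity not exceeded, returns default(int)
q.Enqueue(2);                  // returns default(int)
int evicted = q.Enqueue(3);    // capacity exceeded, returns 1 (the oldest item)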
5argon
  • 3,683
  • 3
  • 31
  • 57
6

Just because no one's said it yet.. you can use a LinkedList<T> and add the thread safety:

public class Buffer<T> : LinkedList<T>
{
    private int capacity;

    public Buffer(int capacity)
    {
        this.capacity = capacity;   
    }

    public void Enqueue(T item)
    {
        // todo: add synchronization mechanism
        if (Count == capacity) RemoveLast();
        AddFirst(item);
    }

    public T Dequeue()
    {
        // todo: add synchronization mechanism
        var last = Last.Value;
        RemoveLast();
        return last;
    }
}

One thing to note is the default enumeration order will be LIFO in this example. But that can be overridden if necessary.
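
One way to fill in the synchronization todos is a single lock object guarding both operations; a minimal sketch under that assumption (the wrapper shape and names here are mine, not part of the answer above):

public class SynchronizedBuffer<T>
{
    private readonly LinkedList<T> list = new LinkedList<T>();
    private readonly object gate = new object();
    private readonly int capacity;

    public SynchronizedBuffer(int capacity)
    {
        this.capacity = capacity;
    }

    public void Enqueue(T item)
    {
        lock (gate)
        {
            // Same eviction logic as above, now atomic with the insert.
            if (list.Count == capacity) list.RemoveLast();
            list.AddFirst(item);
        }
    }

    public T Dequeue()
    {
        lock (gate)
        {
            // As in the original, assumes the buffer is non-empty.
            var last = list.Last.Value;
            list.RemoveLast();
            return last;
        }
    }
}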

Brandon
  • 1,566
  • 2
  • 14
  • 22
5

Here's my take on the fixed size Queue

It uses a regular Queue to avoid the synchronization overhead that the Count property incurs on ConcurrentQueue. It also implements IReadOnlyCollection so that LINQ methods can be used. The rest is very similar to the other answers here.

[Serializable]
[DebuggerDisplay("Count = {" + nameof(Count) + "}, Limit = {" + nameof(Limit) + "}")]
public class FixedSizedQueue<T> : IReadOnlyCollection<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _lock = new object();

    public int Count { get { lock (_lock) { return _queue.Count; } } }
    public int Limit { get; }

    public FixedSizedQueue(int limit)
    {
        if (limit < 1)
            throw new ArgumentOutOfRangeException(nameof(limit));

        Limit = limit;
    }

    public FixedSizedQueue(IEnumerable<T> collection)
    {
        if (collection is null || !collection.Any())
           throw new ArgumentException("Can not initialize the Queue with a null or empty collection", nameof(collection));

        _queue = new Queue<T>(collection);
        Limit = _queue.Count;
    }

    public void Enqueue(T obj)
    {
        lock (_lock)
        {
            _queue.Enqueue(obj);

            while (_queue.Count > Limit)
                _queue.Dequeue();
        }
    }

    public void Clear()
    {
        lock (_lock)
            _queue.Clear();
    }

    public IEnumerator<T> GetEnumerator()
    {
        lock (_lock)
            return new List<T>(_queue).GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
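
Because it implements IReadOnlyCollection<T>, LINQ works on it directly; a small usage sketch (my own, assuming `using System.Linq;`):

var recent = new FixedSizedQueue<string>(3);
recent.Enqueue("https://a.example");
recent.Enqueue("https://b.example");
recent.Enqueue("https://c.example");
recent.Enqueue("https://d.example");   // "https://a.example" is dropped

// GetEnumerator hands out a snapshot copy, so this is safe under concurrency.
int secure = recent.Count(u => u.StartsWith("https"));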
Ali Zahid
  • 1,329
  • 2
  • 17
  • 23
4

Let's add one more answer. Why this over others?

1) Simplicity. Trying to guarantee size is well and good but leads to unneeded complexity that can exhibit its own problems.

2) Implements IReadOnlyCollection, meaning you can use Linq on it and pass it into a variety of things that expect IEnumerable.

3) No locking. Many of the solutions above use locks, which is incorrect on a lockless collection.

4) Implements the same set of methods, properties, and interfaces ConcurrentQueue does, including IProducerConsumerCollection, which is important if you want to use the collection with BlockingCollection.

This implementation could potentially end up with more entries than expected if TryDequeue fails, but the frequency of that occurring doesn't seem worth specialized code that will inevitably hamper performance and cause its own unexpected problems.

If you absolutely want to guarantee a size, implementing a Prune() or similar method seems like the best idea. You could use a ReaderWriterLockSlim read lock in the other methods (including TryDequeue) and take a write lock only when pruning; a sketch of that idea follows the code below.

class ConcurrentFixedSizeQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>, ICollection {
    readonly ConcurrentQueue<T> m_concurrentQueue;
    readonly int m_maxSize;

    public int Count => m_concurrentQueue.Count;
    public bool IsEmpty => m_concurrentQueue.IsEmpty;

    public ConcurrentFixedSizeQueue (int maxSize) : this(Array.Empty<T>(), maxSize) { }

    public ConcurrentFixedSizeQueue (IEnumerable<T> initialCollection, int maxSize) {
        if (initialCollection == null) {
            throw new ArgumentNullException(nameof(initialCollection));
        }

        m_concurrentQueue = new ConcurrentQueue<T>(initialCollection);
        m_maxSize = maxSize;
    }

    public void Enqueue (T item) {
        m_concurrentQueue.Enqueue(item);

        if (m_concurrentQueue.Count > m_maxSize) {
            T result;
            m_concurrentQueue.TryDequeue(out result);
        }
    }

    public bool TryPeek (out T result) => m_concurrentQueue.TryPeek(out result);
    public bool TryDequeue (out T result) => m_concurrentQueue.TryDequeue(out result);

    public void CopyTo (T[] array, int index) => m_concurrentQueue.CopyTo(array, index);
    public T[] ToArray () => m_concurrentQueue.ToArray();

    public IEnumerator<T> GetEnumerator () => m_concurrentQueue.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator () => GetEnumerator();

    // Explicit ICollection implementations.
    void ICollection.CopyTo (Array array, int index) => ((ICollection)m_concurrentQueue).CopyTo(array, index);
    object ICollection.SyncRoot => ((ICollection) m_concurrentQueue).SyncRoot;
    bool ICollection.IsSynchronized => ((ICollection) m_concurrentQueue).IsSynchronized;

    // Explicit IProducerConsumerCollection<T> implementations.
    bool IProducerConsumerCollection<T>.TryAdd (T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryAdd(item);
    bool IProducerConsumerCollection<T>.TryTake (out T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryTake(out item);

    public override int GetHashCode () => m_concurrentQueue.GetHashCode();
    public override bool Equals (object obj) => m_concurrentQueue.Equals(obj);
    public override string ToString () => m_concurrentQueue.ToString();
}
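
A minimal sketch of the Prune() idea mentioned above (my own addition; the matching read locks in the other members are elided):

// Assumes a field added to the class (hypothetical):
// readonly ReaderWriterLockSlim m_pruneLock = new ReaderWriterLockSlim();
public void Prune () {
    m_pruneLock.EnterWriteLock();
    try {
        // With the write lock held (and readers taking read locks elsewhere),
        // the size really is guaranteed when this returns.
        while (m_concurrentQueue.Count > m_maxSize) {
            m_concurrentQueue.TryDequeue(out _);
        }
    } finally {
        m_pruneLock.ExitWriteLock();
    }
}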
Josh
  • 632
  • 7
  • 13
3

Just for fun, here is another implementation that I believe addresses most of the commenters' concerns. In particular, thread-safety is achieved without locking and the implementation is hidden by the wrapping class.

public class FixedSizeQueue<T> : IReadOnlyCollection<T>
{
  private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
  private int _count;

  public int Limit { get; private set; }

  public FixedSizeQueue(int limit)
  {
    this.Limit = limit;
  }

  public void Enqueue(T obj)
  {
    _queue.Enqueue(obj);
    Interlocked.Increment(ref _count);

    // Calculate the number of items to be removed by this thread in a thread safe manner
    int currentCount;
    int finalCount;
    do
    {
      currentCount = _count;
      finalCount = Math.Min(currentCount, this.Limit);
    } while (currentCount != 
      Interlocked.CompareExchange(ref _count, finalCount, currentCount));

    T overflow;
    while (currentCount > finalCount && _queue.TryDequeue(out overflow))
      currentCount--;
  }

  public int Count
  {
    get { return _count; }
  }

  public IEnumerator<T> GetEnumerator()
  {
    return _queue.GetEnumerator();
  }

  System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
  {
    return _queue.GetEnumerator();
  }
}
erdomke
  • 4,980
  • 1
  • 24
  • 30
  • 2
    This is broken if used concurrently - what if a thread is preempted after calling `_queue.Enqueue(obj)` but before `Interlocked.Increment(ref _count)`, and the other thread calls `.Count`? It would get a wrong count. I haven't checked for the other issues. – KFL Apr 12 '17 at 23:33
3

Well, it depends upon the use. I have noticed that some of the above solutions may exceed the size when used in a multi-threaded environment. Anyway, my use case was to display the last 5 events, with multiple threads writing events into the queue and one other thread reading from it and displaying it in a WinForms control. So this was my solution.

EDIT: Since we are already using locking within our implementation, we don't really need ConcurrentQueue; using a plain Queue may improve performance.

class FixedSizedConcurrentQueue<T> 
{
    readonly Queue<T> queue = new Queue<T>();
    readonly object syncObject = new object();

    public int MaxSize { get; private set; }

    public FixedSizedConcurrentQueue(int maxSize)
    {
        MaxSize = maxSize;
    }

    public void Enqueue(T obj)
    {
        lock (syncObject)
        {
            queue.Enqueue(obj);
            while (queue.Count > MaxSize)
            {
                queue.Dequeue();
            }
        }
    }

    public T[] ToArray()
    {
        T[] result = null;
        lock (syncObject)
        {
            result = queue.ToArray();
        }

        return result;
    }

    public void Clear()
    {
        lock (syncObject)
        {
            queue.Clear();
        }
    }
}

EDIT: We don't really need syncObject in the above example; we could rather lock on the queue object itself, since we are not re-initializing queue in any function and it's marked as readonly anyway.
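
A small usage sketch matching the scenario described (my own example): worker threads enqueue events and the UI thread takes a snapshot to display.

var events = new FixedSizedConcurrentQueue<string>(5);

// On any worker thread:
events.Enqueue("user logged in");

// On the UI thread (e.g. a WinForms timer tick):
string[] lastFive = events.ToArray();   // safe snapshot to bind to a control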

Mubashar
  • 12,300
  • 11
  • 66
  • 95
3

The accepted answer is going to have avoidable side-effects.

Fine-Grained Locking and Lock-Free Mechanisms

The links below are the references I used when writing the example below.

The documentation from Microsoft is a bit misleading here: ConcurrentQueue does use a lock, but only on the segment classes. The segment classes themselves use Interlocked operations.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace Lib.Core
{
    // Sources: 
    // https://learn.microsoft.com/en-us/dotnet/standard/collections/thread-safe/
    // https://learn.microsoft.com/en-us/dotnet/api/system.threading.interlocked?view=netcore-3.1
    // https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueue.cs
    // https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueueSegment.cs

    /// <summary>
    /// Concurrency-safe circular buffer that uses the fixed capacity specified and reuses slots as it goes.
    /// </summary>
    /// <typeparam name="TObject">The type of object that you want to go into the slots.</typeparam>
    public class ConcurrentCircularBuffer<TObject>
    {
        private readonly ConcurrentQueue<TObject> _queue;

        public int Capacity { get; private set; }

        public ConcurrentCircularBuffer(int capacity)
        {
            if(capacity <= 0)
            {
                throw new ArgumentException($"The capacity specified '{capacity}' is not valid.", nameof(capacity));
            }

            // Setup the queue to the initial capacity using List's underlying implementation.
            _queue = new ConcurrentQueue<TObject>(new List<TObject>(capacity));

            Capacity = capacity;
        }

        public void Enqueue(TObject @object)
        {
            // Enforce the capacity first so the head can be used instead of the entire segment (slow).
            while (_queue.Count + 1 > Capacity)
            {
                if (!_queue.TryDequeue(out _))
                {
                    // Handle error condition however you want to ie throw, return validation object, etc.
                    var ex = new Exception("Concurrent Dequeue operation failed.");
                    ex.Data.Add("EnqueueObject", @object);
                    throw ex;
                }
            }

            // Place the item into the queue
            _queue.Enqueue(@object);
        }

        public TObject Dequeue()
        {
            if(_queue.TryDequeue(out var result))
            {
                return result;
            }

            return default;
        }
    }
}
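
The comments below ask how to read the contents; one minimal option, as a sketch (my own addition, not part of the original answer), is a snapshot method inside the class so the queue itself stays private:

public TObject[] ToArray()
{
    // Copies the current contents; callers can enumerate or run LINQ
    // over the snapshot without touching the live queue.
    return _queue.ToArray();
}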
jjhayter
  • 352
  • 4
  • 19
  • Thanks for this... I had an old piece of code from years ago that I'd used to lesser effect... nice circular FIFO (y) – jim tollan Dec 10 '21 at 12:43
  • How do you access the queue - right now it cannot be enumerated or used in linq? Did you expose methods for that, or actually expose the underlying `_queue` as public (as in the answer it is private). – lonix Jun 25 '22 at 00:22
  • @lonix the implementation didn't expose it, for the express purpose of keeping the details from leaking. – jjhayter Jun 27 '22 at 19:39
  • Do you remember what you meant by `Enforce the capacity first so the head can be used instead of the entire segment (slow).` Did you mean that dequeue first avoids possibility of internal structure resizing to n+1 ? – lonix Jun 30 '22 at 03:55
  • I think `while (_queue.Count + 1 > Capacity)` is not guaranteed to be entirely thread-safe in all scenarios. It introduces a potential race condition because the count of elements in the ConcurrentQueue can change between the Count property access and the subsequent TryDequeue method call. This means that multiple threads could simultaneously try to dequeue an item, potentially exceeding the capacity limit. – somedotnetguy Aug 03 '23 at 16:38
1

For your coding pleasure I submit to you the 'ConcurrentDeck'

public class ConcurrentDeck<T>
{
   private readonly int _size;
   private readonly T[] _buffer;
   private int _position = 0;

   public ConcurrentDeck(int size)
   {
       _size = size;
       _buffer = new T[size];
   }

   public void Push(T item)
   {
       lock (this)
       {
           _buffer[_position] = item;
           _position++;
           if (_position == _size) _position = 0;
       }
   }

   public T[] ReadDeck()
   {
       lock (this)
       {
           // Concat (not Union, which is a set operation and would drop
           // duplicate entries) preserves every slot in insertion order.
           return _buffer.Skip(_position).Concat(_buffer.Take(_position)).ToArray();
       }
   }
}

Example Usage:

void Main()
{
    var deck = new ConcurrentDeck<Tuple<string,DateTime>>(25);
    var handle = new ManualResetEventSlim();
    var task1 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task1",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(1).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    var task2 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task2",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(.5).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    var task3 = Task.Factory.StartNew(()=>{
    var timer = new System.Timers.Timer();
    timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task3",DateTime.Now));};
    timer.Interval = System.TimeSpan.FromSeconds(.25).TotalMilliseconds;
    timer.Enabled = true;
    handle.Wait();
    }); 
    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(10));
    handle.Set();
    var outputtime = DateTime.Now;
    deck.ReadDeck().Select(d => new {Message = d.Item1, MilliDiff = (outputtime - d.Item2).TotalMilliseconds}).Dump(true);
}
Chris Hayes
  • 3,876
  • 7
  • 42
  • 72
0

Here is yet another implementation that uses the underlying ConcurrentQueue as much as possible while providing the same interfaces made available via ConcurrentQueue.

/// <summary>
/// This is a FIFO concurrent queue that will remove the oldest added items when a given limit is reached.
/// </summary>
/// <typeparam name="TValue"></typeparam>
public class FixedSizedConcurrentQueue<TValue> : IProducerConsumerCollection<TValue>, IReadOnlyCollection<TValue>
{
    private readonly ConcurrentQueue<TValue> _queue;

    private readonly object _syncObject = new object();

    public int LimitSize { get; }

    public FixedSizedConcurrentQueue(int limit)
    {
        _queue = new ConcurrentQueue<TValue>();
        LimitSize = limit;
    }

    public FixedSizedConcurrentQueue(int limit, System.Collections.Generic.IEnumerable<TValue> collection)
    {
        _queue = new ConcurrentQueue<TValue>(collection);
        LimitSize = limit;

    }

    public int Count => _queue.Count;

    bool ICollection.IsSynchronized => ((ICollection) _queue).IsSynchronized;

    object ICollection.SyncRoot => ((ICollection)_queue).SyncRoot; 

    public bool IsEmpty => _queue.IsEmpty;

    // Not supported until .NET Standard 2.1
    //public void Clear() => _queue.Clear();

    public void CopyTo(TValue[] array, int index) => _queue.CopyTo(array, index);

    void ICollection.CopyTo(Array array, int index) => ((ICollection)_queue).CopyTo(array, index);

    public void Enqueue(TValue obj)
    {
        _queue.Enqueue(obj);
        lock( _syncObject )
        {
            while( _queue.Count > LimitSize ) {
                _queue.TryDequeue(out _);
            }
        }
    }

    public IEnumerator<TValue> GetEnumerator() => _queue.GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable<TValue>)this).GetEnumerator();

    public TValue[] ToArray() => _queue.ToArray();

    public bool TryAdd(TValue item)
    {
        Enqueue(item);
        return true;
    }

    bool IProducerConsumerCollection<TValue>.TryTake(out TValue item) => TryDequeue(out item);

    public bool TryDequeue(out TValue result) => _queue.TryDequeue(out result);

    public bool TryPeek(out TValue result) => _queue.TryPeek(out result);

}
Tod Cunningham
  • 3,691
  • 4
  • 30
  • 32
0
using System.Collections.Concurrent;

public class FixedSizeQueue<T>
{
    ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

    public void Enqueue(T obj)
    {
        T temp;

        // Capacity is hard-coded here: keep at most the last 100 items.
        if (_queue.Count > 99)
        {
            // Remove one of the oldest added items.
            _queue.TryDequeue(out temp);
        }

        _queue.Enqueue(obj);
    }

    public bool Dequeue(out T obj)
    {
        return _queue.TryDequeue(out obj);
    }

    public void Clear()
    {
        T obj;

        // It does not fall into an infinite loop; it clears the contents
        // present at the time of the call.
        int cnt = _queue.Count;
        for (; cnt > 0; cnt--)
        {
            _queue.TryDequeue(out obj);
        }
    }
}
a_pcnic
  • 11
-1

This is my version of the queue:

public class FixedSizedQueue<T> {
  private object LOCK = new object();
  ConcurrentQueue<T> queue;

  public int MaxSize { get; set; }

  public FixedSizedQueue(int maxSize, IEnumerable<T> items = null) {
     this.MaxSize = maxSize;
     if (items == null) {
        queue = new ConcurrentQueue<T>();
     }
     else {
        queue = new ConcurrentQueue<T>(items);
        EnsureLimitConstraint();
     }
  }

  public void Enqueue(T obj) {
     queue.Enqueue(obj);
     EnsureLimitConstraint();
  }

  private void EnsureLimitConstraint() {
     if (queue.Count > MaxSize) {
        lock (LOCK) {
           T overflow;
           while (queue.Count > MaxSize) {
              queue.TryDequeue(out overflow);
           }
        }
     }
  }


  /// <summary>
  /// returns the current snapshot of the queue
  /// </summary>
  /// <returns></returns>
  public T[] GetSnapshot() {
     return queue.ToArray();
  }
}

I find it useful to have a constructor that is built upon an IEnumerable, and I find it useful to have a GetSnapshot to obtain a multithread-safe list (an array in this case) of the items at the moment of the call, which doesn't raise errors if the underlying collection changes.

The double Count check is to avoid taking the lock in some circumstances.

Pang
  • 9,564
  • 146
  • 81
  • 122
Not Important
  • 762
  • 6
  • 22
  • 1
    Voting down for locking on the queue. If you absolutely want to lock, a ReaderWriterLockSlim would be best (assuming you expect to take a read lock more often than a write lock). GetSnapshot also isn't needed. If you implement IReadOnlyCollection (which you should for IEnumerable semantics), ToList() will serve the same function. – Josh Apr 13 '18 at 16:21
  • The ConcurrentQueue handles the locks in its implementation, see the links in my answer. – jjhayter Apr 29 '20 at 02:26