
I have three questions:

  1. What do you generally think about my approach to solving the given problem?
  2. What could I improve further, performance-wise?
  3. The most important one: How do I make my implementation truly thread-safe?

First, a simplified version of my scenario: I communicate with different devices via a messaging system, sending and receiving thousands of messages in a rather short period of time. This happens in a multithreaded environment, so many different tasks are sending and expecting messages. For message reception, an event-driven approach caused us a lot of trouble when we tried to make it thread-safe. I have a few receiver tasks that get messages from outside and have to deliver them to a large number of consumer tasks.

So I came up with a different approach: why not keep a history of a few thousand messages, where every new message is enqueued and the consumer tasks search backwards from the newest item to the last item they processed in order to get all newly arrived messages? Of course, this has to be fast and thread-safe.

I came up with the idea of a linked ring buffer and implemented the following:

  public class LinkedRingBuffer<T>
  {
     private LinkedRingBufferNode<T> firstNode;
     private LinkedRingBufferNode<T> lastNode;

     public LinkedRingBuffer(int capacity)
     {
        Capacity = capacity;
        Count = 0;
     }

     /// <summary>
     /// Maximum count of items inside the buffer
     /// </summary>
     public int Capacity { get; }

     /// <summary>
     /// Actual count of items inside the buffer
     /// </summary>
     public int Count { get; private set; }

     /// <summary>
     /// Get value of the oldest buffer entry
     /// </summary>
     /// <returns></returns>
     public T GetFirst()
     {
        return firstNode.Item;
     }

     /// <summary>
     /// Get value of the newest buffer entry
     /// </summary>
     /// <returns></returns>
     public T GetLast()
     {
        return lastNode.Item;
     }

     /// <summary>
     /// Add item at the end of the buffer. 
     /// If capacity is reached the link to the oldest item is deleted.
     /// </summary>
     public void Add(T item)
     {
        /* create node and set to last one */
        var node = new LinkedRingBufferNode<T>(lastNode, item);
        lastNode = node;
        /* if it is the first node, the created is also the first */
        if (firstNode == null)
           firstNode = node;
        /* check for capacity reach */
        Count++;
        if(Count > Capacity)
        { /* remove all links to the current first node so that it is eventually garbage-collected */
           Count = Capacity;
           firstNode = firstNode.NextNode;
           firstNode.PreviousNode = null;
        }
     }

     /// <summary>
     /// Iterate through the buffer from the newest to the oldest item
     /// </summary>
     public IEnumerable<T> LastToFirst()
     {
        var current = lastNode;
        while(current != null)
        {
           yield return current.Item;
           current = current.PreviousNode;
        }
     }

     /// <summary>
     /// Iterate through the buffer from the oldest to the newest item
     /// </summary>
     public IEnumerable<T> FirstToLast()
     {
        var current = firstNode;
        while (current != null)
        {
           yield return current.Item;
           current = current.NextNode;
        }
     }

     /// <summary>
     /// Iterate through the buffer from the newest to the given item. 
     /// If the item doesn't exist it iterates until it reaches the oldest
     /// </summary>
     public IEnumerable<T> LastToReference(T item)
     {
        var current = lastNode;
        while (current != null)
        {
           yield return current.Item;
           if (current.Item.Equals(item))
              break;
           current = current.PreviousNode;
        }
     }

     /// <summary>
     /// Iterate through the buffer from the oldest to the given item. 
     /// If the item doesn't exist it iterates until it reaches the newest
     /// </summary>
     public IEnumerable<T> FirstToReference(T item)
     {
        var current = firstNode;
        while (current != null)
        {
           yield return current.Item;
           if (current.Item.Equals(item))
              break;
           current = current.NextNode; /* walk forward; the first node has no previous node */
        }
     }

     /// <summary>
     /// Represents a linked node inside the buffer and holds the data
     /// </summary>
     private class LinkedRingBufferNode<A>
     {
        public LinkedRingBufferNode(LinkedRingBufferNode<A> previousNode, A item)
        {
           Item = item;
           NextNode = null;
           PreviousNode = previousNode;
           if(previousNode != null)
              previousNode.NextNode = this;
        }
        internal A Item { get; }
        internal LinkedRingBufferNode<A> PreviousNode { get; set; }
        internal LinkedRingBufferNode<A> NextNode { get; private set; }
     }
  }

Unfortunately, I'm fairly new to multithreading, so how would I make this buffer thread-safe for multiple readers and writers?

Thanks!

Gediminas Masaitis
sirloin

1 Answer


I think the simplest way would be to have a synchronization object that you lock on whenever you execute thread-critical code. The code within a lock block is called a critical section, and only one thread at a time can execute it. Any other thread that wants to enter it waits until the lock is released.

Definition and initialization:

private readonly object Synchro;

public LinkedRingBuffer(int capacity)
{
    Synchro = new object();
    // Other constructor code
}

Usage:

public T GetFirst()
{
    lock(Synchro)
    {
        return firstNode.Item;
    }
}

When writing thread-safe code, some places that need a lock are obvious. If you're not sure whether a statement or block of code needs one, consider the following, for reads as well as writes:

  • Whether or not this code can influence the behavior or result of any other locked critical sections.
  • Whether or not any other locked critical sections can influence this code's behavior or result.
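To illustrate the two questions above, here is a minimal sketch that is not part of your buffer. The RunningAverage class and its members are hypothetical names made up for this example. Add writes two fields that belong together, and the Average getter reads both, so each can influence the other's result and both take the same lock.

```csharp
public class RunningAverage
{
    private readonly object sync = new object();
    private long sum;
    private int count;

    // Writer: sum and count must change together, so both updates
    // happen inside one critical section.
    public void Add(int value)
    {
        lock (sync)
        {
            sum += value;
            count++;
        }
    }

    // Reader: without the lock it could observe the new sum together
    // with the old count and return a value that never really existed.
    public double Average
    {
        get
        {
            lock (sync)
            {
                return count == 0 ? 0.0 : (double)sum / count;
            }
        }
    }
}
```

The getter looks harmless on its own, but because Add can change what it returns, it belongs in a critical section as well. The same reasoning applies to Count in your buffer.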

You will also need to rewrite some of your auto-implemented properties, such as Count, to use a backing field so that the getter can take the lock. That part should be pretty straightforward. However...

Your use of yield return, while smart and efficient in a single-threaded context, will cause trouble in a multithreaded one. This is because yield return doesn't release a lock statement (and it shouldn't): a lock taken inside an iterator stays held while the caller works with each item. You will have to materialize the results in a wrapper method wherever you use yield return.
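The reason the wrapper has to materialize, rather than just return the iterator from inside a lock, is deferred execution: an iterator body only runs when the caller enumerates it. The following sketch is a demonstration only; IteratorLockDemo and its members are hypothetical names, and it uses Monitor.IsEntered to report whether the current thread holds the lock at the moment each item is produced.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public class IteratorLockDemo
{
    private readonly object sync = new object();
    private readonly List<int> items = new List<int> { 1, 2, 3 };

    // Iterator: the body runs lazily, during enumeration.
    // Each yielded value says whether the lock was held at that moment.
    private IEnumerable<bool> LockHeldPerItem()
    {
        for (int i = 0; i < items.Count; i++)
            yield return Monitor.IsEntered(sync);
    }

    // Looks protected, but the lock is released before the caller
    // starts enumerating, so the traversal runs unprotected.
    public IEnumerable<bool> Deferred()
    {
        lock (sync)
        {
            return LockHeldPerItem();
        }
    }

    // ToList() runs the whole traversal while the lock is still held.
    public IEnumerable<bool> Materialized()
    {
        lock (sync)
        {
            return LockHeldPerItem().ToList();
        }
    }
}
```

Enumerating Deferred() yields only false values, while Materialized() yields only true values. The price of materializing is one list allocation and copy per call.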

Your thread-safe code would look like this:

public class LinkedRingBuffer<T>
{
    private LinkedRingBufferNode<T> firstNode;
    private LinkedRingBufferNode<T> lastNode;
    private readonly object Synchro;

    public LinkedRingBuffer(int capacity)
    {
        Synchro = new object();
        Capacity = capacity;
        Count = 0;
    }

    /// <summary>
    /// Maximum count of items inside the buffer
    /// </summary>
    public int Capacity { get; }

    /// <summary>
    /// Actual count of items inside the buffer
    /// </summary>
    public int Count
    {
        get
        {
            lock (Synchro)
            {
                return _count;
            }
        }
        private set
        {
            _count = value;
        }
    }
    private int _count;

    /// <summary>
    /// Get value of the oldest buffer entry
    /// </summary>
    /// <returns></returns>
    public T GetFirst()
    {
        lock (Synchro)
        {
            return firstNode.Item;
        }
    }

    /// <summary>
    /// Get value of the newest buffer entry
    /// </summary>
    /// <returns></returns>
    public T GetLast()
    {
        lock (Synchro)
        {
            return lastNode.Item;
        }
    }

    /// <summary>
    /// Add item at the end of the buffer. 
    /// If capacity is reached the link to the oldest item is deleted.
    /// </summary>
    public void Add(T item)
    {
        lock (Synchro)
        {
            /* create node and set to last one */
            var node = new LinkedRingBufferNode<T>(lastNode, item);
            lastNode = node;
            /* if it is the first node, the created is also the first */
            if (firstNode == null)
                firstNode = node;
            /* check for capacity reach */
            Count++;
            if (Count > Capacity)
            {
                /* remove all links to the current first node so that it is eventually garbage-collected */
                Count = Capacity;
                firstNode = firstNode.NextNode;
                firstNode.PreviousNode = null;
            }
        }
    }

    /// <summary>
    /// Iterate through the buffer from the newest to the oldest item
    /// </summary>
    public IEnumerable<T> LastToFirst()
    {
        lock (Synchro)
        {
            var materialized = LastToFirstInner().ToList();
            return materialized;
        }
    }

    private IEnumerable<T> LastToFirstInner()
    {
        var current = lastNode;
        while (current != null)
        {
            yield return current.Item;
            current = current.PreviousNode;
        }
    }

    /// <summary>
    /// Iterate through the buffer from the oldest to the newest item
    /// </summary>
    public IEnumerable<T> FirstToLast()
    {
        lock (Synchro)
        {
            var materialized = FirstToLastInner().ToList();
            return materialized;
        }
    }

    private IEnumerable<T> FirstToLastInner()
    {
        var current = firstNode;
        while (current != null)
        {
            yield return current.Item;
            current = current.NextNode;
        }
    }

    /// <summary>
    /// Iterate through the buffer from the newest to the given item. 
    /// If the item doesn't exist it iterates until it reaches the oldest
    /// </summary>
    public IEnumerable<T> LastToReference(T item)
    {
        lock (Synchro)
        {
            var materialized = LastToReferenceInner(item).ToList();
            return materialized;
        }
    }

    private IEnumerable<T> LastToReferenceInner(T item)
    {
        var current = lastNode;
        while (current != null)
        {
            yield return current.Item;
            if (current.Item.Equals(item))
                break;
            current = current.PreviousNode;
        }
    }

    /// <summary>
    /// Iterate through the buffer from the oldest to the given item. 
    /// If the item doesn't exist it iterates until it reaches the newest
    /// </summary>
    public IEnumerable<T> FirstToReference(T item)
    {
        lock (Synchro)
        {
            var materialized = FirstToReferenceInner(item).ToList();
            return materialized;
        }
    }

    private IEnumerable<T> FirstToReferenceInner(T item)
    {
        var current = firstNode;
        while (current != null)
        {
            yield return current.Item;
            if (current.Item.Equals(item))
                break;
            current = current.NextNode; /* walk forward; the first node has no previous node */
        }
    }

    /// <summary>
    /// Represents a linked node inside the buffer and holds the data
    /// </summary>
    private class LinkedRingBufferNode<A>
    {
        public LinkedRingBufferNode(LinkedRingBufferNode<A> previousNode, A item)
        {
            Item = item;
            NextNode = null;
            PreviousNode = previousNode;
            if (previousNode != null)
                previousNode.NextNode = this;
        }
        internal A Item { get; }
        internal LinkedRingBufferNode<A> PreviousNode { get; set; }
        internal LinkedRingBufferNode<A> NextNode { get; private set; }
    }
}

Some optimizations are possible. For example, you don't need to create the LinkedRingBufferNode objects inside the critical section. However, because the constructor takes the previous node, you would have to copy the lastNode value to a local variable inside a critical section before creating the object. Linking the new node into the list must still happen under the lock.
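One way to sketch this idea is the variant below, which is an assumption on my part and not a drop-in change to your class: the node constructor takes only the item, so nothing has to be copied beforehand, and only the linking is done under the lock. AppendOnlyList, Node and Snapshot are hypothetical names, and capacity handling is left out to keep it short.

```csharp
using System.Collections.Generic;

public class AppendOnlyList<T>
{
    // Simplified node: the constructor no longer links itself to a neighbour.
    private sealed class Node
    {
        public Node(T item) { Item = item; }
        public T Item { get; }
        public Node Previous;
        public Node Next;
    }

    private readonly object sync = new object();
    private Node first;
    private Node last;

    public void Add(T item)
    {
        // Allocation touches no shared state, so it can happen outside the lock.
        var node = new Node(item);

        lock (sync)
        {
            // Linking reads and writes shared fields, so it stays inside.
            node.Previous = last;
            if (last != null)
                last.Next = node;
            else
                first = node;
            last = node;
        }
    }

    // Copies the items, oldest to newest, while holding the lock.
    public List<T> Snapshot()
    {
        var result = new List<T>();
        lock (sync)
        {
            for (var n = first; n != null; n = n.Next)
                result.Add(n.Item);
        }
        return result;
    }
}
```

Whether this is worth it depends on how much contention the lock actually sees, so measure before and after.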

Gediminas Masaitis
  • Thanks for your fast reply! So with this approach, when one of the iterators is used, the ToList call creates a whole new list (under the lock) that can be iterated in a foreach afterwards, is that correct? Or does the lock block the whole foreach iteration? I also fear that the ToList() call will have a huge performance impact on a buffer with several thousand items, but I'll run a benchmark tomorrow. Perhaps forgetting about the foreach and searching the list manually with locked GetPrevious()/GetNext() calls would be much faster. – sirloin Mar 06 '16 at 18:28