I've got a class that should be thread safe. I would prefer to manage thread safety with a single synchronization object to avoid complicated mindbreakers, since all methods alter object state variables, so I wrap the method bodies in a lock statement on that object. There are scenarios where the lock needs to be released for a while to allow another thread to update the state. So far so good: just use Monitor.Wait() and Monitor.Pulse(). However, I would like to 'Pulse' with a condition. In the code below, NotifySent() should send a 'Pulse' only to a thread waiting in Send(), and similarly EnqueueReceived() should send a 'Pulse' only to a thread waiting in Receive().

So summarizing:

  • I prefer to use a single synchronization object to lock on, because each of the four methods alters object state variables.
  • The Wait() should release the lock, but wait for specific pulses. After being notified, the lock should be re-acquired.
  • The Pulse() should only notify one waiting thread, either the send or receive waiter.
  • Preferably be able to use a CancellationToken as well, to cancel the wait.

I've tried many things, including Monitor, Semaphore and WaitHandle combinations, queues with WaitHandles and more creative options. Also, I've been playing with multiple synchronization objects. But in each scenario I only get parts of the functionality to work.

The code below is the closest I've gotten. The TODO comments show what is wrong with it.

public class Socket
{
    public class Item { }

    private object sync = new object();
    private ManualResetEvent receiveAvailable = new ManualResetEvent(false);

    private Queue<Item> receiveQueue = new Queue<Item>();

    // used by client, from any thread
    public void Send(Item item, CancellationToken token)
    {
        lock (this.sync)
        {
            // sends the message somewhere and should await confirmation.
            // note that the confirmation order matters.

            // TODO: Should only continue on notification from 'NotifySent()', and respect the cancellation token
            Monitor.Wait(this.sync); 
        }
    }

    // used by client, from any thread
    public Item Receive(CancellationToken token)
    {
        lock (this.sync)
        {
            if (!this.receiveAvailable.WaitOne(0))
            {
                // TODO: Should only be notified by 'EnqueueReceived()' method, and respect the cancellation token.
                Monitor.Wait(this.sync);
            }

            var item = this.receiveQueue.Dequeue();
            if (this.receiveQueue.Count == 0)
            {
                this.receiveAvailable.Reset();
            }

            return item;
        }
    }

    // used by internal worker thread
    internal void NotifySent()
    {
        lock (this.sync)
        {
            // Should only notify the Send() method.
            Monitor.Pulse(this.sync);
        }
    }

    // used by internal worker thread
    internal void EnqueueReceived(Item item)
    {
        lock (this.sync)
        {
            this.receiveQueue.Enqueue(item);
            this.receiveAvailable.Set();

            // TODO: Should only notify the 'Receive()' method.
            Monitor.Pulse(this.sync);
        }
    }
}

SIDENOTE: In Python, my requirement can be met with a threading.Condition (ignoring the CancellationToken). Perhaps a similar construct is available in C#?

class Socket(object):
    def __init__(self):
        self.sync = threading.RLock()
        self.receive_queue = collections.deque()
        self.send_ready = threading.Condition(self.sync)
        self.receive_ready = threading.Condition(self.sync)

    def send(self, item):
        with self.send_ready:
            # send the message
            self.send_ready.wait()

    def receive(self):
        with self.receive_ready:
            try:
                return self.receive_queue.popleft()
            except IndexError:
                self.receive_ready.wait()
            return self.receive_queue.popleft()

    def notify_sent(self):
        with self.sync:
            self.send_ready.notify()

    def enqueue_received(self, item):
        with self.sync:
            self.receive_queue.append(item)
            self.receive_ready.notify()
Jesse de Wit
  • "to avoid complicated mindbreakers" - and then you went on to create your own. – H H Mar 04 '19 at 14:38
  • @HenkHolterman Well yes, unless you can show me where to find an open source LLCP socket implementation in C#. – Jesse de Wit Mar 04 '19 at 14:41
  • I'm just wondering about the "single synchronization object", why does Receive have to wait for Send? Afaik the socket is thread-safe (w. respect to send/receive). Wouldn't all your issues be solved by using 2 sync objects? – H H Mar 04 '19 at 14:57
  • Yeah, that is a pretty deep hole. That MRE is a nasty threading race bug since it can't signal a specific producer item. Use ConcurrentQueue<>, that takes care of synchronizing the consumer to the producer. Use a little wrapper class to store the Item, adding a ManualResetEventSlim and CancellationToken. You Set() it in the consumer when you receive the wrapper object, releasing the Wait() in the producer. – Hans Passant Mar 04 '19 at 15:02
  • @HenkHolterman Good point. However, there is also a Connect() method that both sends and receives, and `EnqueueReceived` may send a rejection response. Also, all four methods access/modify the same object state variables. So using multiple synchronization objects is frying my brain. – Jesse de Wit Mar 04 '19 at 15:03
  • The sync object guards the thread-safe access to a certain resource. In your case you have 2 different "resources", 2 different queues. So use 2 different lock objects. Note 1: using Pulse and Wait is a risky practice; the drawbacks are described in the Pulse article on MSDN. Note 2: your pattern seems to be a good fit for `BlockingCollection`. – Mike Makarov Mar 04 '19 at 15:03
  • @HansPassant I've tried creating the wrapper object like you said before. However, I've got variables like the socket state (open, closing, closed, etc.), the number of packets sent, received and acknowledged, and the sequence numbers inside the packets. So locking will be required somewhere, at least to modify multiple variables in a single 'transaction'. However, you did motivate me to try that road again. – Jesse de Wit Mar 04 '19 at 15:17
  • I guess I'll either need to use [3 synchronization objects (send, receive, state variables) together with the `Monitor`], or [1 synchronization object for access to the state variables and a wrapper with waithandles to connect consumer to producer]. I'll look into `BlockingCollection` too. Life would just be so much easier if I could wrap the whole method body in a lock... – Jesse de Wit Mar 04 '19 at 15:30
  • @HansPassant Just noting that the MRE is not a race bug in the above code, due to the wrapping lock statement. – Jesse de Wit Mar 06 '19 at 19:32

2 Answers

What you're looking for is Condition Variables, which are not directly exposed in any .NET APIs. The Monitor is the closest built-in type to what you're looking for, which is a Mutex combined with a single Condition Variable.

The standard way of solving this in .NET is to always re-check the condition (on the waiting side) before continuing. This is also necessary to handle spurious wakeups, which can happen for all Condition Variable-based solutions.

Thus:

// Note: 'while', not 'if'
while (!this.receiveAvailable.WaitOne(0))
{
  Monitor.Wait(this.sync);
}

Etc.

In .NET, since you don't have Condition Variables, you will have more spurious wakeups than if you had designated conditions, but even in the designated conditions scenario, spurious wakeups can happen.
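To make the re-check pattern concrete for two kinds of waiters sharing one lock, here is a minimal sketch. It is not the asker's implementation: the class name `SingleLockSocket`, the `confirmedSends` counter, the use of `string` items, and the 100 ms polling interval for cancellation are all assumptions made for illustration. Each waiter loops on its own predicate, and the notifying side uses `Monitor.PulseAll` so that a waiter of the "wrong" kind simply goes back to waiting.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: one lock, two logical conditions.
public class SingleLockSocket
{
    private readonly object sync = new object();
    private readonly Queue<string> receiveQueue = new Queue<string>();

    // Number of send confirmations that no waiter has consumed yet.
    private int confirmedSends;

    // Blocks until NotifySent() has been called for this waiter.
    public void WaitForSendConfirmation(CancellationToken token)
    {
        lock (this.sync)
        {
            // 'while', not 'if': re-check the predicate after every wakeup.
            while (this.confirmedSends == 0)
            {
                token.ThrowIfCancellationRequested();
                // Assumption: poll the token every 100 ms via the timeout overload.
                Monitor.Wait(this.sync, 100);
            }

            this.confirmedSends--;
        }
    }

    // Blocks until EnqueueReceived() has made an item available.
    public string Receive(CancellationToken token)
    {
        lock (this.sync)
        {
            while (this.receiveQueue.Count == 0)
            {
                token.ThrowIfCancellationRequested();
                Monitor.Wait(this.sync, 100);
            }

            return this.receiveQueue.Dequeue();
        }
    }

    internal void NotifySent()
    {
        lock (this.sync)
        {
            this.confirmedSends++;
            // PulseAll, because Pulse might wake only a Receive() waiter.
            Monitor.PulseAll(this.sync);
        }
    }

    internal void EnqueueReceived(string item)
    {
        lock (this.sync)
        {
            this.receiveQueue.Enqueue(item);
            Monitor.PulseAll(this.sync);
        }
    }
}
```

Note that this sketch does not guarantee which of several send waiters consumes a given confirmation, so it would not by itself preserve confirmation order.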

Stephen Cleary
  • Thanks for your answer. It's a good pattern. I've considered that pattern for my use case, but it would mess up the order, because waiting again would put the thread back at the end of the waiting queue. I hadn't considered the spurious wakeups though. Good lesson. – Jesse de Wit Mar 06 '19 at 19:30

I believe I have found a solution to my problem thanks to your comments. I decided to move the state variables to an external class, so that locking in the socket and managing thread safety on the client side become easier. This way I can manage the state variables myself in a single thread (in a separate class, not shown in the code below).

Here's the combined solution I've come up with:

public class Socket
{
    public class Item { }

    private class PendingSend
    {
        public ManualResetEventSlim ManualResetEvent { get; set; }
        public bool Success { get; set; }
        public string Message { get; set; }
        public Exception InnerException { get; set; }
    }

    private readonly object sendLock = new object();
    private readonly object receiveLock = new object();
    private readonly ManualResetEventSlim receiveAvailable
        = new ManualResetEventSlim(false);
    private readonly SemaphoreSlim receiveSemaphore 
        = new SemaphoreSlim(1, 1);

    private readonly ConcurrentQueue<Item> sendQueue
        = new ConcurrentQueue<Item>();
    private readonly ConcurrentQueue<PendingSend> pendingSendQueue
        = new ConcurrentQueue<PendingSend>();
    private readonly ConcurrentQueue<Item> receiveQueue
        = new ConcurrentQueue<Item>();

    // Called from any client thread.
    public void Send(Item item, CancellationToken token)
    {
        // initialize handle to wait for.
        using (var handle = new ManualResetEventSlim(false))
        {
            var pendingSend = new PendingSend
            {
                ManualResetEvent = handle
            };

            // Make sure the item and pendingSend are put in the same order.
            lock (this.sendLock)
            {
                this.sendQueue.Enqueue(item);
                this.pendingSendQueue.Enqueue(pendingSend);
            }

            // Wait for the just created send handle to notify.
            // May throw operation cancelled, in which case the message is
            // still enqueued... Maybe fix that later.
            handle.Wait(token);

            if (!pendingSend.Success)
            {
                // Now we actually have information why the send 
                // failed. Pretty cool.
                throw new CommunicationException(
                    pendingSend.Message, 
                    pendingSend.InnerException);
            }
        }
    }

    // Called by internal worker thread.
    internal Item DequeueForSend()
    {
        this.sendQueue.TryDequeue(out Item result);

        // May return null, that's fine
        return result;
    }

    // Called by internal worker thread, in the same order items are dequeued.
    internal void SendNotification(
        bool success,
        string message,
        Exception inner)
    {
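        if (!this.pendingSendQueue.TryDequeue(out PendingSend result))
        {
            // TODO: Notify a horrible bug has occurred.
            // Bail out here: 'result' is null, so continuing would throw
            // a NullReferenceException below.
            throw new InvalidOperationException(
                "SendNotification was called without a pending send.");
        }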

        result.Success = success;
        result.Message = message;
        result.InnerException = inner;

        // Releases that waithandle in the Send() method.
        // The 'PendingSend' instance now contains information about the send.
        result.ManualResetEvent.Set();
    }

    // Called by any client thread.
    public Item Receive(CancellationToken token)
    {
        // This makes sure clients fall through one by one.
        this.receiveSemaphore.Wait(token);

        try
        {
            // This makes sure a message is available.
            this.receiveAvailable.Wait(token);

            if (!this.receiveQueue.TryDequeue(out Item result))
            {
                // TODO: Log a horrible bug has occurred.
            }

            // Make sure the count check and the reset happen in a single go.
            lock (this.receiveLock)
            {
                if (this.receiveQueue.Count == 0)
                {
                    this.receiveAvailable.Reset();
                }
            }

            return result;
        }
        finally
        {
            // make space for the next receive
            this.receiveSemaphore.Release();
        }
    }

    // Called by internal worker thread.
    internal void EnqueueReceived(Item item)
    {
        this.receiveQueue.Enqueue(item);

        // Make sure the set and reset don't intertwine
        lock (this.receiveLock)
        {
            this.receiveAvailable.Set();
        }
    }
}
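For comparison, the receive half could probably be reduced to the `BlockingCollection` mentioned in the comments. The sketch below is an assumption-laden alternative, not part of the solution above: `ReceiveBuffer<T>` is a hypothetical name, and it covers only the blocking dequeue with cancellation, not the socket state handling.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical receive-side sketch; ReceiveBuffer is an illustrative name.
public class ReceiveBuffer<T>
{
    // With the default constructor, BlockingCollection<T> is backed by a
    // ConcurrentQueue<T>, so items come out in FIFO order.
    private readonly BlockingCollection<T> items = new BlockingCollection<T>();

    // Called by any client thread. Blocks until an item is available, or
    // throws OperationCanceledException if the token is cancelled first.
    public T Receive(CancellationToken token)
    {
        return this.items.Take(token);
    }

    // Called by the internal worker thread.
    public void EnqueueReceived(T item)
    {
        this.items.Add(item);
    }
}
```

With this shape, the `ManualResetEventSlim`, the `SemaphoreSlim` and the `receiveLock` would no longer be needed for the receive path, since `Take` already handles waiting, cancellation, and concurrent consumers.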
Jesse de Wit