I've got a class that should be thread safe. I would prefer to manage thread safety with a single synchronization object, to avoid complicated mindbreakers, since all methods alter object state variables. So I wrap the method bodies in a lock statement on that object.
There are scenarios where the lock needs to be released for a while to allow another thread to update the state. So far so good: just use Monitor.Wait() and Monitor.Pulse().
However, I would like to 'Pulse' with a condition. In the code below, NotifySent() should send a 'Pulse' only to a thread waiting in the 'Send()' method, and EnqueueReceived() should send a 'Pulse' only to a thread waiting in the 'Receive()' method.
So summarizing:
- I prefer to use a single synchronization object to lock on, because each of the four methods alters object state variables.
- The Wait() should release the lock, but wait for specific pulses. After being notified, the lock should be re-acquired.
- The Pulse() should only notify one waiting thread, either the send or receive waiter.
- Preferably be able to use a CancellationToken as well, to cancel the wait.
I've tried many things, including Monitor, Semaphore and WaitHandle combinations, queues with WaitHandles and more creative options. Also, I've been playing with multiple synchronization objects. But in each scenario I only get parts of the functionality to work.
The code below is the closest I've gotten. The TODO comments show what is wrong with it.
using System.Collections.Generic;
using System.Threading;

public class Socket
{
    public class Item { }

    private object sync = new object();
    private ManualResetEvent receiveAvailable = new ManualResetEvent(false);
    private Queue<Item> receiveQueue = new Queue<Item>();

    // used by client, from any thread
    public void Send(Item item, CancellationToken token)
    {
        lock (this.sync)
        {
            // sends the message somewhere and should await confirmation.
            // note that the confirmation order matters.

            // TODO: Should only continue on notification from 'NotifySent()', and respect the cancellation token
            Monitor.Wait(this.sync);
        }
    }

    // used by client, from any thread
    public Item Receive(CancellationToken token)
    {
        lock (this.sync)
        {
            if (!this.receiveAvailable.WaitOne(0))
            {
                // TODO: Should only be notified by 'EnqueueReceived()' method, and respect the cancellation token.
                Monitor.Wait(this.sync);
            }

            var item = this.receiveQueue.Dequeue();
            if (this.receiveQueue.Count == 0)
            {
                this.receiveAvailable.Reset();
            }

            return item;
        }
    }

    // used by internal worker thread
    internal void NotifySent()
    {
        lock (this.sync)
        {
            // TODO: Should only notify the 'Send()' method.
            Monitor.Pulse(this.sync);
        }
    }

    // used by internal worker thread
    internal void EnqueueReceived(Item item)
    {
        lock (this.sync)
        {
            this.receiveQueue.Enqueue(item);
            this.receiveAvailable.Set();

            // TODO: Should only notify the 'Receive()' method.
            Monitor.Pulse(this.sync);
        }
    }
}
SIDENOTE:
In Python, my requirement is possible using a threading.Condition (ignoring the CancellationToken). Perhaps a similar construct is available in C#?
import collections
import threading


class Socket(object):
    def __init__(self):
        # one lock, shared by both conditions
        self.sync = threading.RLock()
        self.receive_queue = collections.deque()
        self.send_ready = threading.Condition(self.sync)
        self.receive_ready = threading.Condition(self.sync)

    def send(self, item):
        with self.send_ready:
            # send the message
            self.send_ready.wait()

    def receive(self):
        with self.receive_ready:
            try:
                return self.receive_queue.popleft()
            except IndexError:
                self.receive_ready.wait()
                return self.receive_queue.popleft()

    def notify_sent(self):
        with self.sync:
            self.send_ready.notify()

    def enqueue_received(self, item):
        with self.sync:
            self.receive_queue.append(item)
            self.receive_ready.notify()
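
To make the behaviour I'm after concrete, here is a minimal, self-contained Python sketch of two conditions sharing one lock. The names `demo`, `state` and `woken` are made up for this illustration and are not part of the class above. Each waiter blocks on its own condition, and a `notify()` on one condition only targets threads waiting on that condition. The waits use a predicate so that a notification arriving before the waiter starts is not lost.

```python
import threading


def demo():
    sync = threading.RLock()                      # the single lock
    send_ready = threading.Condition(sync)        # woken by the "sent" notification
    receive_ready = threading.Condition(sync)     # woken by the "received" notification
    state = {"sent": False, "received": False}    # hypothetical shared state
    woken = []                                    # order in which waiters resumed

    def wait_on(cond, key, name):
        with cond:
            # releases the lock while waiting, re-acquires it before returning
            cond.wait_for(lambda: state[key])
            woken.append(name)

    sender = threading.Thread(target=wait_on, args=(send_ready, "sent", "send"))
    receiver = threading.Thread(target=wait_on, args=(receive_ready, "received", "receive"))
    sender.start()
    receiver.start()

    # notify only the receive waiter
    with sync:
        state["received"] = True
        receive_ready.notify()
    receiver.join(timeout=5)
    after_first = list(woken)

    # now notify only the send waiter
    with sync:
        state["sent"] = True
        send_ready.notify()
    sender.join(timeout=5)
    return after_first, list(woken)


if __name__ == "__main__":
    print(demo())
```

After the first notification only the receive waiter has resumed; the send waiter stays blocked until its own condition is notified. This is the targeted wake-up I can't seem to reproduce with a single Monitor in C#.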