I'm trying to understand how to implement a Queue with bounded size for use with multiple producers and consumers. I have this code:
Attempt 1:
class Q:
def __init__(self, size):
self.buff = [None]*size
self.end = 0
self.start = 0
self.size = size
self.curr_size = 0
self.open = Condition()
self.closed = Condition()
self.closed.acquire()
def put(self, val):
self.open.acquire()
while self.curr_size == self.size:
self.open.wait()
self.buff[self.end] = val
self.end = (self.end+1)%self.size
self.curr_size+=1
self.closed.notify()
self.closed.release()
def get(self):
self.closed.acquire()
while self.curr_size == 0:
self.closed.wait()
val = self.buff[self.start]
self.start = (self.start+1)%self.size
self.curr_size-=1
self.open.notify()
self.open.release()
return val
Could I simplify this further (to use just one condition variable or mutex for example)?
UPDATE A: The code sample above only allows one item to be put on the queue and no more until get is called, wasting the rest of the buffer. Here's an update to the code that tries to fix it:
Attempt 2
class Q:
def __init__(self, size):
self.buff = [None]*size
self.end = 0
self.start = 0
self.size = size
self.curr_size = 0
self.mutex = Lock()
self.open = Condition(self.mutex)
self.closed = Condition(self.mutex)
def put(self, val):
self.mutex.acquire()
while self.curr_size == self.size:
self.open.wait()
self.buff[self.end] = val
self.end = (self.end+1)%self.size
self.curr_size+=1
self.closed.notify()
self.mutex.release()
def get(self):
self.mutex.acquire()
while self.curr_size == 0:
self.closed.wait()
val = self.buff[self.start]
self.start = (self.start+1)%self.size
self.curr_size-=1
self.open.notify()
self.mutex.release()
return val
UPDATE B: Here the producer blocks the consumer and vice versa, is there a way they could be concurrent as in here using semaphores?
Attempt 3:
class Q:
def __init__(self, size):
self.buff = [None]*size
self.end = 1
self.start = 0
self.size = size
self.start_lock = Lock()
self.end_lock = Lock()
self.open = Condition(self.end_lock)
self.closed = Condition(self.start_lock)
self.start_lock.acquire()
def size_fn(self):
return self.end + self.size - self.start if self.end <= self.start else self.end - self.start
def put(self, val):
with self.end_lock:
while size_fn() == self.size:
self.open.wait()
self.buff[self.end-1] = val
self.end = (self.end+1)%self.size
self.closed.notify()
def get(self):
with self.start_lock:
while size_fn() == 0:
self.closed.wait()
val = self.buff[(self.start+1)%self.size]
self.start = (self.start+1)%self.size
self.open.notify()
return val
Here, producers and consumers use different mutex locks, but there could be context switching happening during the function size_fn() or just after it, resulting in unnecessary waits (when it's empty or full). But the overall performance seems improved as producers and consumers can run concurrently.
Update C
There are anomalies in the above code.
So heres another attempt: I first reverse-engineered Semaphores to be built using Condition Variables as follows:
class Semaphore:
def __init__(self, size):
self.size = size
self.curr_size = 0
self.mutex = Lock()
self.cv = Condition(self.mutex)
def acquire(self):
with self.mutex:
while self.curr_size == self.size:
self.cv.wait()
self.curr_size += 1
def release(self):
with self.mutex:
if self.curr_size == 0:
raise Exception("Releasing semaphore more times than acquired!")
self.curr_size-=1
self.cv.notify()
Now I could use this idea two produce the Q implementation (seen here using semaphores) with just Condition variables:
Attempt 4 class Q: def init(self, size): self.buff = [None]*size self.end = 0 self.start = 0 self.size = size
self.start_lock = Lock()
self.end_lock = Lock()
self.open_mutex = Lock()
self.open_num = 0
self.open_cv = Condition(self.open_mutex)
self.closed_mutex = Lock()
self.closed_num = self.size
self.closed_cv = Condition(self.closed_mutex)
def put(self, val):
with self.open_mutex:
while self.open_num == self.size:
self.open_cv.wait()
self.open_num+=1
with self.end_lock:
self.buff[self.end] = val
self.end = (self.end+1)%self.size
with self.closed_mutex:
self.closed_num-=1
self.closed_cv.notify()
def get(self):
with self.closed_mutex:
while self.closed_num == self.size:
self.closed_cv.wait()
self.closed_num+=1
with self.start_lock:
val = self.buff[self.start]
self.start = (self.start+1)%self.size
with self.open_mutex:
self.open_num-=1
self.open_cv.notify()
return val
Let me know if there are issues in the above implementation.