0

I'm trying to understand how to implement a Queue with bounded size for use with multiple producers and consumers. I have this code:

Attempt 1:

class Q:

    def __init__(self, size):
        self.buff = [None]*size
        self.end = 0
        self.start = 0
        self.size = size
        self.curr_size = 0
        self.open = Condition()
        self.closed = Condition()
        self.closed.acquire()

    def put(self, val):
        self.open.acquire()
        while self.curr_size == self.size:
            self.open.wait()
        self.buff[self.end] = val
        self.end = (self.end+1)%self.size
        self.curr_size+=1
        self.closed.notify()
        self.closed.release()

    def get(self):
        self.closed.acquire()
        while self.curr_size == 0:
            self.closed.wait()
        val = self.buff[self.start]
        self.start = (self.start+1)%self.size
        self.curr_size-=1
        self.open.notify()
        self.open.release()
        return val

Could I simplify this further (to use just one condition variable or mutex for example)?

UPDATE A: The code sample above only allows one item to be put on the queue and no more until get is called, wasting the rest of the buffer. Here's an update to the code that tries to fix it:

Attempt 2

class Q:

    def __init__(self, size):
        self.buff = [None]*size
        self.end = 0
        self.start = 0
        self.size = size
        self.curr_size = 0
        self.mutex = Lock()
        self.open = Condition(self.mutex)
        self.closed = Condition(self.mutex)

    def put(self, val):
        self.mutex.acquire()
        while self.curr_size == self.size:
            self.open.wait()
        self.buff[self.end] = val
        self.end = (self.end+1)%self.size
        self.curr_size+=1
        self.closed.notify()
        self.mutex.release()

    def get(self):
        self.mutex.acquire()
        while self.curr_size == 0:
            self.closed.wait()
        val = self.buff[self.start]
        self.start = (self.start+1)%self.size
        self.curr_size-=1
        self.open.notify()
        self.mutex.release()
        return val

UPDATE B: Here the producer blocks the consumer and vice versa, is there a way they could be concurrent as in here using semaphores?

Attempt 3:

class Q:

    def __init__(self, size):
        self.buff = [None]*size
        self.end = 1
        self.start = 0
        self.size = size
        self.start_lock = Lock()
        self.end_lock = Lock()
        self.open = Condition(self.end_lock)
        self.closed = Condition(self.start_lock)
        self.start_lock.acquire()

    def size_fn(self):
        return self.end + self.size - self.start if self.end <= self.start else self.end - self.start

    def put(self, val):
        with self.end_lock:
            while size_fn() == self.size:
                self.open.wait()
            self.buff[self.end-1] = val
            self.end = (self.end+1)%self.size
            self.closed.notify()

    def get(self):
        with self.start_lock:
            while size_fn() == 0:
                self.closed.wait()
            val = self.buff[(self.start+1)%self.size]
            self.start = (self.start+1)%self.size
            self.open.notify()
        return val

Here, producers and consumers use different mutex locks, but there could be context switching happening during the function size_fn() or just after it, resulting in unnecessary waits (when it's empty or full). But the overall performance seems improved as producers and consumers can run concurrently.

Update C

There are anomalies in the above code.

So heres another attempt: I first reverse-engineered Semaphores to be built using Condition Variables as follows:

class Semaphore:
    def __init__(self, size):
        self.size = size
        self.curr_size = 0
        self.mutex = Lock()
        self.cv = Condition(self.mutex)

    def acquire(self):
        with self.mutex:
            while self.curr_size == self.size:
                self.cv.wait()
            self.curr_size += 1         

    def release(self):
        with self.mutex:
            if self.curr_size == 0:
                raise Exception("Releasing semaphore more times than acquired!")
            self.curr_size-=1
            self.cv.notify()

Now I could use this idea two produce the Q implementation (seen here using semaphores) with just Condition variables:

Attempt 4 class Q: def init(self, size): self.buff = [None]*size self.end = 0 self.start = 0 self.size = size

        self.start_lock = Lock()
        self.end_lock = Lock()

        self.open_mutex = Lock()
        self.open_num = 0
        self.open_cv = Condition(self.open_mutex)

        self.closed_mutex = Lock()
        self.closed_num = self.size
        self.closed_cv = Condition(self.closed_mutex)

    def put(self, val):

            with self.open_mutex:
                while self.open_num == self.size:
                    self.open_cv.wait()
                self.open_num+=1

            with self.end_lock:
                self.buff[self.end] = val
                self.end = (self.end+1)%self.size

            with self.closed_mutex:
                self.closed_num-=1
                self.closed_cv.notify()

    def get(self):
            with self.closed_mutex:
                while self.closed_num == self.size:
                    self.closed_cv.wait()
                self.closed_num+=1

            with self.start_lock:
                val = self.buff[self.start]
                self.start = (self.start+1)%self.size

            with self.open_mutex:
                self.open_num-=1
                self.open_cv.notify()
        return val

Let me know if there are issues in the above implementation.

np20
  • 1,935
  • 3
  • 16
  • 24

1 Answers1

0

You may want to refer this: Bounded buffer example using condition variable, and another example using semaphore: https://codeistry.wordpress.com/2018/05/12/thread-safe-buffer-queue-python-code/

Multiple producers consumers using the bounded buffer: https://codeistry.wordpress.com/2018/05/13/ordered-producer-consumer-python-code/

Ashish Khurange
  • 903
  • 7
  • 17