
I have been trying to implement a multiple-producer, multiple-consumer model using multiprocessing in Python. Producers scrape data from the web and consumers process the data. At first I implemented just two functions, producer and consumer, and used a Queue to communicate between them, but I couldn't figure out how to handle the completion event. Then I implemented the model using a semaphore:

from multiprocessing import Process, Queue, JoinableQueue, Semaphore, cpu_count

def producer(RESP_q, URL_q, SEM):
    with SEM:  # hold the semaphore for the lifetime of this producer
        while True:
            url = URL_q.get()

            if url == "END":  # sentinel: no more URLs for this producer
                break

            RESP = produce_txns(url)
            RESP_q.put(RESP)

def consumer(RESP_q, SEM, NP):
    # loop while any producer still holds the semaphore or items remain
    while SEM.get_value() < NP or not RESP_q.empty():
        resp = RESP_q.get()

        for txn in resp:
            _txn = E_Transaction(txn)
            print(_txn)

        RESP_q.task_done()


class Manager:
    def __init__(self):
        self.URL_q = Queue()
        self.RESP_q = JoinableQueue()
        self.max_processes = cpu_count()
        self.SEM = Semaphore(self.max_processes // 2)

    def start(self):
        self.worker = []

        for i in range(0, self.max_processes, 2):
            self.worker.append(Process(target=producer, args=(self.RESP_q, self.URL_q, self.SEM)))
            self.worker.append(Process(target=consumer, args=(self.RESP_q, self.SEM, self.max_processes // 2)))
        
        url_server(self.URL_q, self.max_processes // 2)
        #Consider URL_q holds -> [*data, *["END"]*(self.max_processes // 2)]

        for worker in self.worker:
            worker.start()

        self.stop()

    def stop(self):
        for worker in self.worker:
            worker.join()

        self.RESP_q.join()
        self.RESP_q.close()
        self.URL_q.close()
        

Manager().start()

This implementation fails when (in the consumer) RESP_q is empty and SEM is close to max_processes: the while condition can be satisfied just before the last producers release the semaphore, so by the time get is called SEM is back at max_processes, no producers are left, and the program blocks forever in the get method. I am not able to solve this problem.

Edit 1.

@Louis Lac's implementation is also correct. I corrected my code to remove the deadlock using a try-except block.

def consumer(RESP_q, SEM, NP):
    while SEM.get_value() < NP or not RESP_q.empty():
        try:
            resp = RESP_q.get(timeout=0.5)
        except Exception:  # queue.Empty after the timeout: re-check the loop condition
            continue

        for txn in resp:
            _txn = E_Transaction(txn)
            print(_txn)

        RESP_q.task_done()
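
An alternative that would avoid the timeout polling entirely (a rough sketch, reusing E_Transaction and the queues from the code above, not something I have tested) is to shut the consumers down with the same kind of "END" sentinels the producers already use: join the producer processes first, then put one sentinel per consumer on RESP_q.

def consumer(RESP_q):
    # no semaphore check: block on get() and stop on a sentinel, which the
    # manager sends only after every producer has been joined
    while True:
        resp = RESP_q.get()

        if resp == "END":
            RESP_q.task_done()
            break

        for txn in resp:
            print(E_Transaction(txn))

        RESP_q.task_done()

# Matching shutdown order in the manager:
#     join all producer processes first
#     put one "END" per consumer on RESP_q
#     join the consumer processes, then RESP_q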
  • Why use a semaphore? Replace the `while True` infinite loop with a `while condition` loop, where `condition` is initially `True` and changed to `False` when processing is done. – Louis Lac Nov 02 '21 at 13:58
  • There are multiple consumers and producers; this differs slightly from the single-producer, single-consumer model because not every consumer can know when all the producers are finished. – Rinkeby Nov 02 '21 at 14:24
  • You can create a "registration" activity for the producers. When a producer registers a shared counter is incremented. When the producer completes its last action is to deregister, which decrements the shared counter. Consumers can then complete their work when the shared counter goes from not 0 to 0. – Jim Rogers Nov 02 '21 at 20:01
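
A minimal sketch of the registration idea from the last comment, using a shared multiprocessing.Value as the counter; produce_txns and E_Transaction are the question's helpers, everything else here is illustrative. Registering every producer up front (here via the counter's initial value) avoids the race where a consumer sees 0 before any producer has started.

from multiprocessing import Value

def producer(RESP_q, URL_q, active):
    try:
        while True:
            url = URL_q.get()
            if url == "END":
                break
            RESP_q.put(produce_txns(url))
    finally:
        with active.get_lock():
            active.value -= 1  # deregister: the producer's last action

def consumer(RESP_q, active):
    while True:
        if active.value == 0 and RESP_q.empty():
            break  # every producer has deregistered and the queue is drained
        try:
            resp = RESP_q.get(timeout=0.5)  # timeout re-checks the exit condition
        except Exception:
            continue
        for txn in resp:
            print(E_Transaction(txn))
        RESP_q.task_done()

# In the manager, register all producers before starting any process:
#     active = Value('i', number_of_producers)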

1 Answer


Here is an example of a multiple-producers, multiple-consumers implementation. You can use the daemon flag when instantiating the processes so that they are automatically closed when the program quits. You can then use JoinableQueues and join them (instead of joining the processes) so that the program quits when there is no item left to process.

You should use if __name__ == "__main__": to launch the program without causing a recursive execution of that program.

from multiprocessing import Process, JoinableQueue
from time import sleep


def consumer(in_queue: JoinableQueue, out_queue: JoinableQueue):
    # first stage: takes numbers from in_queue, passes their string form downstream
    while True:
        item = in_queue.get()
        sleep(0.5)
        s = str(item)
        out_queue.put(s)
        in_queue.task_done()

def producer(in_queue: JoinableQueue):
    # second stage: takes the strings and prints them back as integers
    while True:
        item = in_queue.get()
        sleep(0.5)
        n = int(item)
        print(n)
        in_queue.task_done()

if __name__ == "__main__":
    number_queue = JoinableQueue()
    str_queue = JoinableQueue()

    for _ in range(4):
        Process(target=consumer, args=(number_queue, str_queue), daemon=True).start()
        Process(target=producer, args=(str_queue,), daemon=True).start()

    for i in range(100):
        number_queue.put(i)

    number_queue.join()
    str_queue.join()
  • This implementation is really smart. How will there be a recursive execution if I don't use if __name__ == "__main__"? – Rinkeby Nov 03 '21 at 13:30
  • See this [SO post](https://stackoverflow.com/a/18205006/6324055) for an explanation (actually the answer seems to be valid for Unix systems too). Multiprocessing in Python involves instantiating multiple Python interpreters and executing the main file multiple times, which leads to recursion. – Louis Lac Nov 03 '21 at 13:46
  • If you are using the multiple producers and consumers a lot, you can check `multiprocessing.Pool`. – Louis Lac Nov 03 '21 at 13:48
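
A minimal sketch of the multiprocessing.Pool approach mentioned in the last comment; the stage function to_string is illustrative, not from the answer.

from multiprocessing import Pool

def to_string(n: int) -> str:
    # the per-item work; the Pool dispatches items across worker processes
    return str(n)

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # imap_unordered yields results as soon as any worker finishes,
        # replacing the hand-rolled queues and worker processes above
        for s in pool.imap_unordered(to_string, range(100)):
            print(s)
    # leaving the with-block stops the pool; all results were consumed above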