I have been trying to implement a multiple-producer, multiple-consumer model using multiprocessing in Python. Producers scrape data from the web and consumers process that data. At first I implemented just two functions, producer and consumer, with the particular functionality and used a Queue to communicate between them, but I couldn't figure out how to handle the completion event. Then I implemented the model using a Semaphore:
from multiprocessing import Process, Queue, JoinableQueue, Semaphore, cpu_count

def producer(RESP_q, URL_q, SEM):
    # Hold the semaphore for the producer's whole lifetime so consumers
    # can tell, via SEM.get_value(), whether any producer is still alive.
    with SEM:
        while True:
            url = URL_q.get()
            if url == "END":  # sentinel: no more URLs for this producer
                break
            RESP = produce_txns(url)
            RESP_q.put(RESP)
def consumer(RESP_q, SEM, NP):
    # Keep consuming while any producer still holds the semaphore
    # (value < NP) or responses remain in the queue.
    while SEM.get_value() < NP or not RESP_q.empty():
        resp = RESP_q.get()
        for txn in resp:
            _txn = E_Transaction(txn)
            print(_txn)
        RESP_q.task_done()
class Manager:
    def __init__(self):
        self.URL_q = Queue()
        self.RESP_q = JoinableQueue()
        self.max_processes = cpu_count()
        self.SEM = Semaphore(self.max_processes // 2)

    def start(self):
        self.worker = []
        for i in range(0, self.max_processes, 2):
            self.worker.append(Process(target=producer, args=(self.RESP_q, self.URL_q, self.SEM)))
            self.worker.append(Process(target=consumer, args=(self.RESP_q, self.SEM, self.max_processes // 2)))
        url_server(self.URL_q, self.max_processes // 2)
        # Consider URL_q holds -> [*data, *["END"] * (self.max_processes // 2)]
        for worker in self.worker:
            worker.start()
        self.stop()

    def stop(self):
        for worker in self.worker:
            worker.join()
        self.RESP_q.join()
        self.RESP_q.close()
        self.URL_q.close()

Manager().start()
This implementation fails because of a race in the consumer: when RESP_q is empty and SEM's value is just below its maximum, the while condition is satisfied, but by the time the interpreter reaches get(), the last producer has released the semaphore (so its value equals the maximum), no producers are left to put anything on the queue, and the program blocks forever at the get call. I am not able to solve this problem.
Edit 1.
@Louis Lac's implementation is also correct. I fixed my code to remove the deadlock by polling with a timeout inside a try-except block:
def consumer(RESP_q, SEM, NP):
    while SEM.get_value() < NP or not RESP_q.empty():
        try:
            # Time out so the loop condition is re-checked periodically.
            resp = RESP_q.get(timeout=0.5)
        except Exception:  # queue.Empty is raised when the timeout expires
            continue
        for txn in resp:
            _txn = E_Transaction(txn)
            print(_txn)
        RESP_q.task_done()
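For completeness, the race can also be avoided without polling by shutting the consumers down with sentinels as well: once every producer has been joined, the manager puts one "END" per consumer on RESP_q, so consumers can block on get() unconditionally. Below is a minimal sketch of that variant; the run() function and the n_producers/n_consumers names are illustrative assumptions, not part of my code above, while produce_txns, E_Transaction, and url_server are the same helpers as in the question.

from multiprocessing import Process, Queue, JoinableQueue

def producer(RESP_q, URL_q):
    # Same as above, but no semaphore is needed in this variant.
    while True:
        url = URL_q.get()
        if url == "END":
            break
        RESP_q.put(produce_txns(url))

def consumer(RESP_q):
    while True:
        resp = RESP_q.get()   # safe to block: an "END" sentinel always arrives
        if resp == "END":     # sentinel: all producers have already finished
            RESP_q.task_done()
            break
        for txn in resp:
            print(E_Transaction(txn))
        RESP_q.task_done()

def run(n_producers, n_consumers):
    URL_q, RESP_q = Queue(), JoinableQueue()
    producers = [Process(target=producer, args=(RESP_q, URL_q))
                 for _ in range(n_producers)]
    consumers = [Process(target=consumer, args=(RESP_q,))
                 for _ in range(n_consumers)]
    url_server(URL_q, n_producers)     # data plus one "END" per producer
    for p in producers + consumers:
        p.start()
    for p in producers:
        p.join()                       # after this, no more responses can arrive
    for _ in range(n_consumers):
        RESP_q.put("END")              # one shutdown sentinel per consumer
    for c in consumers:
        c.join()
    RESP_q.join()

Because the sentinels are enqueued only after all producers have exited, every work item precedes them in the queue, and each consumer stops after taking exactly one sentinel; no consumer ever needs to inspect the semaphore or test emptiness, which is what made my original version racy.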