
I am trying to use a Python multiprocessing pool to take values from a shared queue and consume them in worker processes.

import multiprocessing

def myTask(queue):
    while not queue.empty():
        value = queue.get()
        print "Process {} Popped {} from the shared Queue".format(multiprocessing.current_process().pid, value)
        queue.task_done()

def main():
    m = multiprocessing.Manager()
    sharedQueue = m.Queue()
    sharedQueue.put(2)
    sharedQueue.put(3)
    sharedQueue.put(4)
    sharedQueue.put(5)
    sharedQueue.put(6)
    pool = multiprocessing.Pool(processes=3)
    pool.apply_async(myTask, args=(sharedQueue,))
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

From the output, I see that only a single process was started, and it took all the values out of the queue. How can I spawn multiple processes in parallel that keep getting values from the queue? I do have a maximum limit on the number of processes, which may be greater than the queue size. Kindly guide me in this regard. Thanks.

PS: This is just an example. The actual task migrates data from one form to another and does some heavy-lifting operations on the data.

Update: I made the following modifications, and they seem to work, except that pool.join() blocks the main process from exiting even when all the child processes have exited.

pool = multiprocessing.Pool(processes=4)
while not sharedQueue.empty():
    pool.apply_async(myTask, args=(sharedQueue,))
pool.close()
#pool.join()

def myTask(queue):
    value = queue.get()
    print "Process {} Popped {} from the shared Queue".format(multiprocessing.current_process().pid, value)
    queue.task_done()
Dania
  • Your maximum number of processes is dictated by the number of theoretical CPU cores, not the size of the list btw – roganjosh Sep 23 '18 at 19:24
  • Imagine you want to write the word "banana", but you want to do it really fast, so you hire 3 people to divide the work of writing the letters between them. It's not going to go any faster than doing it yourself; there's too little work and too much contention. One person's probably going to write all 6 letters, and if they don't, it's probably going to take even longer than if they did. – user2357112 Sep 23 '18 at 19:27
  • If you want work to be done in parallel you need to call apply_async multiple times. Each call to apply_async will submit a task that will be run by one (and only one) process in the pool. – myrtlecat Sep 23 '18 at 20:15
  • (Also, checking `queue.empty()` before calling `queue.get()` is fundamentally unsafe, and `queue.empty()` exists primarily for debugging purposes.) – user2357112 Sep 23 '18 at 21:47 (a sentinel-based alternative is sketched after these comments)
  • To your update: Join blocks because your child processes don't exit. `queue.empty` can change its value to True in between the `while not sharedQueue.empty()` in the parent and the `queue.get()` in the child, resulting in children waiting for something to be put on the queue again, something that won't happen. Instead just submit as many tasks as you have inputs with a for-loop: `for _ in range(2, 7): pool.apply_async(my_task, args=(shared_queue,))` – Darkonaut Sep 23 '18 at 22:02 (see the first sketch after these comments)
  • Possible duplicate of [How to use multiprocessing queue in Python?](https://stackoverflow.com/questions/11515944/how-to-use-multiprocessing-queue-in-python) – stovfl Sep 24 '18 at 08:25
  • Thanks @Darkonaut for explaining that nicely! That was helpful. – Dania Sep 24 '18 at 09:30
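
Following myrtlecat's and Darkonaut's comments above, here is a minimal sketch of that approach: one apply_async call per queued item, so the pool spreads the five tasks over its workers and join() returns. The snake_case names (my_task, shared_queue) are just renamed versions of the question's functions, and print is written as a function call so the snippet runs on both Python 2 and 3.

import multiprocessing

def my_task(queue):
    # Each task consumes exactly one item, so no worker ever blocks on an empty queue.
    value = queue.get()
    print("Process {} popped {} from the shared queue".format(
        multiprocessing.current_process().pid, value))
    queue.task_done()

def main():
    m = multiprocessing.Manager()
    shared_queue = m.Queue()
    items = [2, 3, 4, 5, 6]
    for item in items:
        shared_queue.put(item)

    pool = multiprocessing.Pool(processes=3)
    # One apply_async call per item: the pool distributes the five tasks
    # across its three worker processes instead of running a single task.
    for _ in items:
        pool.apply_async(my_task, args=(shared_queue,))
    pool.close()
    pool.join()  # returns, because every task takes one item and then exits

if __name__ == '__main__':
    main()

If the real tasks can raise, keeping the AsyncResult objects returned by apply_async and calling .get() on them will surface exceptions instead of letting them pass silently.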

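As user2357112 points out, polling queue.empty() before get() is racy. A common alternative pattern, sketched here under the assumption that None never appears in the real data (the names SENTINEL, worker, and n_workers are introduced for illustration, not taken from the question), is to start one long-running task per pool worker and enqueue one sentinel per worker so each loop knows when to stop.

import multiprocessing

SENTINEL = None  # hypothetical stop marker; any value the real data never contains works

def worker(queue):
    # Loop on get() instead of polling empty(); stop when the sentinel arrives.
    while True:
        value = queue.get()
        if value is SENTINEL:
            queue.task_done()
            break
        print("Process {} popped {} from the shared queue".format(
            multiprocessing.current_process().pid, value))
        queue.task_done()

def main():
    m = multiprocessing.Manager()
    shared_queue = m.Queue()
    for item in (2, 3, 4, 5, 6):
        shared_queue.put(item)

    n_workers = 3
    pool = multiprocessing.Pool(processes=n_workers)
    for _ in range(n_workers):
        pool.apply_async(worker, args=(shared_queue,))
    for _ in range(n_workers):
        shared_queue.put(SENTINEL)  # one stop marker per long-running task
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
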
0 Answers