
I am using Python's multiprocessing library to spawn 4 Process() objects to parallelize a CPU-intensive task. The task (inspiration and code from this great article) is to compute the prime factors of every integer in a list.

main.py:

import random
import multiprocessing
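import sys

# worker1 / worker2 (shown below) and factorize_naive (from the article)
# must be defined here, before the Process() objects are created.
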
num_inputs  = 4000
num_procs   = 4
proc_inputs = num_inputs/num_procs
input_list  = [int(1000*random.random()) for i in xrange(num_inputs)]

output_queue = multiprocessing.Queue()
procs        = []
for p_i in xrange(num_procs):
  print "Process [%d]"%p_i
  proc_list = input_list[proc_inputs * p_i:proc_inputs * (p_i + 1)]
  print " - num inputs: [%d]"%len(proc_list)

  # Using target=worker1 HANGS on join
  p = multiprocessing.Process(target=worker1, args=(p_i, proc_list, output_queue))
  # Using target=worker2 RETURNS with success
  #p = multiprocessing.Process(target=worker2, args=(p_i, proc_list, output_queue))

  procs.append(p)
  p.start()

for p in procs:
  print "joining ", p, output_queue.qsize(), output_queue.full()
  p.join()
  print "joined  ", p, output_queue.qsize(), output_queue.full()

print "Processing complete."
ret_vals = []
while not output_queue.empty():
    ret_vals.append(output_queue.get())
print len(ret_vals)
print sys.getsizeof(ret_vals)

Observation:

  • If each process's target is worker1, then for an input list larger than 4000 elements the main thread gets stuck on .join(), waiting forever for the spawned processes to terminate.
  • If each process's target is worker2, the same input list works just fine and the main thread returns.

This is very confusing to me, as the only difference between worker1 and worker2 (see below) is that the former puts each result list on the Queue individually, whereas the latter puts a single list of lists per process.

Why does using worker1 as the target deadlock while worker2 does not? Shouldn't both (or neither) exceed the multiprocessing Queue maxsize limit of 32767?


worker1 vs worker2:

def worker1(proc_num, proc_list, output_queue):
    '''worker function which deadlocks'''  
    for num in proc_list:
        output_queue.put(factorize_naive(num))

def worker2(proc_num, proc_list, output_queue):
    '''worker function that works'''
    workers_stuff = []

    for num in proc_list:
        workers_stuff.append(factorize_naive(num))
    output_queue.put(workers_stuff)

There are a lot of similar questions on SO, but I believe the core of this questions is clearly distinct from all of them.


Matteo

1 Answer

The docs warn about this:

Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

While a Queue appears to be unbounded, under the covers queued items are buffered in memory to avoid overloading inter-process pipes. A process cannot end normally before those memory buffers are flushed. Your worker1() puts a lot more items on the queue than your worker2(), and that's all there is to it. Note that the number of items that can be queued before the implementation resorts to buffering in memory isn't defined: it can vary across OS and Python release.
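A minimal sketch of that behaviour, written for Python 3. The names `producer` and `demo` and the payload sizes are illustrative assumptions; roughly 1 MB is chosen only because it comfortably exceeds typical OS pipe buffers. The child returns from its target function after its last put(), yet a timed join() finds it still alive, because the queue's background feeder thread is blocked writing to a pipe nobody is reading. Once the parent drains the queue, the child exits normally.

```python
import multiprocessing


def producer(q, n_items, item_size):
    # put() only appends to an in-process buffer; a background feeder
    # thread writes that buffer to the underlying pipe.
    for _ in range(n_items):
        q.put(b"x" * item_size)
    # Returning here does not end the process: at exit it waits for the
    # feeder thread to flush every buffered item into the pipe.


def demo(n_items=1000, item_size=1000, timeout=2.0):
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q, n_items, item_size))
    p.start()

    p.join(timeout)       # gives up after `timeout` seconds instead of hanging
    stuck = p.is_alive()  # typically True: ~1 MB does not fit in the pipe

    # Consuming the items lets the feeder thread finish, so the child exits.
    received = [q.get() for _ in range(n_items)]
    p.join()
    return stuck, len(received), p.exitcode


if __name__ == "__main__":
    stuck, count, exitcode = demo()
    print("alive after timed join:", stuck)
    print("items received:", count, "exit code:", exitcode)
```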

As the docs suggest, the normal way to avoid this is to .get() all the items off the queue before you attempt to .join() the processes. As you've discovered, whether it's necessary to do so depends in an undefined way on how many items have been put on the queue by each worker process.
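A sketch of that fix applied to the question's setup, written for Python 3. `factorize_naive` here is a hypothetical trial-division stand-in, not the article's code, and `run` is an illustrative name. Because each worker puts exactly one result per input, the parent knows how many items to expect and can call the blocking get() that many times before joining. It deliberately avoids an empty() check at that point: while workers are still running, an empty queue does not mean they are finished.

```python
import multiprocessing
import random


def factorize_naive(n):
    """Hypothetical stand-in for the article's function: trial-division factoring."""
    factors = []
    p = 2
    while n > 1 and p * p <= n:
        while n % p == 0:
            factors.append(p)
            n //= p
        p += 1
    if n > 1:
        factors.append(n)  # whatever remains is prime
    return factors


def worker1(proc_num, proc_list, output_queue):
    # Same shape as the question's worker1: one put() per input.
    for num in proc_list:
        output_queue.put(factorize_naive(num))


def run(input_list, num_procs=4):
    output_queue = multiprocessing.Queue()
    procs = []
    for p_i in range(num_procs):
        # Strided slices cover every input even if len() % num_procs != 0.
        proc_list = input_list[p_i::num_procs]
        p = multiprocessing.Process(target=worker1, args=(p_i, proc_list, output_queue))
        procs.append(p)
        p.start()

    # Drain FIRST: exactly one result per input, so get() can block safely.
    ret_vals = [output_queue.get() for _ in range(len(input_list))]

    # Every buffered item has been consumed, so the workers can now exit.
    for p in procs:
        p.join()
    return ret_vals


if __name__ == "__main__":
    inputs = [random.randrange(1000) for _ in range(40000)]
    print(len(run(inputs)))
```

Results arrive interleaved across workers, so compare them as a multiset (or tag each result with its input) if order matters.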

Tim Peters
  • Thanks for the very thorough reply. Just to make sure I understand correctly: you are saying that whether a `Queue` gets buffered to memory or not depends on the number of items and not on the overall size of the stored items? If so, do you know why this design choice was made? Thanks a lot again! – Matteo Aug 29 '17 at 21:54
  • I'd answer, except because "undefined" is the truth I don't want to spend oodles of time digging into every implementation in current use. But "the answer" doesn't matter: _regardless_ of the answer, the solution is to consume the queue before you try to join the processes that populated it. Any answer beyond that is simply reverse-engineering the implementation code in the specific version of Python you're using at the time. – Tim Peters Aug 29 '17 at 21:57
  • Since a Queue is a kind of pipe, to successfully join all the child processes one should **clear** the Queue first, right? Essentially, one has to drain (flush) the pipe: if there is still data in it, the subprocesses will get stuck waiting for it to be read. – GoingMyWay Jan 17 '20 at 04:24
  • Hi Bro, your answer helped me with my `multiprocessing.Queue` task. – GoingMyWay Jan 17 '20 at 05:08
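Draining the queue before joining is easy when, as in the question, the number of results is known up front. When it is not, a common alternative (a sketch, assuming no real result is ever `None`) is for each worker to put a sentinel after its last item; the parent keeps calling get() until it has seen one sentinel per worker, and only then joins. Items put by a single process arrive in the order that process put them, so a worker's sentinel cannot overtake its own results. The squaring in `worker` is placeholder work, and all names are illustrative (Python 3).

```python
import multiprocessing

SENTINEL = None  # assumption: no real result is ever None


def worker(items, q):
    for x in items:
        q.put(x * x)   # placeholder work: one put() per item, like worker1
    q.put(SENTINEL)    # tell the parent this worker has finished


def collect(input_list, num_procs=4):
    q = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(input_list[i::num_procs], q))
             for i in range(num_procs)]
    for p in procs:
        p.start()

    results, finished = [], 0
    while finished < num_procs:  # drain until every worker has signalled done
        item = q.get()
        if item is SENTINEL:
            finished += 1
        else:
            results.append(item)

    for p in procs:              # safe now: nothing is left buffered
        p.join()
    return results


if __name__ == "__main__":
    print(len(collect(list(range(50000)))))
```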