
When you put a large enough object into multiprocessing.Queue, the program seems to hang in strange places. Consider this minimal example:

import multiprocessing

def dump_dict(queue, size):
    queue.put({x: x for x in range(size)})
    print("Dump finished")

if __name__ == '__main__':
    SIZE = int(1e5)
    queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=dump_dict, args=(queue, SIZE))
    print("Starting...")
    process.start()
    print("Joining...")
    process.join()
    print("Done")
    print(len(queue.get()))

If the SIZE parameter is small enough (<= 1e4, at least in my case), the whole program runs smoothly, but once SIZE is big enough, the program hangs in strange places. When searching for an explanation, e.g. python multiprocessing - process hangs on join for large queue, I have only found general answers along the lines of "you need to consume from the queue". But what seems weird is that the program actually prints Dump finished, i.e. it reaches the line after putting the object into the queue. Furthermore, using Queue.put_nowait instead of Queue.put did not make a difference.

Finally, if you use Process.join(1) instead of Process.join(), the whole program finishes with the complete dictionary in the queue (i.e. the print(len(..)) line will print 100000).

Can somebody give me a little bit more insight into this?

Darkonaut
Drecker
  • The code is inserting ONE dictionary of 100000 elements into the queue. Is that what you want, or are you trying to insert each dict item instead? – Pynchia Dec 05 '19 at 13:56
  • @Pynchia yes, as I was saying, I know that I put a single huge object in there (it is a "dummy" example). What seems weird to me is the behavior of the code, i.e. I would expect it to freeze on `Queue.put` and most certainly not to "succeed" if I put a timeout there. I was hoping for some kind of explanation (or a link for further reading...) – Drecker Dec 05 '19 at 15:07

2 Answers


You need to queue.get() in the parent before you process.join() to prevent a deadlock. The queue spawns a feeder thread with its first queue.put(), and the MainThread in your worker process joins this feeder thread before exiting. So the worker process won't exit before the result is completely flushed to the (OS pipe) buffer, but your result is too big to fit into that buffer, and your parent doesn't read from the queue until the worker has exited, resulting in a deadlock.

You see the output of print("Dump finished") because the actual sending happens from the feeder thread; queue.put() itself just appends the object to a collections.deque within the worker process as an intermediate step.
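Applied to the example from the question, the parent just needs to read the result before joining. A minimal sketch of the rearranged main section, reusing dump_dict unchanged:

import multiprocessing

def dump_dict(queue, size):
    queue.put({x: x for x in range(size)})
    print("Dump finished")

if __name__ == '__main__':
    SIZE = int(1e5)
    queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=dump_dict, args=(queue, SIZE))
    print("Starting...")
    process.start()
    # Drain the queue first so the feeder thread in the worker can flush
    # the pipe buffer and the worker's MainThread can exit.
    result = queue.get()
    print("Joining...")
    process.join()
    print("Done")
    print(len(result))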

Darkonaut
  • Thank you! I did not realize this mistake, it makes sense now – Drecker Dec 05 '19 at 15:21
  • Well spotted, of course, nobody else was consuming, only producing... upvoted – Pynchia Dec 05 '19 at 15:25
  • I ran the code you gave above; it's fine with just one process, but with 2 or more processes the deadlock happens again, e.g.:

        if __name__ == '__main__':
            SIZE = int(1e5)
            queue = multiprocessing.Queue()
            multip = []
            for _ in range(2):
                process = multiprocessing.Process(target=dump_dict, args=(queue, SIZE))
                multip.append(process)
                print("Starting...")
                process.start()
            print("Joining...")
            print(len(queue.get()))
            for i in range(2):
                multip[i].join()
            print("Done")

    – dl wu May 13 '22 at 03:30
  • @dlwu Do you use `queue.get()` only once although you have multiple producers? I can't tell because the formatting isn't clear. But obviously, you would need the same number of `queue.get()` calls as you have `queue.put()` calls in total before you can join. – Darkonaut May 13 '22 at 04:00
  • @Darkonaut, thanks, you are right, it works well now. – dl wu May 13 '22 at 06:24
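To illustrate the point from the comment thread above: with multiple producers there must be one queue.get() per queue.put() in total before any join. A minimal sketch of the two-process variant with the missing get added, again reusing dump_dict from the question:

import multiprocessing

def dump_dict(queue, size):
    queue.put({x: x for x in range(size)})
    print("Dump finished")

if __name__ == '__main__':
    SIZE = int(1e5)
    queue = multiprocessing.Queue()
    processes = []
    for _ in range(2):
        process = multiprocessing.Process(target=dump_dict, args=(queue, SIZE))
        processes.append(process)
        print("Starting...")
        process.start()
    # One get() per put() in total, before joining either worker.
    for _ in range(2):
        print(len(queue.get()))
    print("Joining...")
    for process in processes:
        process.join()
    print("Done")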

Facing a similar problem, I solved it using @Darkonaut's answer and the following implementation:

import time

# `queue` is the multiprocessing.Queue shared with the workers.
done = 0
while done < n:    # n is the number of objects you expect to get
    if not queue.empty():
        results = queue.get()
        done += 1
        # Do something with the results
    else:
        time.sleep(.5)

Doesn't feel very pythonic, but it worked!
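For reference, a plain blocking queue.get() achieves the same without the polling loop. A minimal sketch under the same assumptions (`queue` and `n` defined as above):

for _ in range(n):         # n is the number of objects you expect to get
    results = queue.get()  # blocks until an object is available
    # Do something with the results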