This problem is explained in the docs, under Pipes and Queues:
Warning: As mentioned above, if a child process has put items on a queue (and it has not used `JoinableQueue.cancel_join_thread`), then that process will not terminate until all buffered items have been flushed to the pipe.
This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
Note that a queue created using a manager does not have this issue. See Programming guidelines.
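A minimal sketch of the manager-based queue the docs mention. The `worker` function and its payload (one 10,000-element list per child) are hypothetical stand-ins for the real task. Because `put()` on a manager queue is forwarded to the manager's server process rather than buffered for a pipe inside the child, joining before reading should not deadlock here:

```python
import multiprocessing as mp

def worker(q, i):
    # Hypothetical child task: put one large item on the shared queue.
    q.put((i, list(range(10000))))

if __name__ == "__main__":
    with mp.Manager() as manager:
        q = manager.Queue()  # proxy to a queue living in the manager's process
        procs = [mp.Process(target=worker, args=(q, i)) for i in range(20)]
        for p in procs:
            p.start()
        # Join first: each put() has already been handed to the manager,
        # so the children can exit even though nothing has been read yet.
        for p in procs:
            p.join()
        results = [q.get() for _ in procs]
    print(len(results))
```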
Using a manager would work, but there are a lot of easier ways to solve this:
1. Read the data off the queue first, then join the processes, instead of the other way around.
2. Manage the `Queue` manually (e.g., using a `JoinableQueue` and `task_done`).
3. Just use `Pool.map` instead of reinventing the wheel. (Yes, much of what `Pool` does isn't necessary for your use case, but it also isn't going to get in the way, and the nice thing is, you already know it works.)
I won't show the implementation for #1 because it's so trivial, or for #2 because it's such a pain, but for #3:
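```python
import multiprocessing as mp
import pandas

def task(arg):
    DF = pandas.DataFrame({"hello": range(1000)})  # try range(1000) or range(10000)
    return DF

if __name__ == "__main__":  # required on platforms that spawn child processes
    with mp.Pool(processes=20) as p:
        results = p.map(task, range(20), chunksize=1)
```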
(In 2.7, `Pool` doesn't support the `with` statement; context-manager support was added in Python 3.3. You can install a backport of the later version of `multiprocessing` to 2.7 off PyPI, or you can just create the pool manually, then close it in a `try`/`finally`, just as you would handle a file if it didn't work in a `with` statement.)
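A minimal sketch of that manual `try`/`finally` version of the same `Pool.map` call. The `task` here returns a plain list instead of a DataFrame, an assumption made only so the sketch runs without pandas; the code also works unchanged on Python 3:

```python
import multiprocessing as mp

def task(arg):
    # Stand-in for the DataFrame-building task: a large-ish plain list.
    return list(range(10000))

if __name__ == "__main__":
    p = mp.Pool(processes=20)
    try:
        results = p.map(task, range(20), chunksize=1)
    finally:
        p.close()  # no more tasks will be submitted
        p.join()   # wait for the worker processes to exit
    print(len(results))
```

`close()` followed by `join()` lets the workers finish cleanly; note that the 3.x `with` form calls `terminate()` on exit instead, which is fine there because `map` has already returned.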
You may ask yourself why exactly it fails at this point but works with smaller numbers, even just a little bit smaller.
A pickle of that DataFrame is just over 16K. (The list by itself is a little smaller, but if you try it with 10000 instead of 1000 you should see the same thing without Pandas.)
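If you want to check payload sizes on your own setup, a rough sketch is to measure the pickled size directly, since a `multiprocessing` queue pickles each item before writing it to the pipe. The helper name `pickled_size` is hypothetical, and the exact byte counts depend on the Python version, pickle protocol, and (for DataFrames) the pandas version:

```python
import pickle

def pickled_size(obj):
    # Number of bytes produced by pickling obj with the default protocol;
    # this approximates what a Queue's feeder thread pushes through the pipe.
    return len(pickle.dumps(obj))

small = pickled_size(list(range(1000)))
large = pickled_size(list(range(10000)))
print(small, large)
```

With pandas installed, `pickled_size(pandas.DataFrame({"hello": range(1000)}))` gives the corresponding figure for the DataFrame in the question.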
So, the first child writes 16K, then blocks until there's room to write the last few hundred bytes. But you're not pulling anything off the pipe (by calling `queue.get`) until after the `join`, and you can't `join` until the children exit, which they can't do until you unblock the pipe, so it's a classic deadlock. There's enough room for the first 4 to get through, but no room for the 5th. Because you have 4 cores, most of the time the 4 that get through will be the first 4 you started. But occasionally #4 will beat #3 or something, and then you'll fail to join #3. That would happen more often with an 8-core machine.
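For completeness, here is a minimal sketch of the first option above: read everything off the queue, then join. It swaps the DataFrame for a hypothetical 100,000-byte `bytes` payload so it runs without pandas; that size is an assumption chosen to be well over a typical pipe buffer, so the join-first ordering would be at risk of the deadlock described here:

```python
import multiprocessing as mp

PAYLOAD_SIZE = 100000  # hypothetical size, larger than a typical pipe buffer

def worker(q, i):
    # Each child puts one large item; its feeder thread flushes it to the pipe.
    q.put((i, b"x" * PAYLOAD_SIZE))

def run(n=20):
    q = mp.Queue()
    procs = [mp.Process(target=worker, args=(q, i)) for i in range(n)]
    for p in procs:
        p.start()
    # Drain first: one get() per child unblocks every pending pipe write,
    # so each child's feeder thread can finish and the child can exit...
    results = [q.get() for _ in procs]
    # ...and only then join, which no longer waits on a blocked pipe.
    for p in procs:
        p.join()
    return results

if __name__ == "__main__":
    results = run()
    print(sorted(i for i, _ in results))
```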