I am using the Python multiprocessing module. I have about 20-25 tasks to run simultaneously, and each task creates a pandas.DataFrame object of ~20k rows. The problem is that all the tasks execute fine, but when it comes to "joining" the processes, the program just hangs. With "small" DataFrames it works very well. To illustrate my point, I created the code below.

import pandas
import multiprocessing as mp

def task(arg, queue):
    DF = pandas.DataFrame({"hello":range(10)}) # try range(1000) or range(10000)
    queue.put(DF)
    print("DF %d stored" %arg)

listArgs = range(20)
queue = mp.Queue()
processes = [mp.Process(target=task,args=(arg,queue)) for arg in listArgs]

for p in processes:
    p.start()

for i,p in enumerate(processes):
    print("joining %d" %i)
    p.join()

results = [queue.get() for p in processes]

EDIT:

With DF = pandas.DataFrame({"hello":range(10)}) everything is correct: I get "DF 0 stored" up to "DF 19 stored", and likewise "joining 0" through "joining 19".

However, with DF = pandas.DataFrame({"hello":range(1000)}) the issue arises: all the DataFrames are stored, but the joining step hangs after "joining 3".

Thanks for the useful tips :)

Felix
  • So what does the output look like? Does it say "DF 0 stored" through "DF 19 stored" (in arbitrary order) with a "joining 0" mixed in somewhere? Or something different? (If the problem also happens with only, say, 3 processes, consider doing it that way, so you can just paste the whole output without overwhelming anyone…) – abarnert Apr 22 '15 at 22:32
  • Have you had a look at your system resource utilization while running the code? If it is a memory issue, look at this: http://stackoverflow.com/questions/8956832/python-out-of-memory-on-large-csv-file-numpy – kpie Apr 22 '15 at 22:34
  • Also, I don't know whether Pandas can make use of NumPy's multiprocessing shared arrays or not (google numpy-sharedmem), but if it can, that would probably be a lot more efficient than pickling 20 huge frames and passing them over a pipe (which is what `Queue` does under the covers—and there may be a bug in Pandas' pickler or something that's causing your problem…). – abarnert Apr 22 '15 at 22:36
  • Also, have you verified that if you pull out Pandas (maybe just pass a dict of the same size, instead of a DataFrame made from that dict) the problem goes away? Especially if you're using Python 3.2 or earlier, where multiprocessing had some bugs that have since been fixed… – abarnert Apr 22 '15 at 22:39
  • One last thing: what you're doing here could be replaced by just a couple lines of code with a pool (e.g., http://pastebin.com/XNSTSX1E). If you try that, does it work? If so, is there a reason you can't do it that way in your real code? – abarnert Apr 22 '15 at 22:45
  • If it always stops after 3, that's interesting. Or at least it could be, if either (a) a larger range stops at a smaller number, or (b) you have exactly 4 cores. Is either of those true? – abarnert Apr 22 '15 at 22:47
  • @abarnert It seems it is only linked to `pandas.DataFrame` objects, since replacing it with `DF=range(1000)` or `DF={"hello":range(1000)}` works fine. (b) I have exactly 4 cores. I use Python 2.7. – Felix Apr 22 '15 at 22:51
  • If it's (a), pickling this frame should take right around 16K, and 4 of those may just barely fit into 64K, and when that number shows up in debugging, it's usually not irrelevant… Although instead of just me guessing how much it should take, can you `print(len(multiprocessing.reduction.ForkingPickler.dumps(DF)))`? – abarnert Apr 22 '15 at 22:51
  • In addition to the above questions: exactly which versions of Python, NumPy, and Pandas do you have, and how did you install them? – abarnert Apr 22 '15 at 22:52
  • Hold on… `DF=range(1000)` or `DF={"hello":range(1000)}` is only going to be a few bytes, because it'll pickle the `range` object, not the 1000 separate numbers. What if you use `DF={"hello": list(range(1000))}`? Does that work, or have the same problem as the DataFrame? – abarnert Apr 22 '15 at 22:59
  • No problem with `DF={"hello": list(range(1000))}`. I just ran `pip install pandas --upgrade` and received tons of warning messages, which seems weird. By the way, it did not always stop at the 3rd joining step; I got "2" and "4" after rerunning it. – Felix Apr 22 '15 at 23:01
  • I think I've found the problem. See my answer. – abarnert Apr 22 '15 at 23:04
  • @abarnert - he's using Python 2.7, so `range(1000)` will be a list of 1000 ints, likely enough to fill the underlying pipe's buffer. – mata Apr 22 '15 at 23:06
  • @mata: I didn't see the 2.7 in the middle of another comment; I assumed from his 3.x-style `print` calls that he was using 3.x… Anyway, I can reproduce the problem with both 2.7 and 3.4 using a DataFrame of this size, and also with a `list(range(6075))` on 2.7 and `list(range(6077))` on 3.4, but not 1000… – abarnert Apr 22 '15 at 23:13

1 Answer


This problem is explained in the docs, under Pipes and Queues:

Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue. See Programming guidelines.

Using a manager would work, but there are a lot of easier ways to solve this:

  1. Read the data off the queue first, then join the processes, instead of the other way around.
  2. Manage the Queue manually (e.g., using a JoinableQueue and task_done).
  3. Just use Pool.map instead of reinventing the wheel. (Yes, much of what Pool does isn't necessary for your use case—but it also isn't going to get in the way, and the nice thing is, you already know it works.)

I won't show the implementation for #2 because it's such a pain, and #1 is trivial (there's a short sketch of it at the end of this answer), but for #3:

import pandas
import multiprocessing as mp

def task(arg):
    # No queue needed: just return the DataFrame and let the Pool collect the results
    DF = pandas.DataFrame({"hello": range(1000)})  # try range(1000) or range(10000)
    return DF

with mp.Pool(processes=20) as p:
    results = p.map(task, range(20), chunksize=1)

(In 2.7, Pool can't be used in a with statement; you can install the port of the later version of multiprocessing back to 2.7 from PyPI, or you can just create the pool manually and then close it in a try/finally, just as you would handle a file if it didn't work in a with statement...)
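For 2.7, a minimal sketch of that manual version might look like this (same task and imports as above; close and join are the standard Pool cleanup calls):

p = mp.Pool(processes=20)
try:
    results = p.map(task, range(20), chunksize=1)
finally:
    # Without the with statement, shut the pool down explicitly
    p.close()
    p.join()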


You may ask yourself why exactly it fails at this size but works with smaller numbers, even just slightly smaller ones.

A pickle of that DataFrame is just over 16K. (The list by itself is a little smaller, but if you try it with 10000 instead of 1000 you should see the same thing without Pandas.)
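If you want to check that size yourself, here's a quick sketch (the exact number will vary with your pandas version and the pickle protocol multiprocessing uses):

import pickle
import pandas

DF = pandas.DataFrame({"hello": range(1000)})
# Rough size of what the Queue's feeder thread has to push through the pipe for each frame
print(len(pickle.dumps(DF, 2)))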

So, the first child writes 16K, then blocks until there's room to write the last few hundred bytes. But you're not pulling anything off the pipe (by calling queue.get) until after the join, and you can't join the children until they exit, which they can't do until you unblock the pipe, so it's a classic deadlock. There's enough room for the first 4 frames to get through, but no room for a 5th. Because you have 4 cores, most of the time the 4 that get through will be processes #0 through #3, so you hang trying to join #4. But occasionally #4 will beat #3 or something, and then you'll fail to join #3. That would happen more often with an 8-core machine.
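To make fix #1 concrete, here's a minimal sketch of the question's main loop reordered so the queue is drained before joining (same names as in the question):

for p in processes:
    p.start()

# Drain the queue first: one result per process, so each child's feeder thread can flush
results = [queue.get() for p in processes]

# Now the children can exit, so the joins can't deadlock
for i, p in enumerate(processes):
    print("joining %d" % i)
    p.join()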

abarnert