6

I'm using Python 2.7.10. I read lots of files, store them in a big list, and then try to use multiprocessing and pass the big list to the worker processes so that each process can access it and do some calculation.

I'm using Pool like this:

import itertools
import multiprocessing

def read_match_wrapper(args):
    # combine the shared argument tuple with this worker's index
    args2 = args[0] + (args[1],)
    read_match(*args2)

pool = multiprocessing.Pool(processes=10)
result = pool.map(read_match_wrapper,
                  itertools.izip(itertools.repeat((ped_list, chr_map, combined_id_to_id, chr)),
                                 range(10)))
pool.close()
pool.join()

Basically, I'm passing multiple variables to the 'read_match' function. In order to use pool.map, I wrote the 'read_match_wrapper' function. I don't need any results back from those processes; I just want them to run and finish.

I can get this whole thing to work when my data list 'ped_list' is quite small. When I load all the data, around 10 GB, all the worker processes it spawns show status 'S' and seem to do no work at all.

I don't know if there is a limit on how much data you can pass through a pool. I really need help on this! Thanks!

odeya
  • What's the code of `read_match`? – Colonel Thirty Two Jul 02 '15 at 19:54
  • Could you check that the `itertools.izip()` call returns the right value? If your data is really 10 GB, the `repeat()` might make things worse and probably consumes too much memory. – Jkm Jul 02 '15 at 20:10
  • Yes, I think memory is the issue here. Each time it spawns a process, the memory is copied, and the cluster can't afford that! – odeya Jul 06 '15 at 16:42

2 Answers

5

From the multiprocessing Programming guidelines:

Avoid shared state

As far as possible one should try to avoid shifting large amounts of data between processes.

What you are suffering from is a typical symptom of a full Pipe that does not get drained.

The Python multiprocessing.Pipe used by the Pool has a design flaw: it implements a sort of message-oriented protocol on top of an OS pipe, which behaves more like a stream object.

The result is that if you send too large an object through the Pipe, it gets stuffed. The sender can't add any more content to it, and the receiver can't drain it, because it is blocked waiting for the end of the message.

The proof is that your workers are sleeping, waiting for that "fat" message which never arrives.

Does ped_list contain the file names or the file contents?

In the second case, you'd rather send the file names instead of the contents. The workers can retrieve the contents themselves with a simple open().
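
A minimal sketch of that approach (the worker function and file names below are placeholders, not your actual read_match):

import multiprocessing

def process_one_file(filename):
    # hypothetical worker: each process opens and parses its own file,
    # so only a short filename string travels through the Pool's pipe
    with open(filename) as f:
        data = f.read()
    # ... run the actual matching/calculation on `data` here ...

if __name__ == '__main__':
    filenames = ['chr1.ped', 'chr2.ped']  # placeholder paths
    pool = multiprocessing.Pool(processes=10)
    pool.map(process_one_file, filenames)
    pool.close()
    pool.join()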

noxdafox
  • I noticed this in other posts: http://stackoverflow.com/questions/14124588/python-multiprocessing-shared-memory. I should turn my ped_list into a multiprocessing.Array. But I don't know how to do it since it's a list of lists.. multiprocessing.Array seems to accept only very simple formats – odeya Jul 06 '15 at 16:50
  • 1
    No matter what method you use to share the ped_list, it will still be very slow and suffer from deadlocks if not *very* well implemented. The real solution to the problem is still the proposed one: instead of loading the file contents into ped_list, just load the file names and let the child workers do the loading of the files themselves. Another issue exactly the same as yours: http://stackoverflow.com/questions/27253666/python-multiprocessing-pool-got-stuck-after-long-execution/27757177#27757177 – noxdafox Jul 06 '15 at 17:51
  • 1
    You'll notice a certain speed-up as well, as the Pipe is a real bottleneck in such cases. On Unix systems its size is usually limited to 32 MB. You wanted to stuff 10 GB through it :) – noxdafox Jul 07 '15 at 08:41
0

Instead of working with pool.map, I would rather use queues. You could spawn the desired number of processes and assign a queue for input:

import multiprocessing

n = 10  # number of worker processes
tasks = multiprocessing.Queue()

for i in range(n):  # spawn processes
    worker = multiprocessing.Process(target=read_match_wrapper, args=(tasks,))
    worker.start()

for element in ped_list:
    tasks.put(element)

In this way, your queue is filled from one side and emptied from the other at the same time. It may be necessary to put some items into the queue before the processes are started; otherwise there is a chance that the workers finish without doing anything because the queue is still empty, or raise a Queue.Empty exception.
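
For this to work, read_match_wrapper would have to be adapted to consume items from the queue rather than take a single argument tuple. A minimal sketch, assuming a None sentinel per worker as the stopping convention:

def read_match_wrapper(tasks):
    # adapted wrapper: pull work items from the shared queue until a
    # None sentinel signals that there is no more work
    while True:
        element = tasks.get()
        if element is None:
            break
        read_match(element)  # call the existing worker function on this item

# after all real items are queued, add one sentinel per worker:
# for i in range(n):
#     tasks.put(None)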

RaJa