
I am trying to split a for loop, i.e.

N = 1000000
for i in xrange(N):
    #do something

using multiprocessing.Process, and it works well for small values of N. The problem arises when I use bigger values of N. Something strange happens before or during p.join() and the program doesn't respond. If I put print i instead of q.put(i) in the definition of the function f, everything works well.

I would appreciate any help. Here is the code.

from multiprocessing import Process, Queue

def f(q, nMin, nMax):  # worker function for multiprocessing
    for i in xrange(nMin,nMax):
        q.put(i)

if __name__ == '__main__':

    nEntries = 1000000

    nCpu = 10
    nEventsPerCpu = nEntries/nCpu
    processes = []

    q = Queue()

    for i in xrange(nCpu):
        processes.append( Process( target=f, args=(q,i*nEventsPerCpu,(i+1)*nEventsPerCpu) ) )

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    print q.qsize()
– Puibo

1 Answer


You are trying to grow your queue without bound, and you are joining a subprocess that is waiting for room in the queue's underlying pipe, so your main process is stalled waiting for that one to complete, and it never will. A child process will not exit until everything it has put on the queue has been flushed through the pipe, so p.join() blocks until the main process consumes the data, which it never does.

If you pull data out of the queue before the join, it will work fine.
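
For example, here is a minimal sketch of that idea, reusing q, processes, and nEntries from your question; appending to a results list stands in for whatever you actually do with each value:

for p in processes:
    p.start()

# Drain the queue before joining.  We know exactly nEntries items will be
# produced, so one blocking q.get() per expected item is enough here.
results = []
for _ in xrange(nEntries):
    results.append(q.get())

# Every worker has now flushed its data, so the joins return promptly.
for p in processes:
    p.join()

print len(results)   # 1000000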

One technique you could use is something like this:

while 1:
    running = any(p.is_alive() for p in processes)
    while not q.empty():
        process_queue_data(q.get())   # consume items so the workers can keep putting
    if not running:
        break

According to the documentation, p.is_alive() should perform an implicit join, but the documentation also appears to imply that the best practice is to explicitly join all the processes after this.
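
For instance (just a sketch; these joins go right after the loop above exits):

# Once the drain loop has finished, join the workers explicitly.
for p in processes:
    p.join()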

Edit: Although that is pretty clear, it may not be all that performant. How you make it perform better will be highly task and machine specific (and in general, you shouldn't be creating that many processes at a time, anyway, unless some are going to be blocked on I/O).

Besides reducing the number of processes to the number of CPUs, some easy fixes to make it a bit faster (again, depending on circumstances) might look like this:

import time
import Queue       # Python 2 module that defines the Queue.Empty exception

liveprocs = list(processes)
while liveprocs:
    try:
        # Drain everything currently available without blocking.
        while 1:
            process_queue_data(q.get(False))
    except Queue.Empty:
        pass

    time.sleep(0.5)    # Give tasks a chance to put more data in
    if not q.empty():
        continue
    liveprocs = [p for p in liveprocs if p.is_alive()]
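
Putting the pieces together with the code from your question, the main block might look something like this. This is only a sketch: results.append stands in for process_queue_data, and from Queue import Empty is used so the exception name does not clash with multiprocessing.Queue:

import time
from Queue import Empty                 # exception raised by q.get(False)
from multiprocessing import Process, Queue

def f(q, nMin, nMax):                   # same worker as in the question
    for i in xrange(nMin, nMax):
        q.put(i)

if __name__ == '__main__':
    nEntries = 1000000
    nCpu = 10
    nEventsPerCpu = nEntries / nCpu

    q = Queue()
    processes = [Process(target=f, args=(q, i * nEventsPerCpu, (i + 1) * nEventsPerCpu))
                 for i in xrange(nCpu)]
    for p in processes:
        p.start()

    results = []                        # stand-in for real per-item processing
    liveprocs = list(processes)
    while liveprocs:
        try:
            while 1:
                results.append(q.get(False))
        except Empty:
            pass

        time.sleep(0.5)                 # let the workers put more data in
        if not q.empty():
            continue
        liveprocs = [p for p in liveprocs if p.is_alive()]

    for p in processes:                 # safe to join now that the queue is drained
        p.join()

    print len(results)
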
– Patrick Maupin

  • I am sending my script to a machine which has around 30 CPUs, so with 10 processes I am still far from the max. Are there any other reasons why I should reduce the number of processes? I am doing some data analysis (50 GB of data, which is approx. 9M events). My idea was to split the data into pieces (e.g. 10) and use multiprocessing. If you have any advice I would appreciate it. – Puibo Jul 29 '15 at 20:39
  • More processes are good up to the number of CPUs -- even past the number of CPUs if processes will sometimes be stalled. The way your initial question was worded, I thought maybe it was a programming homework problem -- didn't realize you had a powerful machine :) Anyway, one metric to consider is how much speedup you are getting vs. doing things single-threaded -- if you're getting 10X speedup with 10 processes (unlikely), that's great! Reducing dependencies (waits) between processes is key -- as you've seen, you have to drain the queue. – Patrick Maupin Jul 29 '15 at 22:23