
I need to run a series of independent subprocess calls via multiprocessing. I know in advance that some of these jobs will run orders of magnitude slower than others. To make efficient use of processor time, I thought it would make sense to queue the slow jobs first. That way I was hoping to avoid the odd case where one of the long-running jobs gets scheduled last while the remaining processors sit idle. In this particular case, it doesn't matter whether the actual results are yielded in order or not (because the individual processes create files which are only used once all processes have completed). What is important, though, is that the slow jobs get scheduled first/preferentially. My first question is: how do I achieve this?
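To illustrate what I have in mind, here is a minimal sketch (Python 3 syntax; the idea is the same under 2.7) with made-up per-job durations standing in for the real subprocess calls. The durations are sorted longest-first before submission, and chunksize=1 hands each worker one job at a time in queue order:

```python
import multiprocessing
import time

def work(duration):
    # Stand-in for the real subprocess call; just sleeps and echoes back.
    time.sleep(duration)
    return duration

def run_longest_first(durations, procs=2):
    # Sort so the (estimated) slowest jobs are queued first;
    # chunksize=1 makes each free worker pull the next single job.
    jobs = sorted(durations, reverse=True)
    pool = multiprocessing.Pool(processes=procs)
    results = pool.map(work, jobs, chunksize=1)
    pool.close()
    pool.join()
    return results

if __name__ == "__main__":
    # map() returns results in input order, i.e. longest-first here.
    print(run_longest_first([0.01, 0.2, 0.05, 0.1]))
```

Whether this actually avoids the straggler problem is exactly what I'm unsure about.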

Even after reading the multiprocessing documentation and this excellent post explaining the differences between map(), map_async() etc. I'm still a bit confused. My second question therefore is: Should I use Pool.map() instead of Pool.map_async() in this case, or even something else?
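As far as I understand the difference so far (a minimal sketch, Python 3 syntax): map() blocks until all results are ready and returns a plain list, while map_async() returns an AsyncResult immediately and the list is fetched later via get():

```python
import multiprocessing

def square(x):
    return x * x

def compare_map_variants():
    pool = multiprocessing.Pool(processes=2)

    # map() blocks until every result is ready and returns a plain list.
    blocking = pool.map(square, range(5))

    # map_async() returns an AsyncResult right away; .get() blocks for the list.
    async_result = pool.map_async(square, range(5))
    non_blocking = async_result.get()

    pool.close()
    pool.join()
    return blocking, non_blocking

if __name__ == "__main__":
    print(compare_map_variants())
```

Both deliver the same results in the same order; the difference is only in when the caller blocks.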

I tried map() and map_async(), but at first I didn't observe the expected behavior for either. The following example runs processes which do little more than create a file and sleep.

import subprocess
import multiprocessing
import time
import os
import glob
import sys

NUM_PROCS = 2

def work(cmd):
    return subprocess.call(cmd, shell=True)

def generate_cmds(n=10):
    for i in xrange(n):
        yield "sleep 2 && touch %d.log" % (i)

def main():
    assert len(sys.argv)==2, ("missing arg: async=[0|1]")
    async = sys.argv[1]=='1'

    results = []
    pool = multiprocessing.Pool(processes=NUM_PROCS)
    if async:
        print "map_async()"
        p = pool.map_async(work, generate_cmds(), callback=results.extend)
        p.wait()  # or p.close() and p.join()
    else:
        print "map()"
        results = pool.map(work, generate_cmds())

    print "results: %s" % (results)
    for f in glob.glob("[0-9]*.log"):
        print f, time.strftime('%H:%M:%S', 
                               time.localtime(os.path.getmtime(f)))

if __name__ == "__main__":
    main()

I assumed that with map() the timestamps of the created files would match the command order, since all processes take roughly the same amount of time to complete, but they don't (unless I play with chunksize). Instead, it seems that every other command ran simultaneously:

$ python subp.py 0
map()
results: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
0.log 14:36:57
1.log 14:36:59
2.log 14:36:57
3.log 14:36:59
4.log 14:37:01
5.log 14:37:03
6.log 14:37:01
7.log 14:37:03
8.log 14:37:05
9.log 14:37:07

I know this can be fixed by using a chunksize of 1, but I don't understand why (could someone explain?). My final question is: does this mean I should use map_async(chunksize=1) for my setting?
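For what it's worth, the pairing in the timestamps is consistent with how CPython's Pool.map picks a default chunksize when none is given (an implementation detail, not a documented guarantee): it aims for roughly four chunks per worker. A small sketch of that heuristic:

```python
def default_chunksize(n_tasks, n_workers):
    # Mirrors CPython's internal heuristic in Pool.map (an implementation
    # detail, not a documented guarantee): aim for ~4 chunks per worker.
    chunksize, extra = divmod(n_tasks, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize

# For the example above: 10 tasks, 2 workers -> chunks of 2. Worker A takes
# tasks [0, 1] while worker B takes [2, 3], which would explain why 0.log
# and 2.log share a timestamp.
print(default_chunksize(10, 2))
```

With 10 tasks and 2 workers this gives chunks of 2, matching the observed pairing.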

Thanks, Andreas

PS: I'm using Python 2.7 in case it matters.

    could you please read this http://stackoverflow.com/questions/17903355/multiprocessing-pool-map-call-functions-in-certain-order, you will get some idea about the chunk size – James Sapam Feb 01 '14 at 07:37
  • Thanks. That explains the behavior of the example code. – Andreas Feb 01 '14 at 07:46
  • What is the question now as the chunk size was explained already? – Janne Karila Feb 01 '14 at 10:28
  • If you want a sequential order why are you using `multiprocessing`? If you want to only make sequential some piece of computation then you *cannot* rely on timings. If you want any kind of synchronization you must use some [synchronization primitive](http://docs.python.org/3.3/library/multiprocessing.html#synchronization-primitives) such as a [`Barrier`](http://docs.python.org/3.3/library/multiprocessing.html#multiprocessing.Barrier). – Bakuriu Feb 01 '14 at 10:37
  • Hi Janne, I understand what's going on now after reading the chunksize link provided by yopy and am now able to answer all my questions. If yopy added it as answer, rather than a comment, I would have marked it as correct. – Andreas Feb 01 '14 at 13:04

1 Answer


To answer my own question:

In my case it didn't matter whether I used map() or map_async(), because the order of results is not important. The queuing order / order of execution was important, however, so setting chunksize to 1 is what matters (the link yopy provided gave me the final clue). Each worker process takes chunksize iterations at a time, and when chunksize is not given it is determined automatically. This can have the unwanted side effect that the slow jobs (queued first) all get assigned to one worker process (or a small number of them) instead of being distributed evenly across all workers.
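A sketch (Python 3 syntax, with sleeps standing in for my subprocess calls) that makes the effect visible: with chunksize=1 each free worker pulls the next single task, so the two slow jobs queued first end up on different worker processes running in parallel, whereas a larger chunksize would batch them onto the same worker:

```python
import multiprocessing
import os
import time

def work(duration):
    # Record which worker process handled this job.
    time.sleep(duration)
    return (os.getpid(), duration)

def run(jobs, chunksize):
    pool = multiprocessing.Pool(processes=2)
    results = pool.map(work, jobs, chunksize=chunksize)
    pool.close()
    pool.join()
    return results

if __name__ == "__main__":
    # Two slow jobs queued first, followed by four quick ones.
    jobs = [0.4, 0.4, 0.05, 0.05, 0.05, 0.05]
    results = run(jobs, chunksize=1)
    # With chunksize=1 the two slow jobs land on different worker PIDs.
    print(results[0][0] != results[1][0])
```

With chunksize=3 the first worker would receive both 0.4s jobs as one chunk and the second worker would finish its three quick jobs and then idle.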

Andreas
