
I have written some Python code that extracts information from a large database, performs some maximum-likelihood modeling of each item in the database, then stores the results. The code works fine serially, and each item takes 1-2 seconds. The problem is that I have a few million items in the database, so a serial run of the code will take weeks. I have access to a cluster that has 16 cpus, and I've written a function that breaks the initial input catalog into "chunks," runs the modeling code on each chunk, then organizes the results. Because of the complexity of the modeling portion of the code and its reliance on 3rd party python software, I've been trying to parallelize this code using multiprocessing so that each chunk of the input catalog is run on a separate cpu. Currently, the processes spawned by my attempt are only being run on a single cpu. Does anyone know how to fix this?

I am a self-taught scientific programmer, so I have some experience using Python, though this is the first time I've attempted parallelization. The code is far too long to show here in its entirety, but it essentially does the following:

import multiprocessing as mp

def split(a, n):
    # split sequence a into n roughly equal-sized chunks
    k, m = len(a) // n, len(a) % n
    return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

ncpus = mp.cpu_count()
chunks = list(split(catlist, ncpus))
images = [im1, im2, im3, im4, im5, im6, im7, im8, im9, im10, im11, im12]

def process_chunks(chunk):
    database_chunk = database[chunk[:]]
    for i in range(len(database_chunk)):
        # perform some analysis on source i
        for j in range(len(images)):
            # perform more analysis using image j
            pass
    my_dict = {'out1': out1, 'out2': out2, 'out3': out3}
    return my_dict

pool = mp.Pool(processes=ncpus)
result = [pool.apply(process_chunks, args=(chunks[id],)) for id in range(0,len(chunks))]

For a simple test example of only 13 items from the database, this is what chunks looks like (a list of roughly evenly split lists):

chunks = [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]

The code runs fine, and the pool.apply command spawns multiple processes, but all of the processes run on a single cpu and there is no improvement in runtime over the serial version (in fact it's a bit slower, I guess due to the overhead of spawning each process). Is there a way to force each process to be assigned to a separate cpu so I get some improvement when I apply this to the full database (which has millions of items)? Or is there a better way to go about this problem in general?

Thank you so much in advance!

Note: In case it clarifies the situation further, I'm an astronomer and the database is a large source catalog with over a million entries, the images are wide-field mosaics in 12 different bands, and I'm performing maximum-likelihood photometry on each band of each source.


1 Answer


I thought at first you were running into the global interpreter lock, but you've already dodged that by using multiprocessing. Thankfully, that means things are much easier than otherwise. From the docs:

Equivalent of the apply() built-in function. It blocks until the result is ready, so apply_async() is better suited for performing work in parallel.

(emphasis mine)

apply_async() has a slightly different API from apply(), but it's a pretty easy modification:

calculations = []

def reaper(result):
    # collect each chunk's output dict as it finishes
    calculations.append(result)

for id in range(0, len(chunks)):
    pool.apply_async(process_chunks, args=(chunks[id],), callback=reaper)

# wait for all of the workers to finish before using `calculations`
pool.close()
pool.join()
  • Why not `results = pool.map(process_chunks, chunks)`? – Dan D. May 12 '16 at 22:41
  • Not parallel; see https://stackoverflow.com/questions/8533318/python-multiprocessing-pool-when-to-use-apply-apply-async-or-map – R Phillip Castagna May 12 '16 at 22:43
  • That answer doesn't say that. It talks about the order of the results. The example using `pool.apply_async` collects them into a list in the order the tasks complete, whereas `pool.map` remembers the index in the argument list so that they can be put back at that location in the result list. It also blocks until all the results are complete, but that doesn't mean the items aren't processed in parallel. – Dan D. May 12 '16 at 22:54
  • And you can get around that last part by using `pool.map_async`. – Dan D. May 12 '16 at 22:56
  • As you already pointed out, `map` **is** blocking per-invocation, though `map_async()` can still be used if you prefer the map API. The OP was already using `apply()`, so I did what I figured was the simplest modification. – R Phillip Castagna May 12 '16 at 22:56
  • The OP's use case also isn't dependent on ordered results. – R Phillip Castagna May 12 '16 at 23:01
  • I should have mentioned in the original post - I did also try apply_async(), but the result was the same as with apply() - only 1 cpu core was being used and the runtime was a bit longer than when run serially. – user6327340 May 12 '16 at 23:19
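For reference, here is a minimal sketch of the `pool.map` approach suggested in the comments, assuming the same `process_chunks` and `chunks` defined in the question; the `__main__` guard is needed on platforms that spawn rather than fork worker processes:

import multiprocessing as mp

if __name__ == '__main__':
    pool = mp.Pool(processes=mp.cpu_count())
    # map() blocks until every chunk is done, but the chunks themselves
    # are distributed across the worker processes and run in parallel
    results = pool.map(process_chunks, chunks)
    pool.close()
    pool.join()
    # results holds one output dict per chunk, in the same order as chunks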