
I have a 256x256x256 Numpy array, in which each element is a matrix. I need to do some calculations on each of these matrices, and I want to use the multiprocessing module to speed things up.

The results of these calculations must be stored in a 256x256x256 array like the original one, so that the result of the matrix at element [i,j,k] in the original array must be put in the [i,j,k] element of the new array.

To do this, I want to make a list which could be written in a pseudo-ish way as [array[i,j,k], (i, j, k)] and pass it to a function to be "multiprocessed". Assuming that matrices is a list of all the matrices extracted from the original array and myfunc is the function doing the calculations, the code would look somewhat like this:

import multiprocessing
import numpy as np
from itertools import izip

def myfunc(finput):
    # Do some calculations...
    ...

    # ... and return the result and the index:
    return (result, finput[1])

# Make indices:
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3)

# Make function input from the matrices and the indices:
finput = izip(matrices, inds)

pool = multiprocessing.Pool()
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))

However, it seems like map_async is actually creating this huge finput list first: my CPUs aren't doing much, but the memory and swap get completely consumed in a matter of seconds, which is obviously not what I want.

Is there a way to pass this huge list to a multiprocessing function without the need to explicitly create it first? Or do you know another way of solving this problem?

Thanks a bunch! :-)

digitaldingo

3 Answers


All multiprocessing.Pool.map* methods consume the iterator fully as soon as they are called. To feed the map function one chunk of the iterator at a time, use grouper_nofill:

import itertools

def grouper_nofill(n, iterable):
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]'''
    it = iter(iterable)
    def take():
        # Yield successive lists of up to n items; an empty list means the iterator is exhausted.
        while 1:
            yield list(itertools.islice(it, n))
    # iter(callable, sentinel) stops as soon as take() yields the empty list (the sentinel).
    return iter(take().next, [])

chunksize = 256
async_results = []
for finput in grouper_nofill(chunksize, itertools.izip(matrices, inds)):
    async_results.extend(pool.map_async(myfunc, finput).get())
async_results = np.array(async_results)

PS. pool.map_async's chunksize parameter does something different: It breaks the iterable into chunks, then gives each chunk to a worker process which calls map(func,chunk). This can give the worker process more data to chew on if func(item) finishes too quickly, but it does not help in your situation since the iterator still gets consumed fully immediately after the map_async call is issued.
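For illustration, a minimal sketch reusing the question's names (the chunksize of 128 is arbitrary) showing that passing chunksize changes only how the work is batched, not how the iterable is read:

# Even with an explicit chunksize, map_async still materializes the whole
# izip iterator first (it has no __len__); only afterwards is it split into
# chunks of 128 items that each worker processes with map(myfunc, chunk).
async_results = np.asarray(
    pool.map_async(myfunc, itertools.izip(matrices, inds), chunksize=128).get(999999)
)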

unutbu
  • Thank you very much! Your solution does indeed seem to work! For reference, I had to use pool.map_async(myfunc, finput).get(999999), but it works! However, it still uses a lot of memory (of course depending on the exact chunksize), and python doesn't seem to be garbage collecting during the run. Any ideas why that might be? – digitaldingo Sep 05 '11 at 19:03
  • @digitaldingo: Hm, nothing comes to mind. It would be ideal if you can whittle down your code to a [SSCCE](http://sscce.org/) and post it here. – unutbu Sep 05 '11 at 19:29

I ran into this problem as well. Instead of this:

res = p.map(func, combinations(arr, select_n))

do

res = p.imap(func, combinations(arr, select_n))

imap doesn't consume it!
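Applied to the question's setup, that would look roughly like the sketch below (untested; the chunksize of 256 is just a guess to keep inter-process overhead down, and myfunc, matrices and inds are the names from the question):

import multiprocessing
import itertools
import numpy as np

# Pool.imap pulls items from the iterator lazily as workers become free,
# instead of turning it into a list up front; results are returned in the
# order the items were submitted, so the indices stay aligned.
pool = multiprocessing.Pool()
finput = itertools.izip(matrices, inds)
results = np.asarray(list(pool.imap(myfunc, finput, chunksize=256)))
pool.close()
pool.join()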

MutantTurkey

Pool.map_async() needs to know the length of the iterable to dispatch the work to multiple workers. Since izip has no __len__, it converts the iterable into a list first, causing the huge memory usage you are experiencing.

You could try to sidestep this by creating your own izip-style iterator with __len__.
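A rough sketch of that idea (the SizedIterable name is made up; matrices, inds, myfunc and pool are the names from the question): wrap the lazy pair iterator in an object that reports a known length, so map_async skips the list() conversion. Whether this actually keeps memory usage down will still depend on how quickly the tasks get pickled onto the queue.

from itertools import izip

class SizedIterable(object):
    '''Wrap an iterator and report a known length, so that
    Pool.map_async() does not call list() on it first.'''
    def __init__(self, iterable, length):
        self._it = iter(iterable)
        self._len = length

    def __len__(self):
        return self._len

    def __iter__(self):
        return self._it

finput = SizedIterable(izip(matrices, inds), 256 ** 3)
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))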

Ferdinand Beyer
  • why does it need to know that? why can't it simply feed all the idle workers and then wait? – andrew cooke Sep 05 '11 at 11:02
  • @andrew - The first lines in `map_async()` (`multiprocessing/pool.py`) are actually `if not hasattr(iterable, '__len__'): iterable = list(iterable)`. It needs to know the length to create a sufficiently large output list as the completion order of the workers is unknown. – Ferdinand Beyer Sep 05 '11 at 11:06
  • hmmm. it could construct that dynamically, couldn't it? i'm just thinking this might be raised as an issue. it seems like a valid request. – andrew cooke Sep 05 '11 at 11:11
  • Yes it could do without `__len__` but it would be pretty complicated. If result #321 is ready before #23, where should it be stored? If the length is known, this gets way easier. – Ferdinand Beyer Sep 05 '11 at 11:53
  • This is indeed interesting... `Pool.map_async()` might not know the length, but I do (256^3)---would it be possible to explicitly tell it the length? If not, maybe it should... – digitaldingo Sep 05 '11 at 19:09