
I want to fill a 2D numpy array within a for loop and speed up the calculation by using multiprocessing.

import numpy
from multiprocessing import Pool


array_2D = numpy.zeros((20,10))
pool = Pool(processes = 4)

def fill_array(start_val):
    return range(start_val,start_val+10)

list_start_vals = range(40,60)
for line in xrange(20):
    array_2D[line,:] = pool.map(fill_array,list_start_vals)
pool.close()

print array_2D

When I execute this, Python spawns 4 subprocesses and occupies 4 CPU cores, but the execution never finishes and the array is never printed. If I try to write the array to disk instead, nothing happens either.

Can anyone tell me why?

MoTSCHIGGE

3 Answers


The following works. First, it is a good idea to protect the main part of your code inside an `if __name__ == '__main__':` block in order to avoid weird side effects. The result of pool.map() is a list containing one evaluation per value in the iterable list_start_vals, so you don't have to create array_2D beforehand.

import numpy as np
from multiprocessing import Pool

def fill_array(start_val):
    return list(range(start_val, start_val+10))

if __name__=='__main__':
    pool = Pool(processes=4)
    list_start_vals = range(40, 60)
    array_2D = np.array(pool.map(fill_array, list_start_vals))
    pool.close() # ATTENTION HERE
    print array_2D

As @hpaulj points out in the comments, pool.close() can occasionally cause trouble; if you run into problems, you can simply remove that line.
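Following the comment thread below, here is a minimal sketch (untested on the asker's Windows 7 setup) that pairs pool.close() with pool.join(), so the main process waits for the workers to exit before continuing; in hpaulj's tests, the join mattered more when the mapped work was slow:

```python
import numpy as np
from multiprocessing import Pool

def fill_array(start_val):
    return list(range(start_val, start_val + 10))

if __name__ == '__main__':
    pool = Pool(processes=4)
    # pool.map returns one row per start value, in order
    array_2D = np.array(pool.map(fill_array, range(40, 60)))
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the worker processes to finish
    print(array_2D.shape)
```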

paperjam
Saullo G. P. Castro
  • With larger arrays, I get an error `Exception RuntimeError: RuntimeError('cannot join current thread',) in ignored`. `apply_async` does not give this warning. – hpaulj Sep 17 '14 at 20:19
  • Without the `pool.close()` command, I don't get this `Error`. – hpaulj Sep 17 '14 at 20:29
  • @hpaulj thank you for the feedback... I tried producing an array which is `10000 X 10000` with no problem, changing 60 to 10040 and 10 to 10000... – Saullo G. P. Castro Sep 17 '14 at 20:29
  • Maybe it's an issue of machine size and speed. Mine's relatively old. – hpaulj Sep 17 '14 at 20:30
  • On further testing it appears that a `pool.join()` is more important if the mapping is too slow. – hpaulj Sep 17 '14 at 22:00

If you still want to fill the array row by row, you can use pool.apply_async instead of pool.map. Working from Saullo's answer:

import numpy as np
from multiprocessing import Pool

def fill_array(start_val):
    return range(start_val, start_val+10)

if __name__=='__main__':
    pool = Pool(processes=4)
    list_start_vals = range(40, 60)
    array_2D = np.zeros((20,10))
    for line, val in enumerate(list_start_vals):
        result = pool.apply_async(fill_array, [val])
        array_2D[line,:] = result.get()
    pool.close()
    print array_2D

This runs a bit slower than map, but it did not produce the runtime error that my test of the map version did: Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored
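Note that calling result.get() inside the submission loop blocks on each task before submitting the next one. A variation (a sketch, assuming the same fill_array as above) submits all the apply_async calls first and collects afterwards, so the workers can actually overlap:

```python
import numpy as np
from multiprocessing import Pool

def fill_array(start_val):
    return list(range(start_val, start_val + 10))

if __name__ == '__main__':
    pool = Pool(processes=4)
    # submit every task first; each call returns an AsyncResult immediately
    async_results = [pool.apply_async(fill_array, [val]) for val in range(40, 60)]
    array_2D = np.zeros((20, 10))
    # now block on the results in order, filling one row per task
    for line, res in enumerate(async_results):
        array_2D[line, :] = res.get()
    pool.close()
    pool.join()
    print(array_2D)
```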

hpaulj

The problem is caused by running pool.map inside the for loop. The result of the map() method is functionally equivalent to the built-in map(), except that the individual tasks run in parallel; so in your case pool.map(fill_array, list_start_vals) is called 20 times, once per iteration of the for loop, each time starting a new parallel run. The code below should work:

Code:

#!/usr/bin/python

import numpy
from multiprocessing import Pool

def fill_array(start_val):
    return range(start_val,start_val+10)

if __name__ == "__main__":
    array_2D = numpy.zeros((20,10))
    pool = Pool(processes = 4)    
    list_start_vals = range(40,60)

    # running the pool.map in a for loop is wrong
    #for line in xrange(20):
    #    array_2D[line,:] = pool.map(fill_array,list_start_vals)

    # get the result of pool.map (list of values returned by fill_array)
    # in a pool_result list 
    pool_result = pool.map(fill_array,list_start_vals)

    # the pool is processing its inputs in parallel, close() and join() 
    #can be used to synchronize the main process 
    #with the task processes to ensure proper cleanup.
    pool.close()
    pool.join()

    # Now assign the pool_result to your numpy
    for line,result in enumerate(pool_result):
        array_2D[line,:] = result

    print array_2D
Saullo G. P. Castro
Ram
  • Thanks for your reply. Unfortunately the effect is the same. Python starts subprocesses and occupies the PC but nothing happens. I'm running the code on a Windows 7 machine (dual-core CPU with hyperthreading => virtually a quad-core), Python 2.7.5 32-bit, and I use SpyderLib as my programming interface. – MoTSCHIGGE Sep 17 '14 at 12:34
  • @MoTSCHIGGE I ran the code I posted in a Windows environment and it seems to work. I think you are running the code without the `if __name__ == "__main__":` guard; if that's the case, the code will spawn processes indefinitely on Windows. Please refer to this Stack Overflow question regarding the importance of that condition on Windows: http://stackoverflow.com/questions/20222534/python-multiprocessing-on-windows-if-name-main – Ram Sep 17 '14 at 15:04
  • I just tried to run the sample code above including `if __name__ == "__main__":`, but nothing happens. I don't know what's wrong here.. – MoTSCHIGGE Sep 17 '14 at 17:07