
New to multiprocessing, and I can't get my head around the documentation. How can I run a function of this form across multiple processes? Here is my attempt: I want to run a convolution (of A with B, below) through multiple processes for speed.

from multiprocessing import Process, Value, Array 
import numpy as np
import scipy

A = np.ones((10,10,4))
B = np.random.rand(10,10)

def loop(i):
     C[:,:,i] = scipy.ndimage.convolve(A[:,:,i],B)

if __name__ == '__main__':
     C = Array('i',np.zeros((10,10,4)))
     arr    = Array('i',range(4))
     for i in arr:
        p = Process(target = loop, args=i)
        p.start()
        p.join()

Current error:

Traceback (most recent call last):
  File "", line 11, in <module>
    y_test = Array('i', np.zeros((10,10,4)))
  File "/usr/lib/python2.7/multiprocessing/__init__.py", line 260, in Array
    return Array(typecode_or_type, size_or_initializer, **kwds)
  File "/usr/lib/python2.7/multiprocessing/sharedctypes.py", line 115, in Array
    obj = RawArray(typecode_or_type, size_or_initializer)
  File "/usr/lib/python2.7/multiprocessing/sharedctypes.py", line 89, in RawArray
    result.__init__(*size_or_initializer)
TypeError: only length-1 arrays can be converted to Python scalars

y_test would be the output.

William Baker Morrison

2 Answers


The offending line is

y_test = Array('y_test',np.zeros((10,10,4)))

It is plain wrong. The documentation for multiprocessing.Array says:

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
...
typecode_or_type determines the type of the elements of the returned array: it is either a ctypes type or a one character typecode of the kind used by the array module...

The error just says that 'y_test' is not a known type code.
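
For illustration, here is a minimal sketch of the difference (the exact error message can vary with the Python version):

from multiprocessing import Array

ok = Array('d', 5)            # 'd' is a valid one-character type code: 5 shared doubles, zero-initialized
print(ok[:])                  # [0.0, 0.0, 0.0, 0.0, 0.0]

try:
    bad = Array('y_test', 5)  # 'y_test' is neither a ctypes type nor a one-character type code
except TypeError as e:
    print(e)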

Anyway, you are not approaching the problem from the right side.

A multiprocessing.Array is an array of primitive types stored in shared memory. Here, you can easily build such an array through the flat iterator of the numpy array:

y_test = Array('d', np.zeros((10,10,4)).flat)

y_test is now a shared memory array of 400 doubles.
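
You can check that directly; the shared array behaves like a flat sequence of C doubles (a quick sketch):

print(len(y_test))   # 400
print(y_test[:4])    # [0.0, 0.0, 0.0, 0.0]
y_test[0] = 1.5      # individual elements can be read and written like a list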

Let us forget multiprocessing for a while. You could process it this way (start here stands for your (10,10,4) input array):

for i in range(4):
    npi = start[:,:,i] + i
    y_test[10*10*i:10*10*(i+1)] = npi.flatten(order='F')   # Fortran order, so the reshape below restores each slice

You could now convert it back into a numpy array:

resul = np.array(y_test).reshape((10,10,4), order='F')
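
As a quick sanity check (still assuming start is the (10,10,4) input array), each slice of the rebuilt array should match the value computed above:

for i in range(4):
    assert np.array_equal(resul[:,:,i], start[:,:,i] + i)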

The interesting point here is that the processing of each 10x10 array accesses a different part of the shared memory, so we can build it in different processes:

def process(a, i):
    npi = start[:,:, i] + i    # computes a numpy array
    a[10*10*i:10*10*(i+1)] = npi.flatten(order='F')  # stores it in shared memory, Fortran order to match the reshape below

def main():
    y_test = Array('d', np.zeros((10,10,4)).flat) # initializes shared memory
    processes = []
    for i in range(4):  # uses 4 processes, one per 10x10 array
        p = Process(target = process, args = (y_test, i))
        processes.append(p)
        p.start()
        print("Start", i, p.pid)  # to be sure the processes were started
    for i, p in enumerate(processes):
        p.join()        # join every child process, not only the last one
        print("Join", i, p.exitcode)
    resul = np.array(y_test).reshape((10,10,4), order='F') # rebuild the final numpy array
    print(resul)

if __name__ == '__main__':
    main()
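
For the actual convolution from the question, the same pattern might look like the following sketch. It assumes a fork start method (e.g. Linux), so the children inherit A and B from the parent; convolve_slice is just a hypothetical helper name:

from multiprocessing import Process, Array
import numpy as np
import scipy.ndimage

A = np.ones((10,10,4))      # input stack, as in the question
B = np.random.rand(10,10)   # kernel, as in the question

def convolve_slice(shared, i):
    res = scipy.ndimage.convolve(A[:,:,i], B)                # convolve one 10x10 slice
    shared[10*10*i:10*10*(i+1)] = res.flatten(order='F')     # store it in its own region of shared memory

def main():
    C = Array('d', 10*10*4)   # 400 shared doubles, zero-initialized
    processes = [Process(target=convolve_slice, args=(C, i)) for i in range(4)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    return np.array(C).reshape((10,10,4), order='F')

if __name__ == '__main__':
    print(main())
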
Serge Ballesta

The Exception

You get an exception here because the first parameter of Array(...) needs to be a valid ctypes type like c_bool, c_int, ..., or a one-character type code representing such a type, like 'i'. 'y_test' is not a valid type code. It seems you put in variable names instead of their types.

Try this, if you want to use integers here (note that the initializer also has to be a flat, one-dimensional sequence of scalars, so flatten the numpy array first):

y_test = Array('i', np.zeros((10,10,4), dtype='i').flat)

Take a look at the Array documentation for more details on how the Array works. There is also a list of valid type codes you can use.

Additional thoughts: parallel execution

Also, keep in mind that the process will just run the function loop with the arguments you provide. You provide two Arrays, but it seems that the loop function needs an Array and an integer as parameters. When you want to run multiple instances of the loop function with different values of i, you can create multiple Process instances and give each one a different value as a parameter, like this:

p1 = Process(target = loop, args = (y_test, 1))
p2 = Process(target = loop, args = (y_test, 2))

Each process will run the loop function with a different parameter. You could also put this in a loop that iterates over your arr to process every value in it, which (I think) is what you want here.
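
A sketch of what that loop could look like, assuming loop has been adjusted to take the shared array and the index as parameters, and that arr simply holds the indices 0 to 3:

processes = []
for i in range(4):                  # one process per value of i
    p = Process(target=loop, args=(y_test, i))
    processes.append(p)
    p.start()
for p in processes:                 # wait for all of them to finish
    p.join()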