
I'm in a situation where I need to parallel process a very big numpy array (55x117x256x256). Trying to pass it around with the usual multiprocessing approach gives an AssertionError, which I understand to be because the array is too big to copy into each process. Because of this, I would like to try using shared memory with multiprocessing. (I'm open to other approaches, provided they aren't too complicated).

I've seen a few questions asking about the use of python multiprocessing's shared memory approach, e.g.

import numpy as np
import multiprocessing as mp

unsharedData = np.zeros((10,))
sharedData = mp.Array('d', unsharedData)

which seem to work fine. However, I haven't yet seen an example where this is done with a multidimensional array.

I've tried just sticking the multidimensional array into mp.Array, which gives me TypeError: only size-1 arrays can be converted to Python scalars.

unsharedData2 = np.zeros((10,10))
sharedData2 = mp.Array('d', unsharedData2)  # raises TypeError

I can flatten the array, but I'd rather not if it can be avoided.

Is there some trick to get multiprocessing Array to handle multidimensional data?

Theolodus

4 Answers


You can use arr.reshape((-1,)) or np.ravel(arr) instead of arr.flatten() to get a 1-dimensional view of your array, avoiding the unnecessary copy that flatten makes:

import numpy as np
import multiprocessing as mp

unsharedData2 = np.zeros((10, 10))
ravel_view = np.ravel(unsharedData2)
reshape_view = unsharedData2.reshape((-1,))
ravel_view[11] = 1.0      # writes 1.0 into unsharedData2 at [1, 1]
reshape_view[22] = 2.0    # writes 2.0 into unsharedData2 at [2, 2]
sharedData2 = mp.Array('d', ravel_view)    # either view can be used
sharedData2 = mp.Array('d', reshape_view)  # to initialise the shared Array
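
If you want to verify that these really are views rather than copies, np.shares_memory gives a quick check (a small standalone sketch, not part of the original answer):

import numpy as np

a = np.zeros((10, 10))
print(np.shares_memory(a, np.ravel(a)))       # True  -> ravel returns a view here
print(np.shares_memory(a, a.reshape((-1,))))  # True  -> reshape returns a view here
print(np.shares_memory(a, a.flatten()))       # False -> flatten always copies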
dankal444

You can create a new multidimensional numpy array in each process that shares the same memory by using the get_obj() method of the Array, which returns the underlying ctypes array; that ctypes array exposes a buffer interface, so np.frombuffer can wrap it without copying.

See the example below:

import ctypes as c
import numpy as np
import multiprocessing as mp


unsharedData2 = np.zeros((10, 10))
n, m = unsharedData2.shape[0], unsharedData2.shape[1]


def f1(mp_arr):
    # in each new process, create a new numpy array from the shared buffer:
    arr = np.frombuffer(mp_arr.get_obj())
    b = arr.reshape((n, m))  # mp_arr, arr and b share the same memory
    b[2][1] = 3


def f2(mp_arr):
    # in each new process, create a new numpy array from the shared buffer:
    arr = np.frombuffer(mp_arr.get_obj())
    b = arr.reshape((n, m))  # mp_arr, arr and b share the same memory
    b[1][1] = 2


if __name__ == '__main__':
    mp_arr = mp.Array(c.c_double, n*m)
    p = mp.Process(target=f1, args=(mp_arr,))
    q = mp.Process(target=f2, args=(mp_arr,))
    p.start()
    q.start()
    p.join()
    q.join()
    arr = np.frombuffer(mp_arr.get_obj())
    b = arr.reshape((10, 10))
    print(b)
    '''
    [[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 2. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 3. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]]
    '''
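
A side note not covered in the example above: mp.Array creates a synchronizing lock by default, and get_lock() returns it, so concurrent writes to overlapping elements can be guarded. A minimal sketch, with add_value as a hypothetical worker:

import ctypes as c
import numpy as np
import multiprocessing as mp


def add_value(mp_arr, value):
    # guard the write with the lock that mp.Array creates by default
    with mp_arr.get_lock():
        arr = np.frombuffer(mp_arr.get_obj()).reshape((10, 10))
        arr += value  # in-place update of the shared buffer


if __name__ == '__main__':
    mp_arr = mp.Array(c.c_double, 10 * 10)  # lock=True is the default
    procs = [mp.Process(target=add_value, args=(mp_arr, v)) for v in (1.0, 2.0)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    result = np.frombuffer(mp_arr.get_obj()).reshape((10, 10))
    print(result[0, 0])  # 3.0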
MaPy

While answers have already been given using multiprocessing, an alternative is ray, a different framework for parallel processing.

With ray you can place any object into read-only shared memory using obj_ref = ray.put(obj). The nice thing is that ray has built-in support for zero-copy retrieval of numpy arrays from shared memory.

There is a little overhead in using ray's implementation of shared memory, but since your arrays are so large this probably won't be a problem.

import numpy as np
import ray

@ray.remote
def function(arr, num: int):
    # array is automatically retrieved if a reference is passed to
    # a remote function, you could do this manually with ray.get(ref)
    return arr.mean() + num

if __name__ == '__main__':
    ray.init()
    # generate array and place into shared memory, return reference
    array_ref = ray.put(np.random.randn(55, 117, 256, 256))
    # multiple processes operating on shared array
    results = ray.get([function.remote(array_ref, i) for i in range(8)])
    print(results)
Nathan

You can use Numba to process the array in parallel. I don't know exactly what kind of processing you are planning to do, but it is likely that Numba can speed it up.


import numpy as np
from numba import njit, prange

@njit(parallel=True)
def process(array):
    m, n, o, p = array.shape
    result = np.empty(m)
    for i in prange(m):
        # process the slices individually, e.g. reduce each one to a number
        result[i] = array[i].mean()
    return result

You might also want to look into Numba's vectorize, guvectorize or stencil, depending on your use case.
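
As a hedged sketch of how guvectorize could be applied here (assuming a per-slice reduction over the last two axes; the mean is just a placeholder for whatever operation you actually need):

import numpy as np
from numba import guvectorize

# Placeholder per-slice operation: the mean of each 256x256 slice,
# computed in parallel over the leading axes.
@guvectorize(["void(float64[:,:], float64[:])"], "(o,p)->()", target="parallel")
def slice_mean(slice2d, out):
    out[0] = slice2d.mean()

data = np.random.randn(55, 117, 256, 256)
means = slice_mean(data)  # shape (55, 117): one value per 256x256 slice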