
I have a dummy function test_func(n) that multiplies a 3x3x3 array of ones (np.ones((3, 3, 3))) by a number n.

import numpy as np

def test_func(n):
    array = n * np.ones((3, 3, 3))
    return array

Let's say I want to run this function in parallel and get the total sum of all the resulting arrays, while keeping the array's shape intact (i.e., the output array should also be 3x3x3). For this, I normally collect the results of the multiprocessing runs in a list and then add them all up (final_array).

#Multiprocessing starts here
from multiprocessing import Pool
if __name__ == '__main__':
    pool = Pool()
    grid = np.arange(2,4,1) # generates array [2,3]
    final_array = pool.map(test_func, grid) # List of results from the parallel runs
    final_array = np.sum(final_array, axis=0) # A bit indirect
    pool.close()
    pool.join()

print(final_array)
#[Output]: This is the result I want
array([[[5., 5., 5.],
        [5., 5., 5.],
        [5., 5., 5.]],

       [[5., 5., 5.],
        [5., 5., 5.],
        [5., 5., 5.]],

       [[5., 5., 5.],
        [5., 5., 5.],
        [5., 5., 5.]]])

Although a bit indirect (code shown above), it gets the job done. But an older discussion on Stack Overflow showed me that this can be done more elegantly with a shared-memory array: https://stackoverflow.com/questions/7894791/use-numpy-array-in-shared-memory-for-multiprocessing (answered by @jfs).

I tried to replicate the process but am unsure what I am doing wrong (trial code below).

import ctypes
import multiprocessing as mp
import numpy as np

L, N, M = 3, 3, 3 # Shape of the array
mp_arr = mp.Array(ctypes.c_double, L * N * M)
final_array = np.frombuffer(mp_arr.get_obj())
final_array = final_array.reshape((L, N, M))

def test_func(n):
    final_array = np.frombuffer(mp_arr.get_obj())
    final_array = final_array.reshape((L, N, M))
    final_array += n * np.ones((3, 3, 3))

def init(shared_arr_): # I do not even know what this one does
    global mp_arr
    mp_arr = shared_arr_

#Multiprocessing starts here
from multiprocessing import Pool
if __name__ == '__main__':
    pool = Pool(initializer=init, initargs=(mp_arr,))
    grid = np.arange(2,4,1)
    pool.map_async(test_func, grid)
    pool.close()
    pool.join()

Ep1c1aN
  • If your computing functions do not need the GIL, like ones operating on NumPy arrays, then I advise you to consider using Cython (or Numba) parallelism (see the sketch after this comment). You will not pay the huge cost of pickling/synchronization that is (nearly) mandatory with CPython multiprocessing. Cython/Numba can also make your code faster even without running it in parallel. – Jérôme Richard Aug 18 '22 at 18:42
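
For illustration, Jérôme Richard's Numba suggestion could look roughly like the following. This is a minimal sketch, assuming numba is installed; the name numba_sum is hypothetical, not from the original post:

import numpy as np
from numba import njit, prange

@njit(parallel=True)
def numba_sum(grid):
    total = np.zeros((3, 3, 3))
    # prange distributes iterations across threads; Numba treats
    # `total += ...` as an array reduction and combines the partial sums.
    for i in prange(len(grid)):
        total += grid[i] * np.ones((3, 3, 3))
    return total

print(numba_sum(np.arange(2, 4, 1)))  # same 3x3x3 array of fives

No pickling or shared-memory plumbing is needed here, because the threads share the process's address space.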

1 Answer


First, initialize the shared Array inside the __main__ block. Also, use mp_arr.get_lock(), because every process accesses the whole 3x3x3 array:

import ctypes
import numpy as np
from multiprocessing import Pool, Array


def test_func(n):
    with mp_arr.get_lock():  # <-- every process is accessing the same slice, so get_lock()
        final_array = np.frombuffer(mp_arr.get_obj())
        final_array = final_array.reshape((L, N, M))
        final_array += n * np.ones((3, 3, 3))


def init(shared_arr_):
    global mp_arr
    mp_arr = shared_arr_


if __name__ == "__main__":
    L, N, M = 3, 3, 3
    mp_arr = Array(ctypes.c_double, L * N * M)
    grid = np.arange(2, 4, 1)

    with Pool(initializer=init, initargs=(mp_arr,)) as pool:
        pool.map(test_func, grid)

    final_array = np.frombuffer(mp_arr.get_obj())
    final_array = final_array.reshape((L, N, M))
    print(final_array)

Prints:

[[[5. 5. 5.]
  [5. 5. 5.]
  [5. 5. 5.]]

 [[5. 5. 5.]
  [5. 5. 5.]
  [5. 5. 5.]]

 [[5. 5. 5.]
  [5. 5. 5.]
  [5. 5. 5.]]]
Andrej Kesely
  • Fantastic, now the code works. Thanks for answering @Andrej Kesely. I just wanted to point out that my initial method (the 'indirect' one in the 2nd code snippet) is two times faster than this 'corrected shared memory' version suggested by you. Just checked it for a complex function. I find this quite strange since the 'corrected version' runs as a loop rather than running parallelly. Do you happen to know why that is? – Ep1c1aN Aug 15 '22 at 23:50
  • @Ep1c1aN because each process locks the array while working on it, and that prevents other processes from accessing it. This is no different from a loop as you have already noticed. Downvoted – oguz ismail Aug 16 '22 at 03:00
  • @oguzismail It locks the array because each process accesses the **whole** array. It needs to be synchronized. – Andrej Kesely Aug 16 '22 at 07:01
  • @AndrejKesely Then explain that instead of posting code that is simply a loop with a hundred extra steps. – oguz ismail Aug 16 '22 at 07:37
  • @oguzismail There aren't **100** extra steps - simply create `Array`, have each process run a function inside the `Pool`, and print `final_array`. The locking is explained in the comment in the code already. – Andrej Kesely Aug 16 '22 at 07:40
  • @oguzismail Clearly this is example code meant to demonstrate a concept. In reality I would expect the `n * np.ones((3, 3, 3))` in this example to be something significantly more involved, and which could be performed without the lock (see the sketch below). The "writeback" phase would not be performed in parallel due to the lock, but the alternative of returning the result, sending it via pickle / unpickle through an `mp.Queue`, and *then* summing the results is likely still worse. If needed, a map-reduce strategy could also be used to reduce the time of summing the values to as little as log time instead of linear. – Aaron Aug 16 '22 at 15:16
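
For illustration, Aaron's point could be sketched by reworking test_func from the answer so that only the write-back holds the lock. This assumes the same init/Pool setup and the globals (mp_arr, L, N, M) from the answer above; heavy_compute is a hypothetical stand-in for the real per-task work:

def heavy_compute(n):  # hypothetical stand-in for an expensive computation
    return n * np.ones((3, 3, 3))

def test_func(n):
    partial = heavy_compute(n)  # runs fully in parallel, no lock held
    with mp_arr.get_lock():  # serialize only the cheap += write-back
        final_array = np.frombuffer(mp_arr.get_obj()).reshape((L, N, M))
        final_array += partial

With this split, the lock is held only for a small 3x3x3 addition, so the expensive part of each task still runs in parallel.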