
I'm trying to speed up my heavy calculations on large numpy arrays by applying this tutorial here to my use case. In principle, I have an input array and a result array and want to share them across many processes, in which data is read from the input array, tweaked, and then written to the output array. I don't think I need locks, since the array indices for reading and writing will be unique for each process.

Here is my test example, based on the linked tutorial:

import numpy as np
import multiprocessing as mp

WORKER_DICT = dict()

def shared_array_from_np_array(data_array, init_value=None):
    raw_array = mp.RawArray(np.ctypeslib.as_ctypes_type(data_array.dtype), data_array.size)
    shared_array = np.frombuffer(raw_array, dtype=data_array.dtype).reshape(data_array.shape)
    if init_value:
        np.copyto(shared_array, np.full_like(data_array, init_value))
        return raw_array, shared_array
    else:
        np.copyto(shared_array, data_array)
        return raw_array, shared_array

def init_worker(data_array, result_array):
    WORKER_DICT['data_array'] = data_array
    WORKER_DICT['result_array'] = result_array
    WORKER_DICT['shape'] = data_array.shape

def worker(i, j):
    data = np.frombuffer(WORKER_DICT['data_array']).reshape(WORKER_DICT['shape'])
    result = np.frombuffer(WORKER_DICT['worker_array']).reshape(WORKER_DICT['shape'])
    result[i, j] = np.multiply(data[i, j], 2)
    return

if __name__ == '__main__':
    sh_in_arr, shared_input_array = shared_array_from_np_array(np.array(
        [[1, 1, 2, 2],
         [1, 1, 2, 2],
         [3, 3, 4, 4],
         [3, 3, 4, 4]]))
    sh_res_arr, shared_result_array = shared_array_from_np_array(shared_input_array, 0)
    init_args = (sh_in_arr, sh_res_arr)
    with mp.Pool(processes=2, initializer=init_worker, initargs=init_args) as pool:
        pool.map_async(worker, range(shared_input_array.shape[0]))
    print('Input:', shared_input_array)
    print('Output:', shared_result_array)

When I run it, I just get the same array again:

Input: 
[[1 1 2 2]
 [1 1 2 2]
 [3 3 4 4]
 [3 3 4 4]]
Output: 
[[1 1 2 2]
 [1 1 2 2]
 [3 3 4 4]
 [3 3 4 4]]

Am I even on the right track or is there something substantially wrong? Combine Pool.map with shared memory Array in Python multiprocessing looks way easier, but I don't even understand the original question.

EDIT: After the comments about the old Python version, I switched to Python 3.9 and added the actual output.

s6hebern
    Is your version of Python very old? The option of using `with` with multiprocessing.Pool was not added until Python 3.3. – Tim Roberts Sep 02 '21 at 22:05
  • Oh... well, unfortunately, it is. Still on Python 2.7 due to company installation. Added a comment to the question. – s6hebern Sep 02 '21 at 22:21
  • Note that Python 2.7 is not developed/supported anymore, including bug fixes, security issues, and so on (not to mention that Python 2.7 is 11 years old). If you only use Numpy in the parallelized code, then I advise you to use Numba, which is better suited for that (and more performant). Note however that Numba does not support all Numpy functions. Note also that `WORKER_DICT` is not shared between processes, and by default no variables will be shared unless you explicitly use shared memory (which is a bit cumbersome to use and only works with Numpy or in restricted cases). – Jérôme Richard Sep 03 '21 at 09:29
  • Noted, thanks. As said in the edit, upgrading to 3.9 will be done soon, but in the meantime, I still need it in 2.7. But most functionalities are compatible, if not, there are usually workarounds I can try figuring out on my own. Not using numpy is not a good option for me, because my real-world use case makes heavy use of its functions. – s6hebern Sep 03 '21 at 10:35
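
For reference, here is a minimal sketch of what the Numba approach suggested in the comment above could look like (assuming Python 3 with numba installed; the function name double_rows is purely illustrative and not part of the original code):

import numpy as np
from numba import njit, prange

@njit(parallel=True)
def double_rows(data):
    # Each row is handled by exactly one thread, so no locking is needed.
    result = np.empty_like(data)
    for i in prange(data.shape[0]):
        result[i, :] = data[i, :] * 2
    return result

if __name__ == '__main__':
    my_array = np.array(
        [[1, 1, 2, 2],
         [1, 1, 2, 2],
         [3, 3, 4, 4],
         [3, 3, 4, 4]], dtype=np.float64)
    print(double_rows(my_array))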

1 Answer


So after many failures and countless tries, I came up with this, which seems to work:

import time
import numpy as np
from multiprocessing import Pool, RawArray, cpu_count

# A global dictionary storing the variables passed from the initializer.
GLOBAL_DICT = {}

def init_worker(input_array, array_shape):
    # Using a dictionary is not strictly necessary. You can also
    # use global variables.
    GLOBAL_DICT['input_array'] = input_array
    GLOBAL_DICT['array_shape'] = array_shape

def worker_func(i):
    # Simply doubles all entries
    data = np.frombuffer(GLOBAL_DICT['input_array']).reshape(GLOBAL_DICT['array_shape'])
    return data[i, :] * 2

# We need this check for Windows to prevent infinitely spawning new child
# processes.
if __name__ == '__main__':
    start = time.time()
    my_array = np.array(
        [[1, 1, 2, 2],
         [1, 1, 2, 2],
         [3, 3, 4, 4],
         [3, 3, 4, 4]])
    array_shape = my_array.shape
    raw_array = RawArray('d', array_shape[0] * array_shape[1])
    # Wrap raw_array as a numpy array so its data can be manipulated easily.
    shared_array = np.frombuffer(raw_array).reshape(array_shape)
    # Copy my_array to shared array.
    np.copyto(shared_array, my_array)
    # Start the process pool and do the computation.
    args = (raw_array, array_shape)
    with Pool(processes=max(1, cpu_count() - 2), initializer=init_worker, initargs=args) as pool:
        result = pool.map(worker_func, range(array_shape[0]))
        print(f'Input:\n{my_array}')
        print(f'Result:\n{np.array(result)}')

Prints:

Input:
[[1 1 2 2]
 [1 1 2 2]
 [3 3 4 4]
 [3 3 4 4]]
Result:
[[2. 2. 4. 4.]
 [2. 2. 4. 4.]
 [6. 6. 8. 8.]
 [6. 6. 8. 8.]]

I guess there are more efficient or prettier ways to do that. Ideally, I'd like to write directly to a shared output array, but for now, it works.
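
For completeness, here is a sketch of how writing directly into a second shared result array could look, following the same RawArray pattern as above (untested beyond this toy example; names like raw_in and raw_out are just illustrative):

import numpy as np
from multiprocessing import Pool, RawArray

GLOBAL_DICT = {}

def init_worker(raw_in, raw_out, shape):
    # Wrap both shared buffers once per worker process.
    GLOBAL_DICT['input'] = np.frombuffer(raw_in).reshape(shape)
    GLOBAL_DICT['result'] = np.frombuffer(raw_out).reshape(shape)

def worker_func(i):
    # Each call writes only to its own row, so no lock is needed.
    GLOBAL_DICT['result'][i, :] = GLOBAL_DICT['input'][i, :] * 2

if __name__ == '__main__':
    my_array = np.array(
        [[1, 1, 2, 2],
         [1, 1, 2, 2],
         [3, 3, 4, 4],
         [3, 3, 4, 4]], dtype=np.float64)
    shape = my_array.shape
    raw_in = RawArray('d', my_array.size)
    raw_out = RawArray('d', my_array.size)
    # Copy the input data into the shared input buffer.
    np.copyto(np.frombuffer(raw_in).reshape(shape), my_array)
    args = (raw_in, raw_out, shape)
    with Pool(processes=2, initializer=init_worker, initargs=args) as pool:
        # map blocks until all rows are done, so the result is complete here.
        pool.map(worker_func, range(shape[0]))
    print('Input:\n', my_array)
    print('Result:\n', np.frombuffer(raw_out).reshape(shape))

Compared to the test example in the question, this passes both shared buffers through initargs, keeps the RawArray type code ('d') consistent with the numpy view's default float64 dtype, and uses the blocking map so the pool is not closed before the workers have finished writing.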

s6hebern