I'm trying to speed up heavy calculations on large numpy arrays by applying this tutorial here to my use case. In principle, I have an input array and a result array and want to share them across many processes, in which data is read from the input array, tweaked, and then written to the output array. I don't think I need locks, since the array indices for reading and writing are unique for each process.
Here is my test example, based on the linked tutorial:
import numpy as np
import multiprocessing as mp

WORKER_DICT = dict()

def shared_array_from_np_array(data_array, init_value=None):
    raw_array = mp.RawArray(np.ctypeslib.as_ctypes_type(data_array.dtype), data_array.size)
    shared_array = np.frombuffer(raw_array, dtype=data_array.dtype).reshape(data_array.shape)
    if init_value:
        np.copyto(shared_array, np.full_like(data_array, init_value))
        return raw_array, shared_array
    else:
        np.copyto(shared_array, data_array)
        return raw_array, shared_array

def init_worker(data_array, result_array):
    WORKER_DICT['data_array'] = data_array
    WORKER_DICT['result_array'] = result_array
    WORKER_DICT['shape'] = data_array.shape

def worker(i, j):
    data = np.frombuffer(WORKER_DICT['data_array']).reshape(WORKER_DICT['shape'])
    result = np.frombuffer(WORKER_DICT['worker_array']).reshape(WORKER_DICT['shape'])
    result[i, j] = np.multiply(data[i, j], 2)
    return

if __name__ == '__main__':
    sh_in_arr, shared_input_array = shared_array_from_np_array(np.array(
        [[1, 1, 2, 2],
         [1, 1, 2, 2],
         [3, 3, 4, 4],
         [3, 3, 4, 4]]))
    sh_res_arr, shared_result_array = shared_array_from_np_array(shared_input_array, 0)
    init_args = (sh_in_arr, sh_res_arr)
    with mp.Pool(processes=2, initializer=init_worker, initargs=init_args) as pool:
        pool.map_async(worker, range(shared_input_array.shape[0]))
    print('Input:', shared_input_array)
    print('Output:', shared_result_array)
When I run it, I just get the same array again:
Input:
[[1 1 2 2]
[1 1 2 2]
[3 3 4 4]
[3 3 4 4]]
Output:
[[1 1 2 2]
[1 1 2 2]
[3 3 4 4]
[3 3 4 4]]
Am I even on the right track, or is there something substantially wrong? Combine Pool.map with shared memory Array in Python multiprocessing looks way easier, but I don't even understand the original question there.
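If I squint, the pattern in that question seems to boil down to a module-level shared array that forked workers simply inherit, roughly like this (my own paraphrase with my own names, not that question's actual code, and it relies on the 'fork' start method, so it would not work as-is on Windows):

import numpy as np
import multiprocessing as mp

# Module-level shared array: under the 'fork' start method the pool workers
# inherit it directly; lock=False because every index is written exactly once.
shared = mp.Array('l', 16, lock=False)  # 'l' = C signed long, 16 slots

def double(i):
    shared[i] = i * 2  # each task writes a distinct index, so no lock is needed

if __name__ == '__main__':
    with mp.Pool(processes=2) as pool:
        pool.map(double, range(16))  # map() blocks until all tasks finish
    # C long is 8 bytes on typical 64-bit Linux, hence np.int64 here
    print(np.frombuffer(shared, dtype=np.int64).reshape(4, 4))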
EDIT: Following the comments about the old Python version, I switched to Python 3.9 and added the actual output.
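EDIT 2: While debugging, I spotted several likely culprits: worker reads WORKER_DICT['worker_array'], which is never set (it should be 'result_array'); np.frombuffer defaults to float64, so the dtype has to be passed explicitly; worker expects two arguments, but map only delivers one; map_async returns immediately, so the prints run before any work is done (nothing calls .get()); and if init_value: is False for 0, so the result array was initialized as a copy of the input instead of zeros. Below is a sketch of a version that I believe fixes all of that; the helper signatures and the (i, j) index list are my own choices, not from the tutorial:

import numpy as np
import multiprocessing as mp

WORKER_DICT = dict()

def shared_array_from_np_array(data_array, init_value=None):
    raw_array = mp.RawArray(np.ctypeslib.as_ctypes_type(data_array.dtype), data_array.size)
    shared_array = np.frombuffer(raw_array, dtype=data_array.dtype).reshape(data_array.shape)
    if init_value is not None:  # 'if init_value:' was False for 0
        np.copyto(shared_array, np.full_like(data_array, init_value))
    else:
        np.copyto(shared_array, data_array)
    return raw_array, shared_array

def init_worker(data_array, result_array, shape, dtype):
    WORKER_DICT['data_array'] = data_array
    WORKER_DICT['result_array'] = result_array
    WORKER_DICT['shape'] = shape
    WORKER_DICT['dtype'] = dtype

def worker(args):
    i, j = args  # map() hands over one argument, so (i, j) travels as a tuple
    shape, dtype = WORKER_DICT['shape'], WORKER_DICT['dtype']
    data = np.frombuffer(WORKER_DICT['data_array'], dtype=dtype).reshape(shape)
    result = np.frombuffer(WORKER_DICT['result_array'], dtype=dtype).reshape(shape)  # was 'worker_array'
    result[i, j] = data[i, j] * 2

if __name__ == '__main__':
    input_np = np.array([[1, 1, 2, 2],
                         [1, 1, 2, 2],
                         [3, 3, 4, 4],
                         [3, 3, 4, 4]])
    sh_in_arr, shared_input_array = shared_array_from_np_array(input_np)
    sh_res_arr, shared_result_array = shared_array_from_np_array(input_np, 0)
    init_args = (sh_in_arr, sh_res_arr, input_np.shape, input_np.dtype)
    all_indices = [(i, j) for i in range(input_np.shape[0]) for j in range(input_np.shape[1])]
    with mp.Pool(processes=2, initializer=init_worker, initargs=init_args) as pool:
        pool.map_async(worker, all_indices).get()  # .get() blocks until every task is done
    print('Input:', shared_input_array)
    print('Output:', shared_result_array)

If I have the shared-memory mechanics right, the Output print should now show the doubled input.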