Here is a solution that does not require Python >= 3.8 and uses only `multiprocessing.Array`. The idea is to use such a shared array as the backing store for a `numpy` array.
In this example, each process in the pool initializes a global variable `np_array` once, so the shared array does not have to be passed explicitly to each worker function. This spares the worker functions from having to re-create a `numpy` array from the shared array themselves. Moreover, this re-creation only has to be done N times, where N is the pool size, rather than M times, where M is the number of tasks submitted to the pool. If you find global variables anathema, the alternative is to pass the shared array explicitly as an argument to each worker function and have it re-create the `numpy` array from it.
import numpy as np
from multiprocessing import Array, Pool
def np_array_from_shared_array(shared_array, shape, is_locked_array=True):
    """
    Return a numpy array that views the memory of a shared array.

    No data is copied: the buffer is interpreted as float64 values and
    writes made through the returned array land in shared_array's memory.

    shared_array -- the shared array (e.g. a multiprocessing.Array created
        with type code 'd') whose buffer backs the result.
    shape -- shape of the returned view. Any number of dimensions (or a
        plain int) is accepted, as long as it accounts for every element.
    is_locked_array -- True if shared_array is a lock-wrapped array, in
        which case the underlying array is obtained through get_obj().

    Raises ValueError if shape does not match the number of elements.
    """
    # A lock-wrapped array keeps the raw array behind get_obj().
    shared_array_obj = shared_array.get_obj() if is_locked_array else shared_array
    # Pass the shape through unchanged so it is not limited to two dimensions.
    return np.frombuffer(shared_array_obj, dtype=np.float64).reshape(shape)
def init_pool_processes(shared_array, shape, is_locked_array):
    """
    Pool initializer: prepare the per-process numpy view.

    Builds a numpy array on top of the shared array handed to this process
    and stores it in the module-level global np_array, so that worker
    functions can use it without receiving it as an argument.
    """
    global np_array
    array_view = np_array_from_shared_array(
        shared_array=shared_array,
        shape=shape,
        is_locked_array=is_locked_array,
    )
    np_array = array_view
def change_array(i, j):
    """
    Worker task: add 100 to element [i, j] of the global np_array.

    The update is a plain read-add-write on the array element.
    """
    cell = (i, j)
    np_array[cell] = np_array[cell] + 100
if __name__ == '__main__':
    data = np.array([[1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7], [7.7, 6.6, 5.5, 4.4, 3.3, 2.2, 1.1]])
    shape = data.shape
    # Each task below updates a distinct element, so no locking is required.
    # Set this to True if multiple processes will be updating the same
    # array element.
    # NOTE: a locked Array only supplies the lock; updates made through the
    # numpy view are not synchronized automatically, so workers would also
    # have to hold shared_array.get_lock() around each update.
    NEEDS_LOCKING = False
    shared_array = Array('d', shape[0] * shape[1], lock=NEEDS_LOCKING)
    # Wrap shared_array as a numpy array so we can easily manipulate its data.
    np_array = np_array_from_shared_array(shared_array, shape, NEEDS_LOCKING)
    # Copy data to our shared array.
    np.copyto(np_array, data)
    # Before:
    print(np_array)
    # Init each process in the pool with shared_array. The with-block makes
    # sure the worker processes are cleaned up even if a task raises.
    with Pool(initializer=init_pool_processes,
              initargs=(shared_array, shape, NEEDS_LOCKING)) as pool:
        # One task per array element; starmap blocks until all are done.
        pool.starmap(change_array,
                     ((i, j) for i in range(shape[0]) for j in range(shape[1])))
        # Shut the workers down gracefully before the with-block exits.
        pool.close()
        pool.join()
    # After:
    print(np_array)
Prints:
[[1.1 2.2 3.3 4.4 5.5 6.6 7.7]
 [7.7 6.6 5.5 4.4 3.3 2.2 1.1]]
[[101.1 102.2 103.3 104.4 105.5 106.6 107.7]
 [107.7 106.6 105.5 104.4 103.3 102.2 101.1]]