
The piece of code that I have looks somewhat like this:

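from multiprocessing import Pool

import numpy as np

glbl_array = np.zeros(3 * 2**30 // 8)  # placeholder for a 3 GB float64 array

def my_func(args, def_param=glbl_array):
    pass  # do stuff on args and def_param

if __name__ == '__main__':
    pool = Pool(processes=4)
    pool.map(my_func, range(1000))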

Is there a way to make sure (or encourage) that the different processes do not get a copy of glbl_array but share it? If there is no way to stop the copy, I will go with a memmapped array, but my access patterns are not very regular, so I expect memmapped arrays to be slower. The above seemed like the first thing to try. This is on Linux. I just wanted some advice from Stack Overflow and do not want to annoy the sysadmin. Do you think it will help if the second parameter is a genuine immutable object like glbl_array.tostring()?

martineau
san

5 Answers


You can use the shared memory stuff from multiprocessing together with Numpy fairly easily:

import multiprocessing
import ctypes
import numpy as np

shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array = shared_array.reshape(10, 10)

#-- edited 2015-05-01: the assert check below checks the wrong thing
#   with recent versions of Numpy/multiprocessing. That no copy is made
#   is indicated by the fact that the program prints the output shown below.
## No copy was made
##assert shared_array.base.base is shared_array_base.get_obj()

# Parallel processing
def my_func(i, def_param=shared_array):
    shared_array[i,:] = i

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    pool.map(my_func, range(10))

    print(shared_array)

which prints

[[ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]
 [ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.]
 [ 2.  2.  2.  2.  2.  2.  2.  2.  2.  2.]
 [ 3.  3.  3.  3.  3.  3.  3.  3.  3.  3.]
 [ 4.  4.  4.  4.  4.  4.  4.  4.  4.  4.]
 [ 5.  5.  5.  5.  5.  5.  5.  5.  5.  5.]
 [ 6.  6.  6.  6.  6.  6.  6.  6.  6.  6.]
 [ 7.  7.  7.  7.  7.  7.  7.  7.  7.  7.]
 [ 8.  8.  8.  8.  8.  8.  8.  8.  8.  8.]
 [ 9.  9.  9.  9.  9.  9.  9.  9.  9.  9.]]

However, Linux has copy-on-write semantics on fork(), so even without using multiprocessing.Array, the data will not be copied unless it is written to.
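A minimal sketch of that copy-on-write behaviour, assuming a Unix system where the "fork" start method exists (Windows has none, and macOS has defaulted to "spawn" since Python 3.8, so the sketch requests "fork" explicitly rather than relying on the platform default). The names big_array, read_sum and write_first are illustrative only. Workers read the parent's module-level array without it being passed to them, while a write lands in the worker's private copy of the touched page and never reaches the parent; that last point is the difference from multiprocessing.Array.

```python
import multiprocessing as mp

import numpy as np

# Module-level array created before the pool starts; with "fork",
# workers inherit it through copy-on-write pages.
big_array = np.arange(1_000_000, dtype=np.float64)


def read_sum(_):
    # Reading the inherited array does not duplicate its data buffer.
    return float(big_array.sum())


def write_first(_):
    # Writing triggers copy-on-write: only this worker's private copy
    # of the page changes, so the parent never sees the new value.
    big_array[0] = -1.0
    return float(big_array[0])


if __name__ == "__main__":
    ctx = mp.get_context("fork")  # not available on Windows
    with ctx.Pool(processes=2) as pool:
        sums = pool.map(read_sum, range(2))
        child_values = pool.map(write_first, range(2))
    print(sums)          # every worker sees the parent's data
    print(child_values)  # [-1.0, -1.0] inside the workers
    print(big_array[0])  # still 0.0 in the parent
```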

pv.
  • Excellent! That was a great answer. One doubt: do the definitions of shared_array and shared_array_base need to be protected by `if __name__ == '__main__':`? My concern is that every time the module is loaded they will be redefined and cost additional space. But I may well be wrong. – san Apr 05 '11 at 17:28
  • The only constraint with respect to multiprocessing is that shared_array_base is defined before calling `pool.map`. `fork()` and `multiprocessing.Pool` will not re-import modules, so the only thing you need to be careful about is memory allocation inside `my_func()`. – pv. Apr 06 '11 at 13:12
  • Just to note, in Python fork() actually means copy on access (because just accessing the object will change its refcount). – Fabio Zadrozny Jun 07 '12 at 17:34
  • The copy will only copy the memory page on which the refcount integer resides. The data in NumPy arrays is therefore not copied. – pv. Jun 11 '12 at 11:54
  • It works, but it keeps giving me this warning: `/usr/lib/pymodules/python2.7/numpy/ctypeslib.py:402: RuntimeWarning: Item size computed from the PEP 3118 buffer format string does not match the actual item size. return array(obj, copy=False)` – Moj Jun 04 '13 at 13:56
  • Got it. You should use np.frombuffer(shared_array_base.get_obj()) instead of np.ctypeslib.as_array. – Moj Jun 04 '13 at 14:49
  • I'm guessing that on Windows this will not work, since there is no `fork()` that makes parent variables visible to children. Is that correct? If so, how would you pass the shared array to pool workers on that OS? – Brian White Feb 13 '15 at 14:06
  • I don't know if this is due to a recent change in `multiprocessing`, but I tested this code on Linux with Python 2.7/3.4, and the assert is triggered. – BenC Apr 28 '15 at 02:06
  • The assert checks something stricter than whether the data is copied or not, and apparently something has changed since then in multiprocessing or in NumPy. If the program prints the same output as above, the data was not copied (because otherwise the different processes would have modified their own arrays). – pv. May 01 '15 at 20:06
  • I wonder whether it is necessary to add a lock in `my_func`, as `shared_array[i,:] = i` is not atomic. I know it works well without a lock for **write-only** access, but what about **read and write**? – Syrtis Major Sep 14 '15 at 08:36
  • As far as I know, on Windows the main script is loaded into every child process. So will the code outside of `if __name__ == '__main__':` be executed in every child process? – Crispy13 Dec 01 '20 at 08:45
  • This no longer seems to work. I'm on macOS, Python 3.8, NumPy 1.19.2. It prints all zeros. – Danyal Nov 14 '21 at 22:21
  • I'm on Python 3.8, and it doesn't seem to work. – maplemaple Dec 06 '21 at 04:50
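Several of the comments above (np.frombuffer instead of np.ctypeslib.as_array, whether a lock is needed, and the all-zeros result on macOS with Python 3.8, where "spawn" is the default start method and each child re-creates its own module-level array) can be addressed together. The following is a minimal sketch, not the answer's original code; N_ROWS, N_COLS, as_numpy, init_worker and add_one are hypothetical names. It hands the Array to the workers through the Pool initializer, so it does not depend on fork, and it takes the Array's own lock around a read-modify-write. In the answer above each task writes a different row, so no element is touched by two processes; a lock matters once several tasks read and update the same elements.

```python
import ctypes
import multiprocessing as mp

import numpy as np

N_ROWS, N_COLS = 10, 10
shared_base = None  # set by init_worker in every worker process


def as_numpy(base):
    # np.frombuffer wraps the shared buffer without copying it.
    flat = np.frombuffer(base.get_obj(), dtype=np.float64)
    return flat.reshape(N_ROWS, N_COLS)


def init_worker(base):
    global shared_base
    shared_base = base


def add_one(_):
    arr = as_numpy(shared_base)
    # "arr += 1" reads every element and writes it back, so two workers
    # could interleave and lose updates; the Array's lock serialises them.
    with shared_base.get_lock():
        arr += 1


if __name__ == "__main__":
    # mp.Array is zero-filled and carries its own recursive lock.
    base = mp.Array(ctypes.c_double, N_ROWS * N_COLS)
    with mp.Pool(processes=4, initializer=init_worker, initargs=(base,)) as pool:
        pool.map(add_one, range(8))
    result = as_numpy(base)
    print(result.min(), result.max())  # 8.0 8.0 when no update is lost
```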

The following code works on Windows 7 and Mac (possibly also on Linux, but I have not tested it).

import multiprocessing
import ctypes
import numpy as np

#-- edited 2015-05-01: the assert check below checks the wrong thing
#   with recent versions of Numpy/multiprocessing. That no copy is made
#   is indicated by the fact that the program prints the output shown below.
## No copy was made
##assert shared_array.base.base is shared_array_base.get_obj()

shared_array = None

def init(shared_array_base):
    global shared_array
    shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
    shared_array = shared_array.reshape(10, 10)

# Parallel processing
def my_func(i):
    shared_array[i, :] = i

if __name__ == '__main__':
    shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10)

    pool = multiprocessing.Pool(processes=4, initializer=init, initargs=(shared_array_base,))
    pool.map(my_func, range(10))

    shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
    shared_array = shared_array.reshape(10, 10)
    print(shared_array)
taku-y

For those stuck using Windows, which does not support fork() (unless using Cygwin), pv's answer does not work: globals set up by the parent are not made available to child processes.

Instead, you must pass the shared memory to the child explicitly, through the initializer of a Pool or the args of a Process, like this:

#! /usr/bin/python

import time

from multiprocessing import Process, Queue, Array

def f(q, a):
    m = q.get()
    print(m)
    print(a[0], a[1], a[2])
    m = q.get()
    print(m)
    print(a[0], a[1], a[2])

if __name__ == '__main__':
    a = Array('B', (1, 2, 3), lock=False)
    q = Queue()
    p = Process(target=f, args=(q,a))
    p.start()
    q.put([1, 2, 3])
    time.sleep(1)
    a[0:3] = (4, 5, 6)
    q.put([4, 5, 6])
    p.join()

(it's not numpy and it's not good code but it illustrates the point ;-)
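For a NumPy version of the same idea, here is a hedged sketch assuming a RawArray (what Array(..., lock=False) above returns) passed through Process(args=...). The names child, raw and parent_view are illustrative. The child wraps the buffer with np.frombuffer and doubles it in place; because the buffer itself is shared, the parent's own view sees the change after join().

```python
import ctypes
import multiprocessing as mp

import numpy as np


def child(raw, n):
    # Rebuild a NumPy view on the shared buffer inside the child;
    # only the buffer was passed in, no array data was copied by NumPy.
    view = np.frombuffer(raw, dtype=np.float64, count=n)
    view *= 2.0  # visible to the parent because the memory is shared


if __name__ == "__main__":
    n = 5
    raw = mp.RawArray(ctypes.c_double, n)  # zero-filled, no lock
    parent_view = np.frombuffer(raw, dtype=np.float64, count=n)
    parent_view[:] = np.arange(n)
    p = mp.Process(target=child, args=(raw, n))
    p.start()
    p.join()
    print(parent_view)  # [0. 2. 4. 6. 8.]
```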

Brian White

If you are looking for an option that works efficiently on Windows, and works well for irregular access patterns, branching, and other scenarios where you might need to analyze different matrices based on a combination of a shared-memory matrix and process-local data, the mathDict toolkit in the ParallelRegression package was designed to handle this exact situation.

RichardB

I know I am answering a very old question, but the approach discussed here does not work on Windows. The above answers were misleading, as they did not provide substantial proof, so I tried the following code.

# -*- coding: utf-8 -*-
from __future__ import annotations
import ctypes
import itertools
import multiprocessing
import os
import time
from concurrent.futures import ProcessPoolExecutor
import numpy as np
import numpy.typing as npt


shared_np_array_for_subprocess: npt.NDArray[np.double]


def init_processing(shared_raw_array_obj: ctypes.Array[ctypes.c_double]):
    global shared_np_array_for_subprocess
    #shared_np_array_for_subprocess = np.frombuffer(shared_raw_array_obj, dtype=np.double)
    shared_np_array_for_subprocess = np.ctypeslib.as_array(shared_raw_array_obj)


def do_processing(i: int) -> int:
    print("\n--------------->>>>>>")
    print(f"[P{i}] input is {i} in process id {os.getpid()}")
    print(f"[P{i}] 0th element via np access: ", shared_np_array_for_subprocess[0])
    print(f"[P{i}] 1st element via np access: ", shared_np_array_for_subprocess[1])
    print(f"[P{i}] NP array's base memory is: ", shared_np_array_for_subprocess.base)
    np_array_addr, _ = shared_np_array_for_subprocess.__array_interface__["data"]
    print(f"[P{i}] NP array obj pointing memory address is: ", hex(np_array_addr))
    print("\n--------------->>>>>>")
    time.sleep(3.0)
    return i


if __name__ == "__main__":
    shared_raw_array_obj: ctypes.Array[ctypes.c_double] = multiprocessing.RawArray(ctypes.c_double, 128)  # 128 * 8 B = 1 KiB
    # This array is allocated in shared memory and zero-filled.
    print("Shared Allocated Raw array: ", shared_raw_array_obj)
    shared_raw_array_ptr = ctypes.addressof(shared_raw_array_obj)
    print("Shared Raw Array memory address: ", hex(shared_raw_array_ptr))

    # Assign data
    print("Assign 0, 1 element data in Shared Raw array.")
    shared_raw_array_obj[0] = 10.2346
    shared_raw_array_obj[1] = 11.9876

    print("0th element via ptr access: ", (ctypes.c_double).from_address(shared_raw_array_ptr).value)
    print("1st element via ptr access: ", (ctypes.c_double).from_address(shared_raw_array_ptr + ctypes.sizeof(ctypes.c_double)).value)

    print("Create NP array from the Shared Raw array memory")
    shared_np_array: npt.NDArray[np.double] = np.frombuffer(shared_raw_array_obj, dtype=np.double)

    print("0th element via np access: ", shared_np_array[0])
    print("1st element via np access: ", shared_np_array[1])

    print("NP array's base memory is: ", shared_np_array.base)
    np_array_addr, _ = shared_np_array.__array_interface__["data"]
    print("NP array obj pointing memory address is: ", hex(np_array_addr))

    print("NP array , Raw array points to same memory , No copies? : ", np_array_addr == shared_raw_array_ptr)

    print("Now that we have native memory based NP array , Send for multi processing.")

    # results = []
    with ProcessPoolExecutor(max_workers=4, initializer=init_processing, initargs=(shared_raw_array_obj,)) as process_executor:
        results = process_executor.map(do_processing, range(0, 2))

    print("All jobs sumitted.")
    for result in results:
        print(result)

    print("Main process is going to shutdown.")
    exit(0)

Here is the sample output:

Shared Allocated Raw array:  <multiprocessing.sharedctypes.c_double_Array_128 object at 0x000001B8042A9E40>
Shared Raw Array memory address:  0x1b804300000
Assign 0, 1 element data in Shared Raw array.
0th element via ptr access:  10.2346
1st element via ptr access:  11.9876
Create NP array from the Shared Raw array memory
0th element via np access:  10.2346
1st element via np access:  11.9876
NP array's base memory is:  <multiprocessing.sharedctypes.c_double_Array_128 object at 0x000001B8042A9E40>
NP array obj pointing memory address is:  0x1b804300000
NP array , Raw array points to same memory , No copies? :  True
Now that we have native memory based NP array , Send for multi processing.

--------------->>>>>>
[P0] input is 0 in process id 21852
[P0] 0th element via np access:  10.2346
[P0] 1st element via np access:  11.9876
[P0] NP array's base memory is:  <memory at 0x0000021C7ACAFF40>
[P0] NP array obj pointing memory address is:  0x21c7ad60000

--------------->>>>>>

--------------->>>>>>
[P1] input is 1 in process id 11232
[P1] 0th element via np access:  10.2346
[P1] 1st element via np access:  11.9876
[P1] NP array's base memory is:  <memory at 0x0000022C7FF3FF40>
[P1] NP array obj pointing memory address is:  0x22c7fff0000

--------------->>>>>>
All jobs sumitted.
0
1
Main process is going to shutdown.

The above output is from the following environment:

OS: Windows 10 20H2
Python: Python 3.9.9 (tags/v3.9.9:ccb0e6a, Nov 15 2021, 18:08:50) [MSC v.1929 64 bit (AMD64)]

You can clearly see that the memory address the NumPy array points to is different in every subprocess, meaning copies are made. So on Windows, subprocesses do not share the underlying memory. I think this is due to OS protection: processes cannot refer to arbitrary pointer addresses in memory, as that would lead to memory access violations.
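One caveat when reading the addresses: the operating system is free to map the same shared segment at a different virtual address in each process, so differing addresses alone do not prove that a copy was made. A more direct check is to write through each worker's view and read the values back in the parent. The sketch below does that with the same ProcessPoolExecutor plus initializer pattern, forcing the "spawn" start method (the one Windows uses) via mp_context; worker_view and write_marker are hypothetical names, and init_processing only mirrors the name used above. If the parent prints the values written by the workers, the buffer was shared rather than copied.

```python
import ctypes
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

import numpy as np

worker_view = None  # NumPy view created once per worker process


def init_processing(raw):
    global worker_view
    worker_view = np.frombuffer(raw, dtype=np.float64)


def write_marker(i):
    # Each task writes its own slot; if the workers held private copies,
    # the parent would still see zeros afterwards.
    worker_view[i] = 100.0 + i
    return i


if __name__ == "__main__":
    raw = mp.RawArray(ctypes.c_double, 4)  # zero-filled shared buffer
    parent_view = np.frombuffer(raw, dtype=np.float64)
    ctx = mp.get_context("spawn")  # same start method as on Windows
    with ProcessPoolExecutor(max_workers=2, mp_context=ctx,
                             initializer=init_processing,
                             initargs=(raw,)) as executor:
        done = list(executor.map(write_marker, range(4)))
    print(parent_view)  # [100. 101. 102. 103.] if the memory is shared
```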

Bakkiaraj