
I would like to use a numpy array in shared memory for use with the multiprocessing module. The difficulty is using it like a numpy array, and not just as a ctypes array.

from multiprocessing import Process, Array
import numpy as np

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = 10
    unshared_arr = np.random.rand(N)
    arr = Array('d', unshared_arr)
    print("Originally, the first two elements of arr = %s" % (arr[:2]))

    # Create, start, and finish the child processes
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Printing out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

This produces output such as:

Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

The array can be accessed in a ctypes manner, e.g. arr[i] makes sense. However, it is not a numpy array, and I cannot perform operations such as -1*arr or arr.sum(). I suppose a solution would be to convert the ctypes array into a numpy array, but (besides not being able to make this work) I don't believe it would be shared anymore.

It seems there should be a standard solution to what must be a common problem.

Ian Langmore
  • It's not the same as this one? http://stackoverflow.com/questions/5033799/how-do-i-pass-large-numpy-arrays-between-python-subprocesses-without-saving-to-d/5036766#5036766 – pygabriel Oct 26 '11 at 19:06
  • It's not quite the same question. The linked question is asking about `subprocess` rather than `multiprocessing`. – Andrew Jan 21 '13 at 03:43
  • Related question: [Share Large, Read-Only Numpy Array Between Multiprocessing Processes](https://stackoverflow.com/questions/17785275/share-large-read-only-numpy-array-between-multiprocessing-processes?noredirect=1&lq=1) – Jasha May 28 '22 at 20:29

6 Answers


To add to @unutbu's (no longer available) and @Henry Gomersall's answers: you could use shared_arr.get_lock() to synchronize access when needed:

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # could be anything numpy accepts as an index, such as another numpy array
    with shared_arr.get_lock(): # synchronize access
        arr = np.frombuffer(shared_arr.get_obj()) # no data copying
        arr[i] = -arr[i]

Example

import ctypes
import logging
import multiprocessing as mp

from contextlib import closing

import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2 # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join()
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_ # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def f(i):
    """synchronized."""
    with shared_arr.get_lock(): # synchronize access
        g(i)

def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))

if __name__ == '__main__':
    mp.freeze_support()
    main()

If you don't need synchronized access, or if you create your own locks, then mp.Array() is unnecessary; you could use mp.sharedctypes.RawArray in this case.
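
For illustration, a minimal sketch of that unsynchronized variant (the worker layout and names here are assumptions, not part of the original answer); it is only safe because each task writes to a distinct index:

import ctypes
import multiprocessing as mp
from multiprocessing.sharedctypes import RawArray

import numpy as np

def init(raw_arr_):
    # make the shared RawArray visible to the pool workers
    global raw_arr
    raw_arr = raw_arr_

def negate(i):
    # wrap the shared buffer without copying; RawArray has no lock,
    # so concurrent writes to the *same* index would be a race
    arr = np.frombuffer(raw_arr, dtype=np.float64)
    arr[i] = -arr[i]

if __name__ == '__main__':
    N = 10
    raw_arr = RawArray(ctypes.c_double, N)
    np.frombuffer(raw_arr, dtype=np.float64)[:] = np.random.uniform(size=N)
    with mp.Pool(initializer=init, initargs=(raw_arr,)) as pool:
        pool.map(negate, range(N))
    print(np.frombuffer(raw_arr, dtype=np.float64))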

jfs
  • Beautiful answer! If I want to have more than one shared array, each separately lockable, but with the number of arrays determined at runtime, is that a straightforward extension of what you've done here? – Andrew Jan 19 '13 at 04:30
  • @Andrew: shared arrays should be created *before* child processes are spawned. – jfs Jan 19 '13 at 06:13
  • Good point about order of operations. That's what I had in mind, though: create a user-specified number of shared arrays, then spawn a few child processes. Is that straightforward? – Andrew Jan 19 '13 at 15:49
  • I've made another question to deal with this detail: http://stackoverflow.com/q/14416130/513688 – Andrew Jan 19 '13 at 16:05
  • I would look also at the ZMQ no-copy modes. – meawoppl Aug 08 '14 at 23:02
  • @J.F.Sebastian If I for some reason can't know the size of the shared array before the child processes are spawned, what is then a good work-around? I'm thinking of creating the shared array with size 1, then sending a pipe message to one child process and extending the array to the desired size. The pipes are created regardless because of other operations I need them for. Is this a good method? – Chicony Dec 08 '16 at 10:14
  • @Chicony: you can't change the size of the Array. Think of it as a shared block of memory that had to be allocated before child processes are started. You don't need to use all the memory e.g., you could pass `count` to `numpy.frombuffer()`. You could try to do it on a lower level using `mmap` or something like `posix_ipc` directly to implement a resizable (might involve copying while resizing) RawArray analog (or look for an existing library). Or if your task allows it: copy data in parts (if you don't need all at once). "How to resize a shared memory" is a good separate question. – jfs Dec 08 '16 at 12:35
  • @J.F.Sebastian I started a separate question, http://stackoverflow.com/q/41037808/4759898 – Chicony Dec 08 '16 at 12:56
  • @jfs I want to share numpy random state of a parent process with a child process. I've tried using `Manager` but still no luck. Could you please take a look at my question [here](https://stackoverflow.com/questions/49372619/how-to-share-numpy-random-state-of-a-parent-process-with-child-processes) and see if you can offer a solution? I can still get different random numbers if I do `np.random.seed(None)` every time that I generate a random number, but this does not allow me to use the random state of the parent process, which is not what I want. Any help is greatly appreciated. – Amir Mar 20 '18 at 02:33
  • pretty basic question here, but is M arbitrarily chosen? Is M the number of processes that will be running in parallel? – umop apisdn Jul 16 '18 at 09:57
  • @umopapisdn: `Pool()` defines the number of processes (the number of available CPU cores is used by default). `M` is the number of times the `f()` function is called. – jfs Jul 16 '18 at 13:08
  • [The TL;DR can be found here](https://stackoverflow.com/questions/9754034/can-i-create-a-shared-multiarray-or-lists-of-lists-object-in-python-for-multipro) – Tobia Tesan Oct 31 '18 at 20:07
  • These few lines `shared_arr = mp.Array(ctypes.c_double, N); arr = tonumpyarray(shared_arr); arr[:] = np.random.uniform(size=N)` completely solve my issue(this works in python 3.7.5) – mathguy Nov 14 '19 at 05:36

While the answers already given are good, there is a much easier solution to this problem provided two conditions are met:

  1. You are on a POSIX-compliant operating system (e.g. Linux, macOS); and
  2. Your child processes need read-only access to the shared array.

In this case you do not need to fiddle with explicitly making variables shared, as the child processes will be created using a fork. A forked child automatically shares the parent's memory space. In the context of Python multiprocessing, this means it shares all module-level variables; note that this does not hold for arguments that you explicitly pass to your child processes or to the functions you call on a multiprocessing.Pool.

A simple example:

import multiprocessing
import numpy as np

# will hold the (implicitly mem-shared) data
data_array = None

# child worker function
def job_handler(num):
    # built-in id() returns unique memory ID of a variable
    return id(data_array), np.sum(data_array)

def launch_jobs(data, num_jobs=5, num_worker=4):
    global data_array
    data_array = data

    pool = multiprocessing.Pool(num_worker)
    return pool.map(job_handler, range(num_jobs))

# create some random data and execute the child jobs
mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

# this will print 'True' on POSIX OS, since the data was shared
print(np.all(np.asarray(mem_ids) == id(data_array)))

EelkeSpaak
  • +1 Really valuable info. Can you explain why it is only module-level vars that are shared? Why are local vars not part of the parent's memory space? E.g., why can't this work if I have a function F with local var V and a function G inside of F which references V? – Coffee_Table Oct 27 '17 at 01:38
  • Warning: This answer is a little deceptive. The child process receives a copy of the state of the parent process, including global variables, at the time of the fork. The states are in no way synchronized and will diverge from that moment. This technique may be useful in some scenarios (e.g.: forking off ad-hoc child processes that each handle a snapshot of the parent process and then terminate), but is useless in others (e.g.: long-running child processes that have to share and sync data with the parent process). – David Stein Apr 07 '18 at 07:50
  • @DavidStein: Yes, I think I explicitly mentioned in the answer (point 2) that syncing data with the parent process (thus requiring more than read-only access) will prevent this technique from working as expected. In cases of child processes doing more than one task etc. you'd be better off using explicit shared memory and not relying on fork semantics. In many cases, however, the forking trick works very well for me (e.g. each child process does some costly computation on one slice of a matrix present in parent memory). – EelkeSpaak Apr 08 '18 at 10:33
  • @EelkeSpaak: Your statement - "a forked child automatically shares the parent's memory space" - is incorrect. If I have a child process that wants to monitor the state of the parent process, in a strictly read-only manner, forking will not get me there: the child only sees a snapshot of the parent state at the moment of forking. In fact, that's precisely what I was trying to do (following your answer) when I discovered this limitation. Hence the postscript on your answer. In a nutshell: The parent state is not "shared," but merely copied to the child. That's not "sharing" in the usual sense. – David Stein Apr 09 '18 at 03:30
  • Am I mistaken to think this is a copy-on-write situation, at least on POSIX systems? That is, after the fork, I think the memory is shared until new data is written, at which point a copy is created. So yes, it's true that the data isn't "shared" exactly, but it can provide a potentially huge performance boost. If your process is read only, then there will be no copying overhead! Have I understood the point correctly? – senderle Oct 31 '18 at 04:45
  • @senderle Yes, that is exactly what I meant! Hence my point (2) in the answer about read-only access. – EelkeSpaak Nov 01 '18 at 08:31
  • Note that this only works with `multiprocessing.set_start_method('fork')` (currently the default on Unix). There are situations where you would rather use *spawn* instead of *fork* (e.g. if you must use some buggy library which does not behave well after a fork). – Albert May 18 '19 at 15:23

The Array object has a get_obj() method associated with it, which returns the ctypes array that presents a buffer interface. I think the following should work...

from multiprocessing import Process, Array
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = 10
    unshared_arr = numpy.random.rand(N)
    a = Array('d', unshared_arr)
    print("Originally, the first two elements of arr = %s" % (a[:2]))

    # Create, start, and finish the child process
    p = Process(target=f, args=(a,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%a[:2]

    b = numpy.frombuffer(a.get_obj())

    b[0] = 10.0
    print(a[0])

When run, this prints out the first element of a now being 10.0, showing that a and b are just two views into the same memory.

In order to make sure it is still process-safe, I believe you will have to use the acquire and release methods that exist on the Array object, a, and its built-in lock to make sure it is all safely accessed (though I'm not an expert on the multiprocessing module).
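
For example (a sketch reusing the a and numpy names from the code above, not part of the original answer), the built-in lock can be used as a context manager via get_lock(), which is equivalent to wrapping the access in acquire() and release():

def f(a):
    with a.get_lock():  # same lock that a.acquire()/a.release() use
        a[0] = -a[0]

# and in the parent, guard the numpy view the same way:
with a.get_lock():
    b = numpy.frombuffer(a.get_obj())
    b[0] = 10.0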

Henry Gomersall
  • it won't work without synchronization as @unutbu demonstrated in his (now deleted) answer. – jfs Oct 26 '11 at 20:40
  • Presumably, if you just wanted to access the array post processing, it can be done cleanly without worrying about concurrency issues and locking? – Henry Gomersall Oct 26 '11 at 21:28
  • in this case you don't need `mp.Array`. – jfs Oct 26 '11 at 21:53
  • The processing code may require locked arrays, but the post-processing interpretation of the data might not necessarily. I guess this comes from understanding what exactly the problem is. Clearly, accessing shared data concurrently is going to require some protection, which I thought would be obvious! – Henry Gomersall Oct 26 '11 at 22:12

I've written a small python module that uses POSIX shared memory to share numpy arrays between python interpreters. Maybe you will find it handy.

https://pypi.python.org/pypi/SharedArray

Here's how it works:

import numpy as np
import SharedArray as sa

# Create an array in shared memory
a = sa.create("test1", 10)

# Attach it as a different array. This can be done from another
# python interpreter as long as it runs on the same computer.
b = sa.attach("test1")

# See how they are actually sharing the same memory block
a[0] = 42
print(b[0])

# Destroying a does not affect b.
del a
print(b[0])

# See how "test1" is still present in shared memory even though we
# destroyed the array a.
sa.list()

# Now destroy the array "test1" from memory.
sa.delete("test1")

# The array b is not affected, but once you destroy it then the
# data are lost.
print(b[0])

mat

You can use the sharedmem module: https://bitbucket.org/cleemesser/numpy-sharedmem

Here's your original code, this time using shared memory that behaves like a NumPy array (note the additional final statement calling NumPy's sum() function):

from multiprocessing import Process
import sharedmem
import numpy as np

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = 10
    unshared_arr = np.random.rand(N)
    arr = sharedmem.empty(N)
    arr[:] = unshared_arr.copy()
    print("Originally, the first two elements of arr = %s" % (arr[:2]))

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

    # Perform some NumPy operation
    print(arr.sum())

Velimir Mlaker
  • Note: this is no longer being developed and does not seem to work on Linux: https://github.com/sturlamolden/sharedmem-numpy/issues/4 – A.D Mar 08 '16 at 17:38

With Python 3.8+ there is the multiprocessing.shared_memory module in the standard library:

# np_sharing.py
from multiprocessing import Process
from multiprocessing.managers import SharedMemoryManager
from multiprocessing.shared_memory import SharedMemory
from typing import Tuple

import numpy as np


def create_np_array_from_shared_mem(
    shared_mem: SharedMemory, shared_data_dtype: np.dtype, shared_data_shape: Tuple[int, ...]
) -> np.ndarray:
    arr = np.frombuffer(shared_mem.buf, dtype=shared_data_dtype)
    arr = arr.reshape(shared_data_shape)
    return arr


def child_process(
    shared_mem: SharedMemory, shared_data_dtype: np.dtype, shared_data_shape: Tuple[int, ...]
):
    """Logic to be executed by the child process"""
    arr = create_np_array_from_shared_mem(shared_mem, shared_data_dtype, shared_data_shape)
    arr[0, 0] = -arr[0, 0]  # modify the array backed by shared memory


def main():
    """Logic to be executed by the parent process"""

    # Data to be shared:
    data_to_share = np.random.rand(10, 10)

    SHARED_DATA_DTYPE = data_to_share.dtype
    SHARED_DATA_SHAPE = data_to_share.shape
    SHARED_DATA_NBYTES = data_to_share.nbytes

    with SharedMemoryManager() as smm:
        shared_mem = smm.SharedMemory(size=SHARED_DATA_NBYTES)

        arr = create_np_array_from_shared_mem(shared_mem, SHARED_DATA_DTYPE, SHARED_DATA_SHAPE)
        arr[:] = data_to_share  # load the data into shared memory

        print(f"The [0,0] element of arr is {arr[0,0]}")  # before

        # Run child process:
        p = Process(target=child_process, args=(shared_mem, SHARED_DATA_DTYPE, SHARED_DATA_SHAPE))
        p.start()
        p.join()

        print(f"The [0,0] element of arr is {arr[0,0]}")  # after

        del arr  # delete np array so the shared memory can be deallocated


if __name__ == "__main__":
    main()

Running the script:

$ python3.10 np_sharing.py
The [0,0] element of arr is 0.262091705529628
The [0,0] element of arr is -0.262091705529628

Since the arrays in different processes share the same underlying memory buffer, the standard caveats regarding race conditions apply.
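
If several processes may write concurrently, one option (a sketch, not part of the example above) is to create a multiprocessing.Lock in the parent, pass it to each child alongside the SharedMemory block, and guard every access with it:

from multiprocessing import Lock

def child_process_locked(lock, shared_mem, shared_data_dtype, shared_data_shape):
    """Like child_process above, but serializes access with a shared lock."""
    arr = create_np_array_from_shared_mem(shared_mem, shared_data_dtype, shared_data_shape)
    with lock:  # only one process touches the buffer at a time
        arr[0, 0] = -arr[0, 0]

# in main():
#     lock = Lock()
#     p = Process(target=child_process_locked,
#                 args=(lock, shared_mem, SHARED_DATA_DTYPE, SHARED_DATA_SHAPE))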

Jasha