The Python documentation for the multiprocessing.shared_memory.SharedMemory
class includes an example that you may find very helpful. The basic process would be this:
- Make a python script that creates a shared buffer to store your array and load your data into it.
- The script will follow a model like "How to process SIGTERM signal gracefully?" to determine when to unlink the buffer.
- Run the script in the background.
- In your existing scripts, create an array object that wraps the shared buffer. Make sure the first script waits for the buffer to be populated.
- When all scripts are done, send SIGTERM to the shared memory manager.
For this example, I will assume that your data is a single array that you can store in a flatfile and load directly into a specified buffer.
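If you don't already have such a flatfile, one way to produce it (a one-off sketch; the array variable here is a stand-in for whatever you currently load) is numpy's tofile, which writes the raw element bytes in C order with no header, exactly the layout the loader below expects:
import numpy as np
data = ...  # your existing (81920, 65536) array, however you build it today
# Write the raw float64 bytes; this is what load_buffer() reads back into shared memory
np.ascontiguousarray(data, dtype=np.float64).tofile('myfile.dat')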
MemoryManager.py
from multiprocessing.shared_memory import SharedMemory
import numpy as np

SHM_NAME = 'large_numpy_array_shm'
SHM_SHAPE = (81920, 65536)  # Adds up to 40GiB exactly if using float64
SHM_DTYPE = np.float64
SHM_SIZE = int(np.prod(SHM_SHAPE)) * SHM_DTYPE().itemsize
_cached_buffer = None

def get_buffer(create=False):
    """Get a reference to the shared buffer (once per process)."""
    global _cached_buffer
    if _cached_buffer is None:
        _cached_buffer = SharedMemory(SHM_NAME, create=create, size=SHM_SIZE + 1)
    return _cached_buffer

def close_buffer(unlink=False):
    global _cached_buffer
    if _cached_buffer is not None:
        _cached_buffer.close()
        if unlink:
            _cached_buffer.unlink()
        _cached_buffer = None
def load_buffer():
    """Read a file into the buffer."""
    shm = get_buffer(create=True)
    # view the whole block as bytes; the last byte is a "loaded" flag
    buf = np.ndarray(shape=SHM_SIZE + 1, dtype=np.uint8, buffer=shm.buf)
    # set the byte after the data to 0 to indicate "not loaded yet"
    buf[-1] = 0
    # load the data
    with open('myfile.dat', 'rb') as file:
        pos = 0
        while pos < SHM_SIZE:
            # readinto accepts anything supporting the buffer interface;
            # to get the right offset and size, use a slice
            rd = file.readinto(buf[pos:-1])
            if not rd:
                # EOF before the buffer was filled: fail rather than spin forever
                raise EOFError('myfile.dat ended before the shared buffer was filled')
            pos += rd
    # mark the data as loaded for other processes
    buf[-1] = 1
def get_array():
    """
    Retrieve the shared buffer as an array object once it has been loaded.
    Return None until it is loaded.
    """
    if np.ndarray(shape=SHM_SIZE + 1, dtype=np.uint8, buffer=get_buffer().buf)[-1] == 0:
        return None
    return np.ndarray(shape=SHM_SHAPE, dtype=SHM_DTYPE, buffer=get_buffer().buf)
if __name__ == '__main__':
    # Code to be run in the manager process, not available to clients
    import signal, sys, time

    signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))
    try:
        # Just assume that the buffer is created and marked "not loaded"
        # before the first script starts. If it turns out that you need a
        # mutex or something, use one.
        load_buffer()
        while True:
            time.sleep(1)
    finally:
        close_buffer(unlink=True)
You may be able to get similar results with the SharedMemoryManager
class, but I don't know how it works across processes that are not its children.
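If all of your consumer scripts were launched from a single Python parent rather than from bash, a minimal sketch with SharedMemoryManager might look like this (reusing the constants from MemoryManager.py; note that the manager destroys the block as soon as the with-block exits):
from multiprocessing.managers import SharedMemoryManager
import numpy as np
from MemoryManager import SHM_SHAPE, SHM_DTYPE, SHM_SIZE

with SharedMemoryManager() as smm:
    # The manager owns the block and cleans it up when the with-block exits,
    # so the consumers have to be started from this process; they can attach
    # to the block by name via SharedMemory(shm.name).
    shm = smm.SharedMemory(size=SHM_SIZE)
    arr = np.ndarray(shape=SHM_SHAPE, dtype=SHM_DTYPE, buffer=shm.buf)
    # ... load the data into arr and launch the worker processes from here ...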
In code1.py, code2.py, and siblings, you can now use MemoryManager.py directly to get the shared array:
code1.py
from MemoryManager import get_array, close_buffer
...
if __name__ == '__main__':
    from time import sleep

    # Wait until the array is loaded.
    # You can likely skip the loop in code2.py and on.
    while (arr := get_array()) is None:
        sleep(1)

    # Now you have a global variable `arr` that you can pass around or use as a global.
    # When you are finished using it, close the buffer, but
    # don't unlink, so the next process can use it.
    close_buffer()
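As the comment above notes, code2.py and the later scripts only run after code1.py has already seen the "loaded" flag, so they can reasonably skip the polling loop, along these lines:
code2.py
from MemoryManager import get_array, close_buffer
...
if __name__ == '__main__':
    # code1.py already waited for the data, so the flag byte is set by now
    arr = get_array()
    ...  # work with arr
    # Again: close, but don't unlink, so code3.py can still attach
    close_buffer()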
The bash script has to be augmented to kill MemoryManager once all the scripts are done, so that the shared block can be destroyed:
code.sh
#!/bin/bash
# Set up the shared array
python MemoryManager.py &
python code1.py
# some code
python code2.py
# some code
python code3.py
# some code
# send sigterm to MemoryManager
kill -s SIGTERM %1
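If you would rather not rely on job specs, you can capture the PID of the background process with $! and signal that instead; a final wait also makes the script block until the manager has actually cleaned up:
python MemoryManager.py &
MANAGER_PID=$!
# ... run code1.py, code2.py, code3.py as above ...
kill -s SIGTERM "$MANAGER_PID"
wait "$MANAGER_PID"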