
I have a NumPy array/pandas DataFrame that takes up more than 40 GB on disk.

I am trying to load this data using np.load('array.npy') in each of three Python scripts, executed consecutively via a bash script (code.sh):

#!/bin/bash
python code1.py
# some code
python code2.py
# some code
python code3.py
# some code

# some code refers to some simple operations I do before running the next Python script. Since each Python script loads all of the data again, running bash code.sh takes a long time.

Is there a more efficient way to do this? For example, could I load the data once in the bash script and somehow feed it to the Python scripts?

I had to intentionally break up my Python script into three because of the operations in between, so working around the issue with one monolithic Python script is not an option in my case.

    Each separate program/script needs a copy in memory, if you want to save memory just use one python script, not three. – mozway Oct 27 '21 at 18:48
  • Unfortunately, I cannot do that, which is why I asked this question. I had to forcefully break up the python file into 3 python files. – RANIT DAS Oct 27 '21 at 19:29
  • Compress `np.py` maybe? Or `mmap` it? – Mark Setchell Oct 27 '21 at 19:42
  • Most modern OSes will cache frequently used files. The second and third time should be faster than the first. Disk I/O still takes as long as it takes. – Mad Physicist Oct 27 '21 at 19:42
  • Can you customize the format of the data. I.e., are you willing to make it into just a numpy array that can be read from a flatfile? – Mad Physicist Oct 27 '21 at 19:55
  • Are you asking if I can use a different data format here? I can use any other data format, but ultimately, it should be possible for it to be loaded into the python codes, and then converted to numpy array, since I have arrays as my data, and I also do some simple array operations after loading them. – RANIT DAS Oct 27 '21 at 20:07
  • @mozway. Good call: you can run a memory manager for a shared memory block. – Mad Physicist Oct 27 '21 at 21:01
  • For future reference, "code" is a collective noun, like "water". It's "three scripts", not "three codes", like "three cups", not "three waters". When speaking colloquially, all bets are off of course. – Mad Physicist Oct 27 '21 at 21:07

1 Answer


The Python documentation for the multiprocessing.shared_memory.SharedMemory class covers an example that you may find very helpful. The basic process would be this:

  1. Make a Python script that creates a shared buffer to store your array and loads your data into it.
  2. The script will follow a model like How to process SIGTERM signal gracefully? to determine when to unlink the buffer.
  3. Run the script in the background.
  4. In your existing scripts, create an array object that wraps the shared buffer. Make sure the first script waits for the buffer to be populated.
  5. When all scripts are done, send SIGTERM to the shared memory manager.

For this example, I will assume that your data is a single array that you can store in a flatfile and load directly into a specified buffer.
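If your data currently lives in array.npy, a rough one-off conversion to such a flatfile could look like the sketch below (myfile.dat is just the name used in the rest of this answer; mmap_mode='r' keeps the conversion from pulling all 40 GB into RAM at once):

import numpy as np

# memory-map the existing .npy file so the conversion doesn't need 40 GB of RAM
arr = np.load('array.npy', mmap_mode='r')
print(arr.shape, arr.dtype)  # use these values for SHM_SHAPE and SHM_DTYPE below

# tofile writes the raw element bytes in C order, with no header,
# which is exactly what load_buffer below expects
arr.tofile('myfile.dat')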

MemoryManager.py

from multiprocessing.shared_memory import SharedMemory
import numpy as np

SHM_NAME = 'large_numpy_array_shm'
SHM_SHAPE = (81920, 65536)  # Adds up to exactly 40 GiB when using float64
SHM_DTYPE = np.float64
SHM_SIZE = int(np.prod(SHM_SHAPE)) * SHM_DTYPE().itemsize

_cached_buffer = None

def get_buffer(create=False):
    """ Get a reference to the shared buffer (once per process) """
    global _cached_buffer
    if _cached_buffer is None:
        _cached_buffer = SharedMemory(SHM_NAME, create=create, size=SHM_SIZE + 1)
    return _cached_buffer

def close_buffer(unlink=False):
    global _cached_buffer
    if _cached_buffer is not None:
        _cached_buffer.close()
        if unlink:
            _cached_buffer.unlink()
        _cached_buffer = None

def load_buffer():
    """ Read a file into the buffer. """
    shm = get_buffer(create=True)
    # view the whole block (data plus flag byte) as a flat uint8 array
    buf = np.ndarray(shape=SHM_SIZE + 1, dtype=np.uint8, buffer=shm.buf)
    # set the byte after the data to 0 to indicate "not loaded yet"
    buf[-1] = 0
    # load the data
    with open('myfile.dat', 'rb') as file:
        pos = 0
        while pos < SHM_SIZE:
            # readinto accepts anything supporting the buffer interface;
            # to get the right offset and size, use a slice
            rd = file.readinto(buf[pos:-1])
            if not rd:
                break  # EOF: readinto returned 0 (or None)
            pos += rd
    # mark the data as loaded for other processes
    buf[-1] = 1

def get_array():
    """
    Retrieve the shared buffer as an array object once it has been loaded.
    Return None until it is loaded.
    """
    if np.ndarray(shape=SHM_SIZE + 1, dtype=np.uint8, buffer=get_buffer().buf)[-1] == 0:
        return None
    return np.ndarray(shape=SHM_SHAPE, dtype=SHM_DTYPE, buffer=get_buffer().buf)

if __name__ == '__main__':
    # Code to be run in manager process, not available to clients
    import signal, sys, time

    signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))

    try:
        # Just assume that the buffer is created and marked "not loaded"
        # before the first script starts. If it turns out that you need a
        # mutex or something, use one.
        load_buffer()
        while True:
            time.sleep(1)
    finally:
        close_buffer(unlink=True)

You may be able to get similar results with the SharedMemoryManager class, but I don't know how it works across processes that are not its children.
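For reference, a minimal sketch of that alternative might look like the following (importing the size constant from MemoryManager.py above; the caveat about unrelated processes still applies):

from multiprocessing.managers import SharedMemoryManager
from MemoryManager import SHM_SIZE

with SharedMemoryManager() as smm:
    # blocks created this way are released automatically when the manager exits
    shm = smm.SharedMemory(size=SHM_SIZE + 1)
    print(shm.name)   # other processes would need this auto-generated name to attach
    ...               # keep the manager alive while the other scripts run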

In code1.py, code2.py, and siblings, you can now use MemoryManager.py directly to get the shared array:

code1.py

from MemoryManager import get_array, close_buffer

...

if __name__ == '__main__':
    from time import sleep

    # Wait until the array is loaded.
    # You can likely skip this loop in code2.py and on (see the sketch below)
    while (arr := get_array()) is None:
        sleep(1)

    # Now you have a variable `arr` that you can pass around or use as a global

    # When you are finished using it, close the buffer, but
    # don't unlink, so the next process can use it.
    close_buffer()
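
For completeness, the later scripts can skip the wait loop, since the data is already loaded by the time they run. A code2.py along these lines (a sketch, with your own operations in place of the ellipsis):

code2.py

from MemoryManager import get_array, close_buffer

if __name__ == '__main__':
    # The manager finished loading before code1.py exited, so no wait loop is needed
    arr = get_array()

    ...  # your array operations

    # Again, close but don't unlink, so code3.py can still attach to the block
    close_buffer()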

The bash script will have to be augmented to kill MemoryManager once it's done, so that the shared block can be destroyed:

code.sh

#!/bin/bash
# Set up the shared array
python MemoryManager.py &

python code1.py
# some code
python code2.py
# some code
python code3.py
# some code

# send sigterm to MemoryManager
kill -s SIGTERM %1