
I am writing my first multiprocessing program using Python 3.8. I have a large dataframe that I want all the processes to use. All the processes require only read-only access to the dataframe.

I have read that shared memory is one way of achieving this. However I'm not sure how exactly to allow each process to access my large dataframe using shared memory.

So in my head I imagine doing something like the lines below.

# my dataframe; I believe I need to get the number of bytes before creating a shared memory object
df_bytes = df.memory_usage(index=True).sum()

from multiprocessing import shared_memory
shm = shared_memory.SharedMemory(create=True, size=df_bytes)
memory_name = shm.name

From my limited understanding, the shared memory object is assigned a name which can be used later to attach to it.

Then, in some function used by each process, I would attach to the shared memory object.

existing_shm = shared_memory.SharedMemory(name=memory_name)

But I am not sure how this lets me reference my large dataframe. I was trying to follow the example below, however it's for NumPy and I don't want to have to convert my dataframe to a NumPy array. It also looks like they have to re-create the NumPy array for each process, which doesn't make sense to me.

 >>> # In the first Python interactive shell
 >>> import numpy as np
 >>> a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
 >>> from multiprocessing import shared_memory
 >>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
 >>> # Now create a NumPy array backed by shared memory
 >>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
 >>> b[:] = a[:]  # Copy the original data into shared memory
 >>> b
  array([1, 1, 2, 3, 5, 8])
 >>> type(b)
 <class 'numpy.ndarray'>
 >>> type(a)
 <class 'numpy.ndarray'>
 >>> shm.name  # We did not specify a name so one was chosen for us
 'psm_21467_46075'

 >>> # In either the same shell or a new Python shell on the same machine
 >>> import numpy as np
 >>> from multiprocessing import shared_memory
 >>> # Attach to the existing shared memory block
 >>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
 >>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
 >>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
 >>> c
 array([1, 1, 2, 3, 5, 8])
 >>> c[-1] = 888
 >>> c
 array([  1,   1,   2,   3,   5, 888])

Does anyone have any pointers on how to put a large dataframe into shared memory?

    As an alternative approach to sharing dataframes, check this answer https://stackoverflow.com/questions/72798554/using-multiprocessing-to-speed-up-program-that-performs-row-wise-operations-on-a/72817277#72817277 – Charchit Agarwal Jul 18 '22 at 20:54

1 Answer


The new shared_memory module offered by multiprocessing can only store byte-like data. It cannot store a raw dataframe object, but it can store a dataframe serialized into bytes.

A few caveats though:

  1. This limits the usefulness of shared_memory, since you will be serializing/deserializing the data anyway. Still, there are notable performance benefits: you pickle the data only once rather than once per spawned process, and the data is not copied from one process to another.
  2. You will want to make sure to delete any references to the shared memory buffer when you are done, to allow garbage collection (a short illustration follows this list).
  3. The size of a memory block is fixed. So, assuming the block is sized to exactly fit the dataframe's bytes, you will not be able to extend the dataframe. This probably won't be an issue for you since you are using it as read-only.
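
To make the second caveat concrete, here is a minimal illustration of why lingering references matter. This is a separate sketch, not part of the example below, and the block size and variable names are arbitrary:

from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(create=True, size=16)
view = shm.buf[:4]  # a memoryview slice keeps the underlying buffer exported

try:
    shm.close()  # fails while the slice is still alive
except BufferError as e:
    print(e)  # typically "cannot close exported pointers exist"

del view     # drop the reference...
shm.close()  # ...now the block can be closed
shm.unlink()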

With that said, here's an example of how to use shared memory to store and transfer a dataframe from one process to another, using pickle:

from multiprocessing import shared_memory, Process
import pandas as pd
import pickle


def a(name):
    # Attach to the same block
    shm_b = shared_memory.SharedMemory(name=name)

    # Unpickle the dataframe stored inside the block
    df = pickle.loads(shm_b.buf)
    print(df)

    # Deleting any references to the memory block is essential for garbage collection
    del df
    
    # Use close when the current process no longer requires the shared memory 
    shm_b.close()


if __name__ == "__main__":

    data = {'column1': [1, 2, 3, 4, 5], 'column2': [6, 7, 8, 9, 10], 'column3': [-1, -2, -3, -4, -5]}
    df = pd.DataFrame(data)

    print(df)

    pickled_df = pickle.dumps(df)
    size = len(pickled_df)

    # Create a shared_memory
    shm_a = shared_memory.SharedMemory(create=True, size=size)
    shm_a.buf[:size] = pickled_df

    # Notice that we only pass the name of the block, not the block itself
    b = Process(target=a, args=(shm_a.name,))
    b.start()
    b.join()
    
    shm_a.close()
    
    # Unlink should only be called once on a memory block
    shm_a.unlink()

There is a lot of room for optimization here, mostly because you control the pickling/unpickling process rather than multiprocessing. For example, if you don't care about compatibility with previous versions of Python, you can use pickle.HIGHEST_PROTOCOL when pickling the data (more about protocols here). Additionally, instead of pickling the entire dataframe in one go, you could:

  • Split the dataframe into smaller chunks (say, divide them per column)
  • Pickle each smaller chunk and combine them into a list
  • Pickle the list, and store that inside the block

There might be some extra overhead with this approach, since you are pickling not only the dataframe but also the byte representations of its chunks; whether that is significant depends on the data inside the dataframe. Pickling/unpickling bytes is usually very fast, but handing pickle one large object can let it optimize in ways that calling pickle 10 times on smaller chunks of that data cannot. It's best to experiment here.

The benefit, however, is that you can now supply each worker with the indices of the smaller dataframes it needs to work with, and it will only need to unpickle those entries in the list rather than the entire byte representation of the dataframe. This way you are effectively parallelizing the deserialization as well, with the potential of reducing the overhead; a minimal sketch of this chunked approach follows below.
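
The per-column split, the worker function and the chunk_indices argument in this sketch are assumptions for illustration, not something prescribed above; adapt the chunking to your data:

from multiprocessing import shared_memory, Process
import pandas as pd
import pickle


def worker(name, chunk_indices):
    # Attach to the existing block and unpickle only the outer list
    shm = shared_memory.SharedMemory(name=name)
    chunks = pickle.loads(shm.buf)

    # Unpickle just the chunks this worker was assigned
    for i in chunk_indices:
        sub_df = pickle.loads(chunks[i])
        print(sub_df)

    del chunks
    shm.close()


if __name__ == "__main__":
    df = pd.DataFrame({'column1': [1, 2, 3, 4, 5],
                       'column2': [6, 7, 8, 9, 10],
                       'column3': [-1, -2, -3, -4, -5]})

    # Pickle each column separately, then pickle the list of pickled chunks
    chunks = [pickle.dumps(df[[col]], protocol=pickle.HIGHEST_PROTOCOL)
              for col in df.columns]
    payload = pickle.dumps(chunks, protocol=pickle.HIGHEST_PROTOCOL)

    shm = shared_memory.SharedMemory(create=True, size=len(payload))
    shm.buf[:len(payload)] = payload

    # Each worker only unpickles the chunks it is told to work on
    p1 = Process(target=worker, args=(shm.name, [0, 1]))
    p2 = Process(target=worker, args=(shm.name, [2]))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    shm.close()
    shm.unlink()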

Note that if your dataframe is already very large, you might need to split it into smaller chunks just to be able to pickle it at all. Lastly, you could also save the pickled dataframe to a file and load it from there if the dataframe hasn't changed in between runs, saving you some time there as well.
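
For that last point, here is a minimal sketch of caching the pickled bytes on disk between runs. The file name pickled_df.bin and the helper function are assumptions for illustration only:

import os
import pickle
from multiprocessing import shared_memory

import pandas as pd

CACHE_FILE = "pickled_df.bin"  # assumed cache file name


def pickled_bytes(df):
    # Reuse the pickled bytes from a previous run if the dataframe hasn't changed
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, "rb") as f:
            return f.read()
    data = pickle.dumps(df, protocol=pickle.HIGHEST_PROTOCOL)
    with open(CACHE_FILE, "wb") as f:
        f.write(data)
    return data


if __name__ == "__main__":
    df = pd.DataFrame({'column1': [1, 2, 3], 'column2': [4, 5, 6]})
    data = pickled_bytes(df)

    # Copy the (possibly cached) bytes straight into the shared memory block
    shm = shared_memory.SharedMemory(create=True, size=len(data))
    shm.buf[:len(data)] = data
    # ... spawn workers as in the examples above ...
    shm.close()
    shm.unlink()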

There is also an alternate approach to sharing dataframes using Managers, which is less performant but more flexible. I wrote an answer detailing that here.
