
One answer to the question *Is shared readonly data copied to different processes for multiprocessing?* gives a working shared-memory solution for a numpy array.

What would the same approach look like with a pandas DataFrame?

Background: I would like to be able to write to the DataFrame during multiprocessing and to process it further after the multiprocessing has finished.

  • Have you considered using dask? – rpanai Nov 15 '18 at 17:52
  • Thanks for your input. I would like to add new rows to the DataFrame with `df.loc[len(df)] = [x, x]`. Would dask help me with this easily and take care that this happens in a synchronized way? –  Nov 16 '18 at 04:13
  • Do you mean preserving order? – rpanai Nov 16 '18 at 14:24
  • You might want to have a look at this toy example [pastebin](https://pastebin.com/0y9RfXaW) (the sketch below shows the general pattern) – rpanai Nov 16 '18 at 14:36
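For context (this is an assumption about the pattern such toy examples illustrate, not the pastebin's actual contents): the usual way to grow a DataFrame from several processes without locking is to have each worker build its own rows and concatenate them in the parent. A minimal sketch with a hypothetical `worker`:

import multiprocessing as mp
import pandas as pd

def worker(x):
    # each worker builds its own rows; nothing is shared, so no locking is needed
    return pd.DataFrame({'w1': [x], 'd1': [x * 2]})

if __name__ == '__main__':
    with mp.Pool(4) as pool:
        parts = pool.map(worker, range(8))
    # combine in the parent; order follows the inputs to map
    df = pd.concat(parts, ignore_index=True)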

1 Answer


If you don't want to use dask, you can share a pandas dataframe using shared memory by first converting it to a numpy array and then reconstructing it in the child processes.

import numpy as np
import pandas as pd
from multiprocessing import shared_memory

def create_shared_block(to_share, dtypes):
    # downcast float64 columns to float32 to shrink the shared block
    for col, dtype in to_share.dtypes.items():
        if dtype == 'float64':
            to_share[col] = pd.to_numeric(to_share[col], downcast='float')

    # drop the index; only the data columns go into the numpy array
    to_share = to_share.reset_index(drop=True)
    
    # get the dtypes in the same order as the dataframe columns and make sure the types are correct for numpy
    dtypes_sorted = sort_dtypes(to_share, dtypes)
    
    # get the dataframe values in the format expected by numpy
    values = [tuple(x) for x in to_share.values.tolist()]
    
    # create a numpy array
    to_share = np.array(values, dtype=dtypes_sorted)
    
    # create a shared memory of the size of the array
    shm = shared_memory.SharedMemory(create=True, size=to_share.nbytes)
    
    # now create a NumPy array backed by shared memory
    np_array = np.ndarray(to_share.shape, dtype=dtypes_sorted, buffer=shm.buf)
    
    # Copy the original data into shared memory
    np_array[:] = to_share[:]
    return shm, np_array, dtypes_sorted


def sort_dtypes(df, dtypes):
    # category/object/str are pandas dtypes; numpy needs fixed-width byte strings
    string_types = ('category', 'object', 'str', '|S')
    dtypes = [(x, '|S{}'.format(int(df[x].str.len().max()))) if y in string_types else (x, y)
              for x, y in dtypes if x in df.columns]
    # build a lookup
    dtypes_dict = {x: y for x, y in dtypes}
    # fix the order to match the dataframe columns
    dtypes_sorted = [(x, dtypes_dict[x]) for x in df.columns]
    return dtypes_sorted

# ------PARENT PROCESS-------#
# create your shared memory
to_share = pd.DataFrame([['obstacle', 'obstacle', 2, 3], ['obstacles', 'obstacle', 2, np.nan]],
                        columns=['w1', 'w2', 'd1', 'd2'])
dtypes = [('w1', 'str'), ('w2', '|S'), ('d1', 'f'), ('d2', 'f')]
shm, arr, dtypes_sorted = create_shared_block(to_share, dtypes)

# then pass these values to your child processes
shared = (shm.name, arr.shape, dtypes_sorted)

# ------CHILD PROCESS-------#
# assuming you have passed to the child process in a variable called shared, you can reconstruct the dataframe as follows
existing_shm = shared_memory.SharedMemory(name=shared[0])
np_array = np.ndarray(shared[1], dtype=shared[2], buffer=existing_shm.buf)
columns = [x for x, y in shared[2]]
df = pd.DataFrame(np_array, columns=columns)
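For completeness, here is a sketch of wiring the two halves together and cleaning up. The `modify` function and its argument are my own illustration, not part of the original answer; on spawn-based platforms keep everything below the function definitions under the `__main__` guard.

# ------PUTTING IT TOGETHER (illustrative sketch)-------#
from multiprocessing import Process

def modify(shared):
    shm = shared_memory.SharedMemory(name=shared[0])
    np_array = np.ndarray(shared[1], dtype=shared[2], buffer=shm.buf)
    np_array['d1'] = 99   # writes go straight into the shared block
    del np_array          # release the view before closing
    shm.close()           # detach; only the parent should unlink

if __name__ == '__main__':
    to_share = pd.DataFrame([['obstacle', 'obstacle', 2, 3]], columns=['w1', 'w2', 'd1', 'd2'])
    dtypes = [('w1', 'str'), ('w2', '|S'), ('d1', 'f'), ('d2', 'f')]
    shm, arr, dtypes_sorted = create_shared_block(to_share, dtypes)
    p = Process(target=modify, args=((shm.name, arr.shape, dtypes_sorted),))
    p.start()
    p.join()
    print(arr['d1'])      # reflects the child's write
    del arr               # release the numpy view before closing
    shm.close()
    shm.unlink()          # free the segment once all processes are done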

This has saved some memory in my app when sharing a 100k-row dataframe, but probably not as much as I could save using an established library like dask. And I'm not too sure of the overhead involved in recreating the pandas dataframe - I'd like to think it just references the shared numpy array and adds some extra stuff on top to make it a df.
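If you want to test that last hunch rather than guess, numpy can report whether the rebuilt frame still references the shared buffer (a quick diagnostic; recent pandas versions typically copy when constructing from a structured array, so a False here would not be surprising):

# does the rebuilt column share memory with the shared-memory array?
print(np.shares_memory(df['d1'].to_numpy(), np_array))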

forgetso
  • >=Python 3.8 only (a pre-3.8 workaround is sketched after these comments) – VovaM Nov 09 '20 at 13:22
  • I'm looking for a straightforward solution to pass a DF into shared memory across multiple processes. I understand what's happening here, but is there any way of passing in the DF without reconstructing it from a numpy array? This seems like a huge amount of code for something that's surely quite common, especially if you don't know the layout of the dataframe and have to rely on lots of iteration and conditional logic. – Anthony Nash Jun 13 '23 at 15:42
  • AFAIK, you should just use [Dask](https://www.dask.org/) or [Polars](https://www.pola.rs/) instead of trying to create your own solution using shared memory. I didn't want to learn another library when I wrote this. However, I probably spent more time figuring out how to pass the df around than I would've learning Dask/Polars. – forgetso Jun 13 '23 at 19:05
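For readers on Python older than 3.8 (per VovaM's comment above), a rough equivalent can be built from multiprocessing's RawArray plus np.frombuffer. This is my own sketch, not from the answer; the RawArray must be inherited at fork time (for example through a Pool initializer) rather than pickled:

from multiprocessing import RawArray
import numpy as np

# back a structured array with a raw byte buffer
dtype = np.dtype([('d1', 'f4'), ('d2', 'f4')])
raw = RawArray('b', dtype.itemsize * 100)    # room for 100 rows
np_array = np.frombuffer(raw, dtype=dtype)   # zero-copy, writable view
# children inherit `raw` at fork and rebuild their own view the same way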
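For comparison, the Dask route mentioned in the comments is short; dask.dataframe mirrors much of the pandas API and manages the parallelism itself (column names here are illustrative):

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame({'w1': ['obstacle', 'obstacles'], 'd1': [2.0, 3.0]})
ddf = dd.from_pandas(df, npartitions=2)        # split into partitions
result = ddf.assign(d2=ddf.d1 * 2).compute()   # computed per partition, returned as pandas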