10

I am working on a CPU-intensive ML problem which is centered around an additive model. Since addition is the main operation, I can divide the input data into pieces and spawn multiple models, which are then merged by the overridden __add__ method.

The code relating to the multiprocessing looks like this:

import copy
from functools import partial
from multiprocessing.pool import ThreadPool

def pool_worker(filename, doshuffle):
    print(f"Processing file: {filename}")
    with open(filename, 'r') as f:
        # build one partial model from this file's data
        return FragmentModel(order=args.order, indata=f, shuffle=doshuffle)

def generateModel(is_mock=False, save=True):
    model = None
    with ThreadPool(args.nthreads) as pool:
        partial_models = pool.imap_unordered(
            partial(pool_worker, doshuffle=is_mock), args.input)
        for i, m in enumerate(partial_models):
            logger.info(f'Starting to merge model {i}')
            if model is None:
                model = copy.deepcopy(m)
            else:
                model += m  # merge via the overridden __add__
            logger.info('Done merging...')

    return model

The issue is that the memory consumption scales exponentially as the model order increases, so at order 4 each instance of the model is about 4-5 GB. This causes the thread pool to crash, as the intermediate model objects are not picklable.

I have read about this a bit, and it appears that even if the pickling is not an issue, it is still extremely inefficient to pass data like this, as noted in the comments to this answer.

There is very little guidance as to how one can use shared memory for this purpose, however. Is it possible to avoid this problem without having to change the internals of the model object?

posdef
  • This answers how to share data between processes using shared memory and turn off pickling: https://stackoverflow.com/a/14135569/9521723 – SimonF Jan 03 '19 at 11:03
  • @SimonF there is a crucial difference between the questions, the one you linked refers to child processes referencing (i.e. reading but not writing) large objects. In my case, I want to **return** large objects, my child processes get their input data independently of each other. – posdef Jan 03 '19 at 12:18
  • Use the multiprocessing module. Read its documentation to know how to do it. – LtWorf Jan 03 '19 at 14:11
  • Using files as @Dima_Tisnek suggested is the right option. Cloud services commonly store large data in file format, especially when a single chunk hits the GB range. Merging can be done after all chunks are dumped to the filesystem. – knh190 Jan 10 '19 at 03:42
  • @knh190 The issue is that the large numpy arrays are all variables in custom objects – posdef Jan 10 '19 at 13:59

4 Answers

13

Since Python 3.8 there is multiprocessing.shared_memory, which enables direct memory sharing between processes, similar to "real" multi-threading in C or Java. Direct memory sharing can be significantly faster than sharing via files or sockets, or via serializing/deserializing a copy of the data.

It works by providing a shared memory buffer on which different processes can claim and declare variables, via a basic SharedMemory class instance or a more advanced SharedMemoryManager class instance. Variables of basic Python data types can be conveniently declared using the built-in ShareableList. Variables of more advanced data types such as numpy.ndarray can be shared by specifying the memory buffer when declaring them.
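
For example, basic values can be put in a ShareableList; here is a minimal sketch, independent of the question's code:

from multiprocessing import shared_memory

# create a fixed-length list backed by shared memory (basic types only)
sl = shared_memory.ShareableList([1, 2.5, 'label', True])

# another process can attach to the same block by name and update it in place
same_list = shared_memory.ShareableList(name=sl.shm.name)
same_list[0] = 42
assert sl[0] == 42

same_list.shm.close()   # each attachment closes its own handle
sl.shm.close()
sl.shm.unlink()         # the creator frees the block once everyone is done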

Example with numpy.ndarray:

import numpy as np
from multiprocessing import shared_memory

# setting up
data = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
d_shape = (len(data),)
d_type = np.int64
d_size = np.dtype(d_type).itemsize * np.prod(d_shape)

# IN THE MAIN PROCESS
# allocate new shared memory
shm = shared_memory.SharedMemory(create=True, size=d_size)
shm_name = shm.name
# numpy array on shared memory buffer
a = np.ndarray(shape=d_shape, dtype=d_type, buffer=shm.buf)
# copy data into shared memory ndarray once
a[:] = data[:]

# IN ANOTHER PROCESS
# reuse existing shared memory
ex_shm = shared_memory.SharedMemory(name=shm_name)
# numpy array b uses the same memory buffer as a
b = np.ndarray(shape=d_shape, dtype=d_type, buffer=ex_shm.buf)
# changes in b will be reflected in a and vice versa...
ex_shm.close()  # close after using

# IN THE MAIN PROCESS
shm.close()  # close after using
shm.unlink()  # free memory

In the above code, the a and b arrays use the same underlying memory and can update it directly, which can be very useful in machine learning. However, you should beware of concurrent-update problems and decide how to handle them, either by using a Lock, by partitioning accesses, or by accepting stochastic updates (the so-called HogWild style).
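
For the additive-merge case in the question, one option along these lines is to let each worker add its partial result into a single shared buffer under a Lock. The following is only a sketch with stand-in arrays; compute_partial and the shape are hypothetical, not the question's FragmentModel:

import numpy as np
from multiprocessing import Process, Lock, shared_memory

SHAPE, DTYPE = (4,), np.float64

def compute_partial(shm_name, lock, seed):
    rng = np.random.default_rng(seed)
    partial = rng.random(SHAPE)                  # stand-in for one partial model
    shm = shared_memory.SharedMemory(name=shm_name)
    total = np.ndarray(SHAPE, dtype=DTYPE, buffer=shm.buf)
    with lock:                                   # serialize the merge step
        total += partial
    shm.close()

if __name__ == '__main__':
    size = int(np.dtype(DTYPE).itemsize * np.prod(SHAPE))
    shm = shared_memory.SharedMemory(create=True, size=size)
    total = np.ndarray(SHAPE, dtype=DTYPE, buffer=shm.buf)
    total[:] = 0
    lock = Lock()
    procs = [Process(target=compute_partial, args=(shm.name, lock, s)) for s in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(total)                                 # merged result, no large pickles
    shm.close()
    shm.unlink()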

THN
7

Use files!

No, really, use files -- they are efficient (the OS will cache the content), and they allow you to work on much larger problems (the data set doesn't have to fit into RAM).

Use any of https://docs.scipy.org/doc/numpy-1.15.0/reference/routines.io.html to dump/load numpy arrays to/from files and only pass file names between the processes.

P.S. Benchmark serialisation methods: depending on the intermediate array size, the fastest could be "raw" (no conversion overhead), "compressed" (if the file ends up being written to disk), or something else. IIRC, loading "raw" files may require knowing the data format (dimensions, sizes) in advance.
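
A rough sketch of that pattern; the file names and the worker body are placeholders standing in for the question's FragmentModel pipeline:

import numpy as np
from multiprocessing import Pool

def worker(in_filename):
    # build the partial result (stand-in for a FragmentModel), dump it to disk,
    # and return only the small file name instead of the multi-GB object
    partial = np.loadtxt(in_filename)            # whatever the real parsing is
    out_filename = in_filename + '.partial.npy'
    np.save(out_filename, partial)
    return out_filename

if __name__ == '__main__':
    inputs = ['chunk0.txt', 'chunk1.txt', 'chunk2.txt']   # placeholder inputs
    with Pool() as pool:
        model = None
        for fname in pool.imap_unordered(worker, inputs):
            partial = np.load(fname)             # load one partial at a time
            model = partial if model is None else model + partial
    print(model)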

Dima Tisnek
  • How would that work if the numpy arrays in question are variables in a custom object? – posdef Jan 10 '19 at 12:27
  • Instead of files, you could use memory maps. Python has the [`mmap` module](https://docs.python.org/3.0/library/mmap.html) and numpy has the memmap module ([example](https://stackoverflow.com/questions/16149803/working-with-big-data-in-python-and-numpy-not-enough-ram-how-to-save-partial-r)). – FThompson Jan 10 '19 at 19:29
  • @posdef Start by writing the numpy arrays from the object to a file or mmap. Then, provide each subprocess or thread with the section (the offset) of the file/mmap that it is responsible for. With something like `mmap`/`numpy.memmap`, only the indices accessed are loaded into memory. – FThompson Jan 10 '19 at 19:31
  • @posdef you'll have to **save** and **load** these objects; given that you've overridden the `__add__` method, I assume the implementation is under your control. If there's only one `ndarray` per fragment, that's straightforward. If there's more, consider `pandas` for convenience or hack something custom for speed. – Dima Tisnek Jan 11 '19 at 01:11
  • @Vulcan is there a demo for using mmap on numpy.array? The docs say mmap is appropriate for strings. However, numpy arrays can also be dumped to a file using `pickle` or `hdf5` for even larger datasets. See: 1. http://docs.h5py.org 2. https://docs.python.org/3/library/pickle.html – knh190 Jan 11 '19 at 03:12
  • @knh190 I'm unfamiliar with these libraries but here's the documentation, containing a numpy.memmap example using an array: https://docs.scipy.org/doc/numpy/reference/generated/numpy.memmap.html#numpy.memmap – FThompson Jan 11 '19 at 06:23
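
Following up on the memmap suggestion in the comments above, here is a minimal numpy.memmap sketch; the file name and shape are arbitrary:

import numpy as np

# writer: create a disk-backed array and fill part of it
fp = np.memmap('partial.dat', dtype=np.float64, mode='w+', shape=(1000, 1000))
fp[:100] = 1.0
fp.flush()          # make sure the data reaches the file

# reader (possibly another process): map the same file and read a slice;
# only the pages that are actually touched get loaded into memory
ro = np.memmap('partial.dat', dtype=np.float64, mode='r', shape=(1000, 1000))
print(ro[:100].sum())
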
2

Check out the ray project, a distributed execution framework that makes use of Apache Arrow for serialization. It is especially great if you're working with numpy arrays, and hence is a great tool for ML workflows.

Here's a snippet from the docs on object serialization

In Ray, we optimize for numpy arrays by using the Apache Arrow data format. When we deserialize a list of numpy arrays from the object store, we still create a Python list of numpy array objects. However, rather than copy each numpy array, each numpy array object holds a pointer to the relevant array held in shared memory. There are some advantages to this form of serialization.

  • Deserialization can be very fast.
  • Memory is shared between processes so worker processes can all read the same data without having to copy it.

In my opinion it's even easier to use than the multiprocessing library for parallel execution, especially when you want shared memory; there is an intro to its usage in the tutorial.
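
A small sketch of that workflow; build_partial and the merge step are made up to mirror the question, using the basic ray.remote/ray.get pattern:

import numpy as np
import ray

ray.init()

@ray.remote
def build_partial(seed):
    # stand-in for building one partial model from one input chunk
    rng = np.random.default_rng(seed)
    return rng.random((1000, 1000))

# results live in Ray's shared-memory object store; only references come back
refs = [build_partial.remote(s) for s in range(4)]

# the driver fetches them (zero-copy reads for numpy arrays) and merges additively
model = sum(ray.get(refs))
print(model.shape)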

MarkAWard
0

You should use a Manager proxy object for shared editable objects: https://docs.python.org/3/library/multiprocessing.html#multiprocessing-managers The access lock is handled by that Manager proxy object.

In the Customized managers section there is an example that should suit you:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

After that, you have to connect to that manager from the different processes (as shown in using a remote manager) and edit the shared object as you wish.
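
A hedged sketch of that remote-manager step, following the pattern in the docs; the address and authkey are placeholders:

from multiprocessing.managers import BaseManager

class MyManager(BaseManager):
    pass

# in the serving process you would instead do:
#   MyManager.register('Maths', MathsClass)
#   manager = MyManager(address=('', 50000), authkey=b'secret')
#   server = manager.get_server()
#   server.serve_forever()

# in a client process: register the same name (no callable needed) and connect
MyManager.register('Maths')
client = MyManager(address=('localhost', 50000), authkey=b'secret')
client.connect()
maths = client.Maths()
print(maths.add(4, 3))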

Sindbag