
I am writing a genetic optimisation algorithm based on the deap package in Python 2.7 (the goal is to migrate to Python 3 soon). As it is a computationally heavy process, some parts of the optimisation are run with the multiprocessing package. Here is a summary outline of my program:

  1. Configurations are read in and saved in a config object
  2. Some additional pre-computations are made and saved as well in the config object
  3. The optimisation starts (the population is initialized randomly, then mutation and crossover are applied to find better solutions) and some parts of it (the evaluation function) are executed with multiprocessing
  4. The results are saved

For the evaluation function, we need access to some parts of the config object (which after step 2 stays constant). Therefore we make it accessible to the worker processes through a global (constant) variable, set by the pool initializer:

from deap import base
import multiprocessing

toolbox = base.Toolbox()

def evaluate(ind):
    # compute the objective values for one individual, using the global
    # config object that the pool initializer sets in each worker
    return (obj1, obj2)

toolbox.register('evaluate',evaluate)

def init_pool_global_vars(self, _config):
    global config
    config = _config

...
# setting up multiprocessing
pool = multiprocessing.Pool(processes=72, initializer=self.init_pool_global_vars,
                            initargs=[config])
toolbox.register('map', pool.map_async)
...
while tic < max_time:
    # create new individuals (mutation and crossover)
    # evaluate the objective function on the different individuals asynchronously
    jobs = toolbox.map(toolbox.evaluate, ind)
    fits = jobs.get()
    # keep the best individuals

We basically run many iterations (a big for loop) until a maximum time is reached. I have noticed that if I make the config object bigger (i.e. add big attributes to it, like a large numpy array), even though the code is otherwise the same, it runs much slower (fewer iterations in the same timespan). So I thought I would make a specific config_multiprocessing object that contains only the attributes needed in the multiprocessing part and pass that as the global variable, roughly as sketched below, but when I run it on 3 cores it is slower than with the big config object, and on 72 cores it is only slightly faster, not by much.
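
To illustrate, the slimmed-down object looks roughly like this (a sketch only; the class name and the copied attributes are placeholders for whatever evaluate actually needs):

class ConfigMultiprocessing(object):
    """Slim copy holding only the attributes that evaluate() reads."""
    def __init__(self, config):
        # attribute names are placeholders; copy only what the workers need
        self.networks = config.networks
        self.weights = config.weights

config_multiprocessing = ConfigMultiprocessing(config)
pool = multiprocessing.Pool(processes=72, initializer=self.init_pool_global_vars,
                            initargs=[config_multiprocessing])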

What should I do to make sure my loops don't lose speed because of the config object, or because of any other data manipulations I make before launching the multiprocessing loops?

I am running this in a Linux Docker image on a Linux VM in the cloud.

Thomas
  • Are you running on Windows? If so, take a look at [this](https://stackoverflow.com/questions/14749897/python-multiprocessing-memory-usage) – Arthur.V Feb 26 '20 at 09:43
  • No, I'm running it in a Linux Docker image on a Linux VM in the cloud – Thomas Feb 26 '20 at 11:48
  • But I cannot do this: if I wanted to create the `multiprocessing.Pool` at the start, my config object would not exist yet, so I could not initialize the different workers with it as a global variable (at the beginning it does not exist yet, or is not complete yet) – Thomas Feb 26 '20 at 11:54
  • All sub-processes created by `Pool` will inherit the state (memory) from the parent process, which gets pickled for transfer. Hence if you make your `config` object bigger, more pickling needs to be done and more memory needs to be copied. Since the sub-processes inherit the global state anyway, it's not necessary to use that extra initializer function. When you create your specific `config_multiprocessing`, supposedly derived from `config`, did you `del config` before spawning the pool? Because if not, then both objects `config_multiprocessing` and `config` will be copied to the sub-processes. – a_guest Mar 13 '20 at 17:17
  • This is not completely true: when I don't pass config to the multiprocessing pool (i.e. `pool = multiprocessing.Pool(processes=config.nProcesses)`) I get an error saying that config is not defined, `NameError: global name 'config' is not defined`. I am running inside a Linux Docker container, so the sub-processes created by the pool do not inherit the state of the parent! – Thomas Mar 13 '20 at 17:32
  • @Thomas What type is that `config` object? Can you show an example of it and how it is used during the optimization procedure? – a_guest Mar 16 '20 at 11:45
  • @a_guest the config object is an instance of a class I created. It contains a lot of different attributes, such as float weights and a network structure (nested lists that contain other objects: a network object, which contains a list of sub-network objects, with constraints and integer and float weights); it also contains numpy arrays, lists, ... and I access these attributes in the mapped functions: for instance in evaluate, I'll do: `for net in config.networks: for sub_net in net:` etc. – Thomas Mar 17 '20 at 09:46
  • @Thomas Could you provide an example of such an object? I'm trying to understand what is the bottleneck of your application. Could you provide more details about your timing results? When you say "fewer iterations per time span" do you mean only the time for the optimization process (i.e. after the `pool` has been initialized) or do you include the whole script? Could you also provide more details about the `evaluate` function? How long does it take to execute on average? Also some timing numbers regarding initialization, optimization loop, etc. would be helpful to get a better picture. – a_guest Mar 17 '20 at 10:37

1 Answer


The joblib package is designed to handle cases where you have large numpy arrays to distribute to workers with shared memory. This is especially useful if you are treating the data in shared memory as "read-only", which is what you describe in your scenario. You can also create writable shared memory as described in the docs (a sketch of that variant follows the example below).

Your code might look something like:


import os

import numpy as np
from joblib import Parallel, delayed
from joblib import dump, load

folder = './joblib_memmap'
try:
    os.mkdir(folder)
except FileExistsError:
    pass

def evaluate(ind, data):
    # compute evaluation using the shared-memory data
    return (obj1, obj2)

# just used to initialize memory mapped data
def init_memmap_data(original_data):
    data_filename_memmap = os.path.join(folder, 'data_memmap')
    dump(original_data, data_filename_memmap)
    shared_data = load(data_filename_memmap, mmap_mode='r')
    return shared_data

...
# however you set up indices needs to be changed here
indexes = range(10)  

# however you load your numpy data needs to be done here
shared_data = init_memmap_data(numpy_array_to_share)  

# change n_jobs as appropriate
results = Parallel(n_jobs=2)(delayed(evaluate)(ind, shared_data) for ind in indexes)  

# take the individual whose (first) objective is largest as the "best" one
best_fit_individual = indexes[np.argmax([fit[0] for fit in results])]
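
For the writable shared memory mentioned above, a minimal sketch building on the snippet above would look like this (the output shape and the compute_objectives helper are placeholders, not part of joblib):

# pre-allocate a writable memory-mapped array that all workers can fill in
output_filename = os.path.join(folder, 'output_memmap')
output = np.memmap(output_filename, dtype=np.float64,
                   shape=(len(indexes), 2), mode='w+')

def evaluate_into(i, ind, data, out):
    # each worker writes into its own row, so no locking is needed
    out[i, :] = compute_objectives(ind, data)  # compute_objectives is a placeholder

Parallel(n_jobs=2)(delayed(evaluate_into)(i, ind, shared_data, output)
                   for i, ind in enumerate(indexes))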

Additionally, joblib supports a threading backend that may be faster than the process-based one. It is easy to test both with joblib.
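
Switching backends is just a keyword argument or a context manager; a minimal sketch, reusing the names from the example above (n_jobs is arbitrary here):

from joblib import Parallel, delayed, parallel_backend

# option 1: ask for threads directly on the Parallel call
results = Parallel(n_jobs=2, prefer="threads")(
    delayed(evaluate)(ind, shared_data) for ind in indexes)

# option 2: select the backend for a whole block of code
with parallel_backend("threading", n_jobs=2):
    results = Parallel()(delayed(evaluate)(ind, shared_data) for ind in indexes)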

hume
  • Thanks @hume for the answer. However, I don't have only numpy arrays but also other types of objects (dictionaries, nested lists of objects, lists of floats, ...). I guess this will still work? Also, what I did not fully understand is: what is the advantage of using `joblib` compared to `multiprocessing`? – Thomas Mar 17 '20 at 09:49
  • You can create those config items and pass them to the function or even include them in when the function is defined, and it will work as multiprocessing does. As you noted, there is a slow down for distributing the actual data to each of the workers since that is larger. The advantage of `joblib` is that when used as above it doesn't distribute the data to each of the workers. Instead, it loads the data once into memory and gives each of the workers a reference to it, which they can use for their computations. – hume Mar 17 '20 at 16:05
  • I've tested it, but I get an error saying `AttributeError: Can't get attribute 'Individual' on ` `joblib.externals.loky.process_executor.BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.` but I cannot change the Individual class from deap, as this is the package I use for the genetic algorithm optimization – Thomas Mar 18 '20 at 16:03
  • and when changing backend to multiprocessing as suggested [here](https://github.com/joblib/joblib/issues/810#issuecomment-444437271) I get another error when it wants to evaluate the function on the cython part of the code: ` cy_azimuth_full.get(solution_list_filtered, face_ids_filtered, File "cythonized/azimuth_full_Quadratic.pyx", line 4, in ... File "stringsource", line 658, in View.MemoryView.memoryview_cwrapper File "stringsource", line 349, in View.MemoryView.memoryview.__cinit__ ValueError: buffer source array is read-only ` – Thomas Mar 18 '20 at 16:09
  • I don't have enough of your code to debug it directly, but I reworked this example from the docs https://github.com/DEAP/deap#example with joblib here: https://gist.github.com/pjbull/0d5c5e131ca814ef0242c21380100b33 – hume Mar 19 '20 at 17:19
  • @Thomas did that gist help? If so, happy to edit the answer to include it so it can be accepted. – hume Mar 26 '20 at 19:10
  • hi @hume, I have not yet had time to test it, as it would be quite a big change to implement. I will keep you posted and will try to make time for it next week; if it does help, I will definitely let you know! Thanks – Thomas Mar 27 '20 at 09:23