
I'm using the DEAP library in Python for a multi-objective optimization problem. I'd like to use multiple processors for this task; however, I'm running into some trouble.

To give some context, I'm using networkx in conjunction with DEAP. I also define the fitness, crossover, and mutation functions (which I won't show here for certain reasons).

It says here that all I need to do is install SCOOP and add the lines

from scoop import futures

toolbox.register("map", futures.map)

However, I get the following error:

scoop._comm.scoopexceptions.ReferenceBroken: 'module' object has no attribute 'Chromosome'

After doing some digging, I found out that I need to move the calls to creator.create into the main module, as specified here.
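Concretely, I took that to mean an arrangement roughly like this (a minimal sketch, not my actual code; the weights and the main() call are placeholders):

from deap import base, creator

# Create the classes at module level, so that every SCOOP worker
# rebuilds them when it imports this file
creator.create("FitnessMax", base.Fitness, weights=(1.0, -1.0, 1.0))
creator.create("Chromosome", list, fitness=creator.FitnessMax)

if __name__ == "__main__":
    main()  # placeholder: the GA itself runs in here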

After doing so, I get another error:

scoop._comm.scoopexceptions.ReferenceBroken: This element could not be pickled: FutureId(worker='127.0.0.1:49663', rank=1):partial(<Chromosome representation of a solution here>)=None

I'm not entirely familiar with parallel computing, and I'm not quite sure what it means by "could not be pickled". The full code can be seen here, with some edits:

def genetic(network, creator, no_sensors, sfpd, lambda1, lambda2, lambda3, k):
    locations = network.graph.nodes()
    #move creator.create calls to the main module
    ########################################
    creator.create("FitnessMax", base.Fitness, weights=(lambda1, -lambda2, lambda3)) 
    creator.create("Chromosome", list, fitness=creator.FitnessMax) 
    ########################################

    toolbox = base.Toolbox()
    toolbox.register("attr_item", random.sample, locations, no_sensors)
    toolbox.register("chromosome", tools.initRepeat, creator.Chromosome, toolbox.attr_item, n=1)
    toolbox.register("population", tools.initRepeat, list, toolbox.chromosome)

    toolbox.register("map", futures.map) #######<-- this line ##############

    def evaluate(chromosome):
        # fitness function defined here
        pass

    # Crossover
    def crossover(chromosome1, chromosome2):  # Uniform Crossover
        # crossover is defined here
        pass

    # Mutation
    def mutation(chromosome):
        # mutation is defined here
        pass

    toolbox.register("evaluate", evaluate)
    toolbox.register("mate", crossover)
    toolbox.register("mutate", mutation)
    toolbox.register("select", tools.selNSGA2)

    random.seed(64)
    pop = toolbox.population(n=MU)
    hof = tools.ParetoFront()
    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", numpy.mean, axis=0)
    stats.register("min", numpy.min, axis=0)
    stats.register("max", numpy.max, axis=0)

    algorithms.eaMuPlusLambda(pop, toolbox, MU, LAMBDA, CXPB, MUTPB, NGEN, stats, halloffame=hof)

    return list(hof)

Thanks, and any insight will be very valuable.

meraxes
  • Could you provide a working minimal example using the built-in map function (without scoop that is)? – Ohjeah Feb 07 '17 at 07:59
  • I'm using the eaMuPlusLambda algorithm which uses the built-in map function. Is there a way to parallelize this? – meraxes Feb 08 '17 at 02:25
  • Without a minimal example you cannot expect much help. Alternatives to scoop are dask.distributed, ipyparallel, or multiprocessing (on dill). They all come with pros and cons. – Ohjeah Feb 08 '17 at 06:18
  • Here is an example of the code that I wish to run in parallel: https://github.com/DEAP/deap/blob/a1412d71b50606a7e4e87c3ba538b25603b84266/examples/ga/knapsack.py – meraxes Feb 08 '17 at 07:24

1 Answer


Here is a workaround using joblib and dill.

First: monkey-patch joblib to make it pickle using dill.

import dill
from dill import Pickler
import joblib
joblib.parallel.pickle = dill
joblib.pool.dumps = dill.dumps
joblib.pool.Pickler = Pickler

from joblib.pool import CustomizablePicklingQueue
from io import BytesIO
from pickle import HIGHEST_PROTOCOL


class CustomizablePickler(Pickler):
    """Pickler that accepts custom reducers.
    HIGHEST_PROTOCOL is selected by default as this pickler is used
    to pickle ephemeral datastructures for interprocess communication
    hence no backward compatibility is required.
    `reducers` is expected to be a dictionary with key/values
    being `(type, callable)` pairs where `callable` is a function that,
    given an instance of `type`, will return a tuple `(constructor,
    tuple_of_objects)` to rebuild an instance out of the pickled
    `tuple_of_objects` as would return a `__reduce__` method. See the
    standard library documentation on pickling for more details.
    """

    # We override the pure Python pickler as it's the only way to be able to
    # customize the dispatch table without side effects in Python 2.6
    # to 3.2. For Python 3.3+ leverage the new dispatch_table
    # feature from http://bugs.python.org/issue14166 that makes it possible
    # to use the C implementation of the Pickler which is faster.

    def __init__(self, writer, reducers=None, protocol=HIGHEST_PROTOCOL):
        Pickler.__init__(self, writer, protocol=protocol)
        if reducers is None:
            reducers = {}
        # Make the dispatch registry an instance level attribute instead of
        # a reference to the class dictionary under Python 2
        self.dispatch = Pickler.dispatch.copy()
        for type, reduce_func in reducers.items():
            self.register(type, reduce_func)

    def register(self, type, reduce_func):
        if hasattr(Pickler, 'dispatch'):
            # Python 2 pickler dispatching is not explicitly customizable.
            # Let us use a closure to workaround this limitation.
            def dispatcher(self, obj):
                reduced = reduce_func(obj)
                self.save_reduce(obj=obj, *reduced)
            self.dispatch[type] = dispatcher
        else:
            self.dispatch_table[type] = reduce_func

joblib.pool.CustomizablePickler = CustomizablePickler


def _make_methods(self):
    self._recv = recv = self._reader.recv
    racquire, rrelease = self._rlock.acquire, self._rlock.release

    def get():
        racquire()
        try:
            return recv()
        finally:
            rrelease()

    self.get = get

    def send(obj):
        buffer = BytesIO()
        CustomizablePickler(buffer, self._reducers).dump(obj)
        self._writer.send_bytes(buffer.getvalue())

    self._send = send

    if self._wlock is None:
        # writes to a message oriented win32 pipe are atomic
        self.put = send
    else:
        wlock_acquire, wlock_release = (
            self._wlock.acquire, self._wlock.release)

        def put(obj):
            wlock_acquire()
            try:
                return send(obj)
            finally:
                wlock_release()

        self.put = put

CustomizablePicklingQueue._make_methods = _make_methods

Second: define a parallel map on top of joblib.

from joblib import Parallel, delayed

def mymap(f, *iters):
    return Parallel(n_jobs=-1)(delayed(f)(*args) for args in zip(*iters))

And lastly just register the map:

toolbox.register("map", mymap)

It works perfectly with the example you linked. You can integrate dask and joblib to scale this solution out to a cluster. Use dask-drmaa and you get pretty much the same functionality SCOOP has.
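For the cluster case, the rough shape would be something like this (a sketch, not tested here; it assumes dask.distributed is installed and registers its joblib backend, and the scheduler address is a placeholder):

from dask.distributed import Client
from joblib import Parallel, delayed, parallel_backend

client = Client("tcp://scheduler-address:8786")  # placeholder address

def mymap(f, *iters):
    # Same map as above, but joblib hands the work to the dask scheduler
    with parallel_backend("dask"):
        return Parallel()(delayed(f)(*args) for args in zip(*iters))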

The example code can be found here.

Ohjeah
  • Could you brief me a bit about what's happening here, though? I'm not too familiar with parallel computing; all I know is that it helps speed up processes by running them on multiple cores. How many processes are running in parallel, and how can I control this? – meraxes Feb 15 '17 at 07:01
  • So the first code block just tells joblib to use dill instead of pickle for the Python object serialization/deserialization. If you want to learn more about joblib, you should have a look at their [documentation](https://pythonhosted.org/joblib/). The number of processes/threads is controlled by the n_jobs parameter; -1 means use all available cores. You will see this parameter in things like sklearn as well. – Ohjeah Feb 15 '17 at 09:09
  • Hey, friends. I tried to integrate this parallel part into my DEAP code. It uses the CPU fully (99%), but the algorithm never runs even one iteration. I've waited several minutes and it still acts like this. Do you know why this happens? – Zongze Wu Jan 20 '18 at 18:40