
I'm aware of various discussions of limitations of the multiprocessing module when dealing with functions that are data members of a class (due to Pickling problems).

But is there another module, or any sort of work-around in multiprocessing, that allows something specifically like the following (specifically without forcing the definition of the function to be applied in parallel to exist outside of the class)?

class MyClass():

    def __init__(self):
        self.my_args = [1,2,3,4]
        self.output  = {}

    def my_single_function(self, arg):
        return arg**2

    def my_parallelized_function(self):
        # Use map or map_async to map my_single_function onto the
        # list of self.my_args, and append the return values into
        # self.output, using each arg in my_args as the key.

        # The result should make self.output become
        # {1:1, 2:4, 3:9, 4:16}


foo = MyClass()
foo.my_parallelized_function()
print foo.output

Note: I can easily do this by moving my_single_function outside of the class, and passing something like foo.my_args to the map or map_async commands. But this pushes the parallelized execution of the function outside of instances of MyClass.

For my application (parallelizing a large data query that retrieves, joins, and cleans monthly cross-sections of data, and then appends them into a long time-series of such cross-sections), it is very important to have this functionality inside the class since different users of my program will instantiate different instances of the class with different time intervals, different time increments, different sub-sets of data to gather, and so on, that should all be associated with that instance.

Thus, I want the work of parallelizing to also be done by the instance, since it owns all the data relevant to the parallelized query, and it would just be silly to try write some hacky wrapper function that binds to some arguments and lives outside of the class (Especially since such a function would be non-general. It would need all kinds of specifics from inside the class.)
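For concreteness, the kind of module-level wrapper I'm trying to avoid looks roughly like this (names are illustrative; the built-in `map` stands in for `pool.map`, since the wrapper's picklability is the point):

```python
# A module-level trampoline: picklable because it is a top-level function,
# but it leaks MyClass internals outside of the class.
def _square_worker(packed):
    instance, arg = packed
    return instance.my_single_function(arg)

class MyClass(object):
    def __init__(self):
        self.my_args = [1, 2, 3, 4]
        self.output = {}

    def my_single_function(self, arg):
        return arg ** 2

foo = MyClass()
# In real code this would be pool.map(_square_worker, ...); the built-in
# map has the same call shape and keeps the sketch dependency-free.
foo.output = dict(zip(foo.my_args,
                      map(_square_worker, [(foo, a) for a in foo.my_args])))
```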

ely

3 Answers


Steven Bethard has posted a way to allow methods to be pickled/unpickled. You could use it like this:

import multiprocessing as mp
import copy_reg
import types

def _pickle_method(method):
    # Author: Steven Bethard
    # http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    # Author: Steven Bethard
    # http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

# This call to copy_reg.pickle allows you to pass methods as the first arg to
# mp.Pool methods. If you comment out this line, `pool.map(self.foo, ...)` results in
# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
# __builtin__.instancemethod failed

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class MyClass(object):

    def __init__(self):
        self.my_args = [1,2,3,4]
        self.output  = {}

    def my_single_function(self, arg):
        return arg**2

    def my_parallelized_function(self):
        # Use map or map_async to map my_single_function onto the
        # list of self.my_args, and append the return values into
        # self.output, using each arg in my_args as the key.

        # The result should make self.output become
        # {1:1, 2:4, 3:9, 4:16}
        self.output = dict(zip(self.my_args,
                               pool.map(self.my_single_function, self.my_args)))

Then

pool = mp.Pool()   
foo = MyClass()
foo.my_parallelized_function()

yields

print foo.output
# {1: 1, 2: 4, 3: 9, 4: 16}
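(Side note for later readers: on Python 3, none of this registration is needed — bound methods pickle out of the box, provided the instance itself is picklable. A minimal check, using plain `map` in place of `pool.map` so the pickling behaviour is isolated:)

```python
import pickle

class MyClass(object):
    def __init__(self):
        self.my_args = [1, 2, 3, 4]
        self.output = {}

    def my_single_function(self, arg):
        return arg ** 2

foo = MyClass()
# Round-trip the bound method through pickle, as multiprocessing would.
clone = pickle.loads(pickle.dumps(foo.my_single_function))
foo.output = dict(zip(foo.my_args, map(clone, foo.my_args)))
```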
unutbu
    +1 for the method pickling, I was going to suggest Cython to do the multi-threading and avoid GIL, but for what he's trying to do this will be much easier to implement. You might want to mention that the processes have significant communication overhead for small datasets though ( http://stackoverflow.com/questions/11229739/python-multiprocessing-speed ) – Pyrce Jul 30 '12 at 18:15
  • Just to make sure I am understanding, the call to `copy_reg.pickle` is going to define how to handle `MethodType` via the pickle and unpickle functions pass as arguments? Thus, after this has been taken care of, pickling instance methods no longer fails, and parallelization works as you'd expect? – ely Jul 30 '12 at 18:15
  • @Pyrce +1 for the Cython idea; I had considered it, but because it would not be easy to integrate with the existing work on this project, I think it's not the best route for me. But for many scientific algorithms, I expect that (or just using mpi4py which is what I'd prefer) is a good solution. – ely Jul 30 '12 at 18:17
  • @EMS: Yes, `copy_reg.pickle` teaches `pickle` how to pickle/unpickle methods. – unutbu Jul 30 '12 at 18:22
  • Hi guys. I'm wondering if you could help me. I'm having the same problem but somehow this didn't work for me. I post my example here: http://stackoverflow.com/questions/21111106/cant-pickle-multiprocessing-python. Thank you. – pceccon Jan 14 '14 at 11:04
  • The problem with this approach is that it will pickle an instance of the class that includes the method in question. If the class contains some huge instance attributes, you will probably run out of memory and lose any time advantage from the parallelization. – tymm Oct 07 '14 at 13:00
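Following up on the last comment: one way to keep heavy instance attributes from being shipped to every worker (a sketch with illustrative names, not part of the answer above) is to define `__getstate__` so those attributes are dropped at pickle time:

```python
import pickle

class Heavy(object):
    def __init__(self):
        self.big_cache = list(range(10**6))  # stands in for a huge attribute
        self.n = 3

    def __getstate__(self):
        # Copy the instance dict and drop what workers don't need.
        state = self.__dict__.copy()
        state.pop('big_cache', None)
        return state

h = Heavy()
clone = pickle.loads(pickle.dumps(h))  # what each worker would receive
```

The clone never carries `big_cache`, so each worker process pays only for the small state.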

If you use a fork of multiprocessing called pathos.multiprocessing, you can directly use classes and class methods in multiprocessing's map functions. This is because dill is used instead of pickle or cPickle, and dill can serialize almost anything in Python.

pathos.multiprocessing also provides an asynchronous map function… and it can map functions with multiple arguments (e.g. map(math.pow, [1,2,3], [4,5,6]))

See: What can multiprocessing and dill do together?

and: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

So you can do exactly what you wanted to do, I believe.

Python 2.7.8 (default, Jul 13 2014, 02:29:54) 
[GCC 4.2.1 Compatible Apple Clang 4.1 ((tags/Apple/clang-421.11.66))] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import dill
>>> 
>>> class MyClass():
...   def __init__(self):
...     self.my_args = [1,2,3,4]
...     self.output = {}
...   def my_single_function(self, arg):
...     return arg**2
...   def my_parallelized_function(self):
...     res = p.map(self.my_single_function, self.my_args)
...     self.output = dict(zip(self.my_args, res))
... 
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool()
>>> 
>>> foo = MyClass()
>>> foo.my_parallelized_function()
>>> foo.output
{1: 1, 2: 4, 3: 9, 4: 16}
>>>

Get the code here: https://github.com/uqfoundation/pathos

Mike McKerns

There is a more elegant solution, I believe. Add the following lines to any code that does multiprocessing with a class, and you can still pass methods through the pool. The code should go above the class definition.

import copy_reg
import types

def _reduce_method(meth):
    return (getattr, (meth.__self__, meth.__func__.__name__))

copy_reg.pickle(types.MethodType, _reduce_method)

For more on how method pickling works, see http://docs.python.org/2/library/copy_reg.html
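To see the mechanism in action, here is a self-contained round trip (written for Python 3, where `copy_reg` is renamed `copyreg` and this registration is strictly optional, since bound methods already pickle; `Demo` is an illustrative class, not from the question):

```python
import copyreg   # Python 3 name for copy_reg
import pickle
import types

def _reduce_method(meth):
    # Pickle a bound method as "getattr(instance, method_name)".
    return (getattr, (meth.__self__, meth.__func__.__name__))

copyreg.pickle(types.MethodType, _reduce_method)

class Demo(object):
    def square(self, x):
        return x * x

d = Demo()
# The registered reducer turns d.square into (getattr, (d, 'square')),
# so the round trip rebuilds a working bound method.
m = pickle.loads(pickle.dumps(d.square))
```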

Sri
  • This is the basis of what `dill` does for you under the covers, and is utilized by `pathos.multiprocessing` (see my answer). – Mike McKerns Jul 04 '14 at 11:21