
I'm applying some parallelization to my code, in which I use classes. I knew that it is not possible to pickle a class method without some approach beyond what Python provides out of the box. I found a solution here.

In my code, I have two parts that should be parallelized, both using classes. Here I'm posting a very simple version that just represents the structure of mine (it is the same, but I deleted the contents of the methods, which were a lot of math calculations irrelevant to the output I'm getting).

The problem is that while I can pickle one method (shepard_interpolation), with the other one (calculate_orientation_uncertainty) I get the pickle error. I don't know why this is happening, or why it only partly works.

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    if func_name.startswith('__') and not func_name.endswith('__'): #deal with mangled names
        cls_name = cls.__name__.lstrip('_')
        func_name = '_' + cls_name + func_name
    print cls
    return _unpickle_method, (func_name, obj, cls)


def _unpickle_method(func_name, obj, cls):
    for cls in cls.__mro__:
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

class ImageData(object):

    def __init__(self, width=60, height=60):
        self.width = width
        self.height = height
        self.data = []
        for i in range(width):
            self.data.append([0] * height)
            
    def shepard_interpolation(self, seeds=20):
        print "ImD - Sucess"       

import copy_reg
import types
from itertools import product
from multiprocessing import Pool

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class VariabilityOfGradients(object):
    def __init__(self):
        pass
        
    @staticmethod
    def aux():
        return "VoG - Sucess" 
            
    @staticmethod
    def calculate_orientation_uncertainty():
        results = []
        pool = Pool()
        for x, y in product(range(1, 5), range(1, 5)):
            result = pool.apply_async(VariabilityOfGradients.aux) 
        results.append(result.get())
        pool.close()
        pool.join()        

        
if __name__ == '__main__':  
    results = []
    pool = Pool()
    for _ in range(3):
        result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()])
        results.append(result.get())
    pool.close()
    pool.join()
    
    VariabilityOfGradients.calculate_orientation_uncertainty()   

 

When I run it, I get

PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

This is almost the same as the problem found here. The only difference I see is that my methods are static.

I noticed that in my calculate_orientation_uncertainty, when I call the function as result = pool.apply_async(VariabilityOfGradients.aux()), i.e., with parentheses (which I never saw in the documentation examples), it seems to work. But when I try to get the result, I receive

TypeError: 'int' object is not callable

How can I do this correctly?

mkrieger1
pceccon
  • If this isn't your code, people will most likely ask you to post the exact code you are having trouble with. Just a notice. Also, which Python version are you using? Pickle behaves a bit differently across versions, and I for one have no problems pickling a class object as long as it doesn't contain instances such as sockets or active threads. Usually you store a threaded class in two class objects, one of which might inherit the other, or simply have one class initiated within the threaded class and only thread the "inside" class, if that makes sense. – Torxed Jan 14 '14 at 10:46
  • This is my code. I just delete what was inside the methods (insignificant for the problem, a lot of math calculus that is unnecessary). – pceccon Jan 14 '14 at 10:56
  • What is the error message? – Mailerdaimon Jan 14 '14 at 10:59
  • The classic "PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed" – pceccon Jan 14 '14 at 11:02
  • "I can pickle one method, but with the other one I got the pickle error" --which one? – Janne Karila Jan 14 '14 at 11:34
  • I edited, @JanneKarila – pceccon Jan 14 '14 at 11:55
  • @pceccon: Regarding your edit: If you use `pool.apply_async(VariabilityOfGradients.aux())` then you are calling `VariabilityOfGradients.aux()` first, then sending its return value to `pool.apply_async`. This is not what you want because the pool worker is not calling `VariabilityOfGradients.aux` concurrently. – unutbu Jan 14 '14 at 11:59
  • Regarding the original problem: `isinstance(func, types.MethodType)` is `False` when `func` is a staticmethod, so `copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)` does not affect staticmethods. The easy workaround is to make your staticmethod a plain function at the global module level. – unutbu Jan 14 '14 at 12:01
  • Yes, this is an easy solution, @unutbu. I was just trying to find another solution because, given the size of the project, I thought it was better to work with classes and so on. – pceccon Jan 14 '14 at 12:04
  • But I didn't know about that thing that you explained about static methods and copy_reg. Thank you. (: – pceccon Jan 14 '14 at 12:06
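unutbu's comment about types.MethodType can be checked directly. A small Python 3 sketch; the regular method bound is a hypothetical addition for contrast:

```python
import types


class VariabilityOfGradients(object):
    @staticmethod
    def aux():
        return "VoG - Success"

    def bound(self):  # hypothetical regular method, for contrast
        return "bound"


# Looked up through the class, a staticmethod is just a plain function,
# so a reducer registered for types.MethodType never sees it...
static_is_method = isinstance(VariabilityOfGradients.aux, types.MethodType)
static_is_function = isinstance(VariabilityOfGradients.aux, types.FunctionType)
# ...whereas a regular method looked up on an instance is a bound method.
bound_is_method = isinstance(VariabilityOfGradients().bound, types.MethodType)

print(static_is_method, static_is_function, bound_is_method)  # False True True
```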

3 Answers


You could define a plain function at the module level and a staticmethod as well. This preserves the calling syntax, introspection and inheritability features of a staticmethod, while avoiding the pickling problem:

def aux():
    return "VoG - Sucess" 

class VariabilityOfGradients(object):
    aux = staticmethod(aux)
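Why the alias avoids the error can be seen with a plain pickle round trip. A Python 3 sketch, for illustration only (it is not part of the answer's program):

```python
import pickle


def aux():
    return "VoG - Success"


class VariabilityOfGradients(object):
    # Expose the module-level function as a staticmethod so callers can
    # keep writing VariabilityOfGradients.aux().
    aux = staticmethod(aux)


# Accessed through the class, the staticmethod yields the module-level
# function itself, which pickle stores by reference (module + name).
payload = pickle.dumps(VariabilityOfGradients.aux)
restored = pickle.loads(payload)
print(restored is aux, restored())  # True VoG - Success
```

The staticmethod wrapper lives in the class dictionary, but attribute lookup unwraps it, so what actually gets pickled is the module-level aux.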

For example,

import copy_reg
import types
from itertools import product
import multiprocessing as mp

def _pickle_method(method):
    """
    Author: Steven Bethard (author of argparse)
    http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    """
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)


def _unpickle_method(func_name, obj, cls):
    """
    Author: Steven Bethard
    http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    """
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class ImageData(object):

    def __init__(self, width=60, height=60):
        self.width = width
        self.height = height
        self.data = []
        for i in range(width):
            self.data.append([0] * height)

    def shepard_interpolation(self, seeds=20):
        print "ImD - Success"       

def aux():
    return "VoG - Sucess" 

class VariabilityOfGradients(object):
    aux = staticmethod(aux)

    @staticmethod
    def calculate_orientation_uncertainty():
        pool = mp.Pool()
        results = []
        for x, y in product(range(1, 5), range(1, 5)):
            # result = pool.apply_async(aux) # this works too
            result = pool.apply_async(VariabilityOfGradients.aux, callback=results.append)
        pool.close()
        pool.join()
        print(results)


if __name__ == '__main__':  
    results = []
    pool = mp.Pool()
    for _ in range(3):
        result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()])
        results.append(result.get())
    pool.close()
    pool.join()

    VariabilityOfGradients.calculate_orientation_uncertainty()   

yields

ImD - Success
ImD - Success
ImD - Success
['VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess']
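On Python 3 the copy_reg module is named copyreg, and, assuming Python 3.4 or later, bound methods of picklable objects pickle without any registration at all. A minimal sketch; the area method is hypothetical:

```python
import pickle


class ImageData(object):
    def __init__(self, width=60, height=60):
        self.width = width
        self.height = height

    def area(self):  # hypothetical method, used only for this demo
        return self.width * self.height


img = ImageData(4, 5)
# No copyreg registration: Python 3 pickles a bound method as its
# instance plus the method name, and rebuilds it on load.
restored = pickle.loads(pickle.dumps(img.area))
print(restored())  # 20
```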

By the way, result.get() blocks the calling process until the function called by pool.apply_async (e.g. ImageData.shepard_interpolation) is completed. So

for _ in range(3):
    result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()])
    results.append(result.get())

is really calling ImageData.shepard_interpolation sequentially, defeating the purpose of the pool.

Instead you could use

for _ in range(3):
    pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()],
                     callback=results.append)

The callback function (e.g. results.append) is called in a thread of the calling process when the function is completed. It is sent one argument -- the return value of the function. Thus nothing blocks the three pool.apply_async calls from being made quickly, and the work done by the three calls to ImageData.shepard_interpolation will be performed concurrently.
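A minimal Python 3 sketch of the callback pattern. Assumptions: the return value of shepard_interpolation is a hypothetical stand-in for the real math, and the explicit "fork" context means a POSIX system (on Windows, use mp.Pool() and keep the calls under the if __name__ == '__main__': guard). On Python 3, ImageData.shepard_interpolation is a plain function and pickles without copy_reg.

```python
import multiprocessing as mp


class ImageData(object):
    def __init__(self, width=60, height=60):
        self.width = width
        self.height = height
        self.data = [[0] * height for _ in range(width)]

    def shepard_interpolation(self, seeds=20):
        # Hypothetical stand-in for the real math, so there is a value to collect.
        return self.width * self.height + seeds


def run_with_callbacks(n_tasks=3):
    results = []
    # Assumption: a POSIX system, so the "fork" start method is available.
    ctx = mp.get_context("fork")
    with ctx.Pool() as pool:
        for _ in range(n_tasks):
            # Each call returns immediately; nothing waits on a result here.
            pool.apply_async(ImageData.shepard_interpolation,
                             args=(ImageData(),),
                             callback=results.append)
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for the workers; all callbacks have run by now
    return results


if __name__ == "__main__":
    print(run_with_callbacks())  # [3620, 3620, 3620]
```

Because callbacks fire as tasks finish, results fills up in completion order; here all three values are equal, so the order does not matter.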

Alternatively, it might be simpler to just use pool.map here.

results = pool.map(ImageData.shepard_interpolation, [ImageData()]*3)
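With pool.map, results come back in input order even though the tasks run concurrently. A Python 3 sketch under the same kind of assumptions (hypothetical return value, POSIX "fork" context), using three differently sized images so the ordering is visible:

```python
import multiprocessing as mp


class ImageData(object):
    def __init__(self, width=60, height=60):
        self.width = width
        self.height = height

    def shepard_interpolation(self, seeds=20):
        # Hypothetical stand-in for the real computation.
        return self.width * self.height


def run_with_map():
    images = [ImageData(w, w) for w in (10, 20, 30)]
    # Assumption: a POSIX system, so the "fork" start method is available.
    ctx = mp.get_context("fork")
    with ctx.Pool(3) as pool:
        # map blocks until every task is done and returns results in input order
        return pool.map(ImageData.shepard_interpolation, images)


if __name__ == "__main__":
    print(run_with_map())  # [100, 400, 900]
```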
unutbu
  • I knew about this, but I was wondering whether there was a way to do what I wanted in Python without these approaches. But it seems that I can't. Haha. – pceccon Jan 14 '14 at 14:28
  • Hi, @unutbu. Could I ask you one more thing? It worked, but, as you can see, in this example I iterate 25 times. In my problem, I have two nested for loops of 200 x 200, and my aux function does a lot of math processing. What is happening is that the parallelized version takes more time than not using multiprocessing at all. Do you know why? Am I applying the concept correctly in this example? Thank you. – pceccon Jan 15 '14 at 11:02
  • Could you post the code in question? Runnable code would help a lot. – unutbu Jan 15 '14 at 13:10

If you use a fork of multiprocessing called pathos.multiprocessing, you can directly use classes and class methods in multiprocessing's map functions. This is because dill is used instead of pickle or cPickle, and dill can serialize almost anything in Python.

pathos.multiprocessing also provides an asynchronous map function… and it can map functions with multiple arguments (e.g. map(math.pow, [1,2,3], [4,5,6]))
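For comparison, and not part of pathos: in the standard library (Python 3.3+), Pool.starmap handles the multi-argument case if the arguments are zipped into tuples. A sketch, assuming a POSIX system for the explicit "fork" context:

```python
import math
import multiprocessing as mp


def run_starmap():
    # Assumption: a POSIX system, so the "fork" start method is available.
    ctx = mp.get_context("fork")
    with ctx.Pool(2) as pool:
        # starmap unpacks each tuple: math.pow(1, 4), math.pow(2, 5), math.pow(3, 6)
        return pool.starmap(math.pow, zip([1, 2, 3], [4, 5, 6]))


if __name__ == "__main__":
    print(run_starmap())  # [1.0, 32.0, 729.0]
```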

See: What can multiprocessing and dill do together?

and: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

Get the code here: https://github.com/uqfoundation/pathos

pathos also has an asynchronous map (amap), as well as imap.

Community
Mike McKerns

I'm not sure if this is what you are looking for, but my use case was slightly different. I wanted to call a method of a class from within that same class, running it in multiple worker processes.

This is how I implemented it:

from multiprocessing import Pool

class Product(object):

    def __init__(self):
        self.logger = "test"

    def f(self, x):
        print(self.logger)
        return x*x

    def multi(self):
        # starmap needs Python 3.3+; the with block shuts the pool down afterwards
        with Pool(5) as p:
            print(p.starmap(Product.f, [(Product(), 1), (Product(), 2), (Product(), 3)]))


if __name__ == '__main__':
    obj = Product()
    obj.multi()
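A variant of the same idea, as a Python 3 sketch: since Python 3 can pickle a bound method together with its instance, multi can map self.f directly instead of building new Product() objects. The values parameter is a hypothetical addition, print(self.logger) is dropped to keep the output checkable, and the explicit "fork" context assumes a POSIX system:

```python
import multiprocessing as mp


class Product(object):
    def __init__(self):
        self.logger = "test"

    def f(self, x):
        return x * x

    def multi(self, values):
        # Assumption: a POSIX system, so the "fork" start method is available.
        ctx = mp.get_context("fork")
        with ctx.Pool(3) as pool:
            # self.f is a bound method; Python 3 pickles it along with self,
            # so no fresh Product() instances are needed.
            return pool.map(self.f, values)


if __name__ == "__main__":
    print(Product().multi([1, 2, 3]))  # [1, 4, 9]
```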