12

I would like to use Pool within a class, but there seems to be a problem. My code is long, so I created a small demo that illustrates the problem. It would be great if you could give me a variant of the code below that works.

from multiprocessing import Pool

class SeriesInstance(object):
    def __init__(self):
        self.numbers = [1,2,3]
    def F(self, x):
        return x * x
    def run(self):
        p = Pool()
        print p.map(self.F, self.numbers)


ins = SeriesInstance()
ins.run()

Outputs:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

And then hangs.

Dan D.
user58925

4 Answers

16

Unfortunately, it looks like you can't use instance methods here because of how the function is passed to the worker processes: it is pickled, and the built-in pickler cannot serialize instance methods. My first thought was to use lambdas, but it turns out the built-in pickler can't serialize those either. The solution, sadly, is to use a function in the global namespace. As suggested in other answers, you can also use static methods and pass self explicitly to make it look more like an instance method.

from multiprocessing import Pool
from itertools import repeat

class SeriesInstance(object):
    def __init__(self):
        self.numbers = [1,2,3]

    def run(self):
        p = Pool()
        squares = p.map(self.F, self.numbers)
        multiples = p.starmap(self.G, zip(repeat(self), [2, 5, 10]))
        return (squares, multiples)

    @staticmethod
    def F(x):
        return x * x

    @staticmethod
    def G(self, m):
        return [m * n for n in self.numbers]

if __name__ == '__main__':
    print(SeriesInstance().run())
Alex Sherman
  • Thanks, but there seems to be a problem. When I use this principle in my larger code it crashes after a few iterations with the following error "OSError: [Errno 35] Resource temporarily unavailable" – user58925 Sep 01 '15 at 20:45
  • 1
    I think the error comes from the OS refusing to create more processes. It seems like you need to properly close your pools when you're done with them, though this is just a guess. Depending on your actual code, which you should provide if you can, you could either use a single pool that you pass to run as a parameter, or make sure every SeriesInstance closes its pool when it is done. – Alex Sherman Sep 01 '15 at 21:22
  • Typically you should limit the size of your pool by "p = mp.Pool(mp.cpu_count())". This works perfectly. – Steve Lihn Jul 01 '21 at 20:28
  • This answer misses the point of the question: what if the function `F` uses class members? – Livne Rosenblum Feb 17 '22 at 12:26
  • It should be noted that the pool isn't properly cleaned up in this example. Instead of `p = Pool()` we should use `with Pool() as p:`. If we intend to call `run()` more than once, then the pool creation should be pulled outside the method so it can be reused instead of created new every time. – Neil Traft Nov 27 '22 at 18:02
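
The cleanup and reuse suggested in the comments above could look roughly like the sketch below. It assumes Python 3. The module-level function `square` and the `pool` parameter on `run` are illustrative names that do not appear in the answer, and `processes=2` is an arbitrary choice.

```python
from multiprocessing import Pool


def square(x):
    # Module-level function: pickled by reference, so workers can look it up.
    return x * x


class SeriesInstance:
    def __init__(self, numbers):
        self.numbers = list(numbers)

    def run(self, pool):
        # The caller supplies the pool, so repeated calls reuse the same
        # worker processes instead of starting new ones each time.
        return pool.map(square, self.numbers)


if __name__ == "__main__":
    series = [SeriesInstance([1, 2, 3]), SeriesInstance([4, 5])]
    # Leaving the with-block shuts the worker processes down.
    with Pool(processes=2) as pool:
        for ins in series:
            print(ins.run(pool))
```

Creating the pool once and passing it in keeps the number of live processes bounded, which is one plausible way to avoid the "Resource temporarily unavailable" error mentioned above.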
4

You can also use multiprocessing with static functions in the class.
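
A minimal sketch of that idea, assuming Python 3, where a static method can be pickled by its qualified name. The class layout mirrors the question; `processes=2` is an arbitrary choice.

```python
from multiprocessing import Pool


class SeriesInstance:
    def __init__(self):
        self.numbers = [1, 2, 3]

    @staticmethod
    def F(x):
        # No self involved: the function is pickled by its qualified
        # name (SeriesInstance.F), not together with an instance.
        return x * x

    def run(self):
        # The with-block shuts the workers down once map() has returned.
        with Pool(processes=2) as p:
            return p.map(SeriesInstance.F, self.numbers)


if __name__ == "__main__":
    print(SeriesInstance().run())  # [1, 4, 9]
```

This only helps when the work does not need instance state. If it does, the instance (or the relevant data) has to be passed as an explicit argument.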

stardust
2

You get this error because pickle can't serialize an instance method. A small workaround is to pass the plain function SeriesInstance.F and supply self explicitly as the first argument:

from itertools import repeat
from multiprocessing import Pool


class SeriesInstance:
    def __init__(self):
        self.numbers = [1, 2, 3]

    def F(self, x):
        return x * x

    def run(self):
        p = Pool()
        print(list(p.starmap(SeriesInstance.F, zip(repeat(self), self.numbers))))


if __name__ == '__main__':
    SeriesInstance().run()

  • 1
    Welcome to SO! You could improve the quality of your answer with a few explanations. – Timus Oct 30 '20 at 20:36
  • This example will use all cores to speed up computation on the instance method. It works. – Steve Lihn Jul 08 '21 at 16:41
  • The downside of this method is we will serialize the whole class. In a real example, we may have other data members in the class that aren't needed and just add overhead during serialization. For example, in this method each worker receives all `numbers`, instead of just the one number it needs to work on. Otherwise, I like the simplicity of this trick. – Neil Traft Nov 27 '22 at 17:57
  • It should also be noted that the pool isn't properly cleaned up in this example. (It wasn't in the OP either.) – Neil Traft Nov 27 '22 at 18:00
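
One way to avoid shipping the whole instance to every worker, as raised in the comments above, is to send only the values each task needs. The sketch below assumes Python 3. The function `scale` and the attributes `factor` and `cache` are hypothetical, added only to illustrate the point.

```python
from functools import partial
from multiprocessing import Pool


def scale(factor, x):
    # Module-level function: only `factor` and one number travel to a worker.
    return factor * x


class SeriesInstance:
    def __init__(self):
        self.numbers = [1, 2, 3]
        self.factor = 10                   # hypothetical small setting the task needs
        self.cache = list(range(100_000))  # hypothetical large attribute it does not need

    def run(self):
        # Bind just the needed value; self (and self.cache) is never pickled.
        work = partial(scale, self.factor)
        with Pool(processes=2) as p:
            return p.map(work, self.numbers)


if __name__ == "__main__":
    print(SeriesInstance().run())  # [10, 20, 30]
```

The trade-off is that the worker function lives outside the class, so it loses direct access to instance state and everything it needs must be passed in explicitly.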
0

There are many posts on Stack Overflow about this issue, and it happens for varying reasons. In my case, I was trying to call pool.starmap from inside a class on another method of the same class. Making it a staticmethod, or having a function outside the class call it, didn't work and gave the same error. In my case the class instance itself could not be pickled, so the instance has to be created after the multiprocessing has started, that is, inside the worker process.

What I ended up doing that worked for me was to separate my class into two classes. Something like this:

from multiprocessing import Pool

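class B:
    ...
    def process_feature(self, idx, feature):
        # do stuff in the new process
        pass
    ...

def multiprocess_feature(idx, feature):
    # starmap unpacks each (idx, feature) tuple into two positional arguments;
    # B is instantiated inside the worker, so it never has to be pickled
    b_instance = B()
    return b_instance.process_feature(idx, feature)

class A:
    ...
    def process_stuff(self):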
        ...
        with Pool(processes=num_processes, maxtasksperchild=10) as pool:
            results = pool.starmap(
                multiprocess_feature,
                [
                    (idx, feature)
                    for idx, feature in enumerate(features)
                ],
                chunksize=100,
            )
        ...
    ...

...
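
A possible variation on the pattern above, assuming that constructing `B` is expensive: build one instance per worker process with the pool's `initializer` argument instead of one per task. This is a sketch rather than the author's code. The names `init_worker` and `_worker_b`, the `features` constructor argument, and the body of `process_feature` are all made up for illustration.

```python
from multiprocessing import Pool


class B:
    def process_feature(self, idx, feature):
        # Placeholder work; the real processing would go here.
        return idx, feature.upper()


_worker_b = None  # set separately in each worker process


def init_worker():
    # Runs once in every worker when the pool starts.
    global _worker_b
    _worker_b = B()


def multiprocess_feature(idx, feature):
    # Uses the per-process B instance, so B itself is never pickled.
    return _worker_b.process_feature(idx, feature)


class A:
    def __init__(self, features):
        self.features = features

    def process_stuff(self):
        with Pool(processes=2, initializer=init_worker) as pool:
            # enumerate yields (idx, feature) tuples, which starmap unpacks.
            return pool.starmap(multiprocess_feature, enumerate(self.features))


if __name__ == "__main__":
    print(A(["a", "b", "c"]).process_stuff())  # [(0, 'A'), (1, 'B'), (2, 'C')]
```

Only the small `(idx, feature)` tuples cross the process boundary here, and neither `A` nor `B` needs to be picklable.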
Akaisteph7