
I have an algorithm that I am trying to parallelize, because of very long run times in serial. However, the function that needs to be parallelized is inside a class. multiprocessing.Pool seems to be the best and fastest way to do this, but there is a problem: its target function cannot be a method of an object instance. Meaning this: you declare a Pool in the following way:

import multiprocessing as mp
cpus = mp.cpu_count()
poolCount = cpus*2
pool = mp.Pool(processes = poolCount, maxtasksperchild = 2)

And then actually use it as so:

pool.map(self.TargetFunction, args)

But this throws an error, because object instances cannot be pickled, and pickling is how Pool passes information to all of its child processes. But I have to use self.TargetFunction.
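
Here is a stripped-down stand-in for my setup (the class and method names are placeholders, not my real code):

import multiprocessing as mp

class Worker:
    def target(self, x):  # stand-in for my real TargetFunction
        return x * x

if __name__ == '__main__':
    w = Worker()
    pool = mp.Pool(processes=2)
    # On Python 2 this raises PicklingError ("Can't pickle instancemethod");
    # on Python 3 the bound method pickles only if the whole instance does.
    print(pool.map(w.target, range(4)))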

So I had an idea: I would create a new Python file named parallel and simply write a couple of functions in it without putting them in a class, and call those functions from within my original class (whose method I want to parallelize).

So I tried this:

import multiprocessing as mp

def MatrixHelper(args):
    # args is one (WM, sigmaI, sigmaX, i) tuple from the list built in Start
    WM = args[0]
    return WM.CreateMatrixMp(*args[1:])

def Start(sigmaI, sigmaX, numPixels, WM):

    cpus = mp.cpu_count()
    poolCount = cpus * 2
    args = [(WM, sigmaI, sigmaX, i) for i in range(numPixels)]
    print('Number of CPUs to process WM: %d' % cpus)

    pool = mp.Pool(processes = poolCount, maxtasksperchild = 2)
    tempData = pool.map(MatrixHelper, args)

    return tempData

These functions are not part of a class, so using MatrixHelper in Pool's map function works fine. But while doing this I realized it was no way out: the function in need of parallelization (CreateMatrixMp) expects an object to be passed to it (it is declared as def CreateMatrixMp(self, sigmaI, sigmaX, i)).

Since it is not being called from within its class, it doesn't get a self passed to it. To solve this, I passed the Start function the calling object itself. That is, I call parallel.Start(sigmaI, sigmaX, self.numPixels, self). The object self then becomes WM, so that I can finally call the desired function as WM.CreateMatrixMp().

I'm sure that is a very sloppy way of coding, but I just wanted to see if it would work. But nope: pickling error again; the map function cannot handle object instances at all.

So my question is: why is it designed this way? It seems useless, completely dysfunctional in any program that uses classes at all.

I tried using Process rather than Pool, but this requires the array that I am ultimately writing to to be shared, which requires processes waiting for each other. If I don't want it to be shared, then I have each process write its own smaller array and do one big write at the end. But both of these result in slower run times than when I was doing this serially! Python's built-in multiprocessing seems absolutely useless!
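
For reference, the "each process writes its own smaller array" variant looked roughly like this (a sketch, not my exact code; heavy_computation stands in for my real per-pixel work):

import multiprocessing as mp

def heavy_computation(i):
    return i * i  # placeholder for the real per-pixel work

def worker(chunk, out_queue):
    # each process builds its own partial result...
    out_queue.put([heavy_computation(i) for i in chunk])

if __name__ == '__main__':
    queue = mp.Queue()
    chunks = [range(0, 50), range(50, 100)]
    procs = [mp.Process(target=worker, args=(c, queue)) for c in chunks]
    for p in procs:
        p.start()
    partials = [queue.get() for _ in procs]  # drain before join; order not guaranteed
    for p in procs:
        p.join()
    # ...and then one big write of the combined result at the end
    combined = [x for part in partials for x in part]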

Can someone please give me some guidance as to how to actually save time with multiprocessing, in the context of my target function being inside a class? I have read in posts here to use pathos.multiprocessing instead, but I am on Windows, and am working on this project with multiple people who all have different setups. Having everyone try to install it would be inconvenient.

pretzlstyle

4 Answers


I was having a similar issue with trying to use multiprocessing within a class. I was able to solve it with a relatively easy workaround I found online. Basically, you use a function outside of your class to unwrap/unpack the method of the class that you're trying to parallelize. Here are the two websites I found that explain how to do it.

Website 1 (joblib example)

Website 2 (multiprocessing module example)

For both, the idea is to do something like this:

from multiprocessing import Pool
import time
 
def unwrap_self_f(arg, **kwarg):
    return C.f(*arg, **kwarg)
 
class C:
    def f(self, name):
        print('hello %s,' % name)
        time.sleep(5)
        print('nice to meet you.')
     
    def run(self):
        pool = Pool(processes=2)
        names = ('frank', 'justin', 'osi', 'thomas')
        pool.map(unwrap_self_f, zip([self]*len(names), names))
 
if __name__ == '__main__':
    c = C()
    c.run()
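
If you are on Python 3, you may not need the unwrap helper at all: bound methods pickle by reference as long as the instance itself is picklable. A minimal sketch of the same toy class without the wrapper (Python 3 only):

from multiprocessing import Pool
import time

class C:
    def f(self, name):
        print('hello %s,' % name)
        time.sleep(5)
        print('nice to meet you.')

    def run(self):
        with Pool(processes=2) as pool:
            # Python 3 pickles self.f as the pair (self, 'f'),
            # so no module-level helper is needed
            pool.map(self.f, ['frank', 'justin', 'osi', 'thomas'])

if __name__ == '__main__':
    C().run()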
DataMan

The essence of how multiprocessing works is that it spawns sub-processes that receive parameters to run a certain function. In order to pass these arguments, it needs them to be, well, passable: not exclusive to the main process, such as sockets, file descriptors and other low-level, OS-related things.

This translates into "need to be pickleable or serializable".
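
A quick way to check whether something can cross the process boundary is to try pickling it yourself; is_poolable here is just a throwaway helper, not part of any library:

import pickle

def is_poolable(obj):
    """Return True if obj could be handed to a Pool worker as-is."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False

print(is_poolable([1, 2, 3]))           # True: plain data pickles fine
print(is_poolable(open('f.txt', 'w')))  # False: an open file is process-local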

On the same topic, parallel processing works best when you (can) have self-contained divisions of a problem. I can tell you want to share some sort of input/stream/database source, but this will probably create a bottleneck that you'll have to tackle at some point (at least from the "python script" side, rather than the "OS/database" side). Fortunately, you get to tackle it early now.

You can re-code your classes to spawn/create these non-picklable resources when needed rather than at start:

def targetFunction(self, range_params):
    if not self.ready():
        self._init_source()
    # ...rest of the code
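
If the resource is expensive to build, another option in the same spirit is Pool's initializer hook, which runs once in each worker process. A sketch, where connect_to_source is a hypothetical factory for whatever non-picklable resource you need:

from multiprocessing import Pool

_source = None  # per-process global, created inside each worker

def init_worker():
    global _source
    _source = connect_to_source()  # hypothetical: open the socket/DB/file here

def target_function(range_params):
    # each worker uses its own process-local copy of the resource
    return _source.read(range_params)  # hypothetical use

# pool = Pool(processes=4, initializer=init_worker)
# results = pool.map(target_function, all_range_params)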

You kinda tackled the problem the other way around (initialized an object based on params). And yes, parallel processing comes with a cost.

You can see the multiprocessing programming guidelines for an even more thorough insight on this matter.

Felipe Lema

This is an old post, but it is still one of the top results when you search for the topic. Some good info for this question can be found in this Stack Overflow question: python subclassing multiprocessing.Process

Howard

I tried some workarounds for calling pool.starmap from inside a class on another function in the class. Making it a staticmethod, or having a function outside the class call it, didn't work and gave the same error. A class instance just can't be pickled, so we need to create the instance after we start the multiprocessing.

What I ended up doing that worked for me was to separate my class into two classes. Basically, the function you are running the multiprocessing on needs to be called right after you instantiate a new object of the class it belongs to.

Something like this:

from multiprocessing import Pool

class B:
    ...
    def process_feature(self, idx, feature):
        # do stuff in the new process
        pass
    ...

def multiprocess_feature(process_args):
    b_instance = B()
    return b_instance.process_feature(*process_args)

class A:
    ...
    def process_stuff(self):
        ...
        with Pool(processes=num_processes, maxtasksperchild=10) as pool:
            results = pool.starmap(
                multiprocess_feature,
                [
                    (idx, feature)
                    for idx, feature in enumerate(features)
                ],
                chunksize=100,
            )
        ...
    ...

...
Akaisteph7