
I've created a program which can be summed up to something like this:

from itertools import combinations

class Test(object):
    def __init__(self, t2):
        self.another_class_object = t2

    def function_1(self, n):
        a = 2
        while(a <= n):
            all_combs = combinations(range(n), a)
            for comb in all_combs:
                if(self.another_class_object.function_2(comb)):
                    return 1
            a += 1
        return -1

The combinations function is imported from itertools. function_2 returns True or False depending on the input and is a method of another class's object, e.g.:

class Test_2(object):
    def __init__(self, list):
        self.comb_list = list

    def function_2(self, c):
        return c in self.comb_list

Everything is working just fine. But now I want to change it a little bit and implement multiprocessing. I found this topic that shows an example of how to exit the script when one of the worker processes determines no more work needs to be done. So I made the following changes:

  1. added a pool definition to the __init__ method: self.pool = Pool(processes=8)
  2. created a callback function:

    all_results = []
    def callback_function(self, result):
        self.all_results.append(result)
        if(result):
            self.pool.terminate()
    
  3. changed function_1:

    def function_1(self, n):
        a = 2
        while(a <= n):
            all_combs = combinations(range(n), a)
            for comb in all_combs:
                self.pool.apply_async(self.another_class_object.function_2, args=comb, callback=self.callback_function)
            #self.pool.close()
            #self.pool.join()
            if(True in self.all_results):
                return 1
            a += 1
        return -1
    

Unfortunately, it does not work as I expected. Why? After debugging, it looks like the callback function is never reached. I thought it would be called by every worker. Am I wrong? What could be the problem?

Qiu
  • For starters don't pass instance methods to your `multiprocessing.Pool` facilities (or multiprocessing code in general) as that's one bag of hurt you don't want to open. – zwer Jul 14 '17 at 11:43

2 Answers


I did not try your code as such, but I tried your structure. Are you sure the problem is in the callback function and not in the worker function? I did not manage to get apply_async to launch even a single instance of the worker function when the function was a class method. It just did not do anything. apply_async completes without error, but it does not run the worker.

As soon as I moved the worker function (in your case another_class_object.function_2) out of the class and made it a standalone global function, it started working as expected and the callback was triggered normally. The callback function, in contrast, seems to work fine as a class method.

There seems to be discussion about this for example here: Why can I pass an instance method to multiprocessing.Process, but not a multiprocessing.Pool?
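For illustration, a minimal standalone sketch of that restructuring could look like this; the check_comb worker and the way the list is passed as an argument are just assumptions for the example, the point is only the combination of a module-level worker with a bound-method callback:

    from itertools import combinations
    from multiprocessing import Pool

    # Hypothetical module-level worker: as a plain function it can be pickled,
    # so the pool is able to send it to the worker processes.
    def check_comb(comb, comb_list):
        return comb in comb_list

    class Test(object):
        def __init__(self, comb_list):
            self.comb_list = comb_list
            self.all_results = []

        # The callback runs in the parent process, so a bound method is fine here.
        def callback_function(self, result):
            self.all_results.append(result)

        def function_1(self, n):
            pool = Pool(processes=4)
            for a in range(2, n + 1):
                for comb in combinations(range(n), a):
                    pool.apply_async(check_comb, args=(comb, self.comb_list),
                                     callback=self.callback_function)
            pool.close()
            pool.join()
            return 1 if True in self.all_results else -1

    if __name__ == '__main__':
        t = Test([(0, 2)])
        print(t.function_1(4))   # prints 1 because (0, 2) is one of the combinations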

Is this in any way useful?

Hannu


Question: ... not work as I expected. ... What can be the problem?

It is always necessary to get() the results from pool.apply_async(...) to see the errors raised inside the pool processes.

Change to the following:

pp = []
for comb in all_combs:
    # Keep the AsyncResult objects so errors can be retrieved later.
    pp.append(pool.apply_async(func=self.another_class_object.function_2, args=comb, callback=self.callback_function))

pool.close()

for ar in pp:
    # get() re-raises any exception that occurred in the worker.
    print('ar=%s' % ar.get())

And you will see this Error:

TypeError: function_2() takes 2 positional arguments but 3 were given

To fix this error, change args=comb to args=(comb,). args is always unpacked into positional arguments, so passing the tuple comb directly spreads its elements across several arguments instead of passing the whole tuple as one argument:

pp.append(pool.apply_async(func=self.another_class_object.function_2, args=(comb,), callback=self.callback_function))
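For illustration of the difference, here is a small self-contained sketch; the show() worker is just a hypothetical stand-in:

    from multiprocessing import Pool

    def show(*args):
        # Echo back exactly what the worker received.
        return args

    if __name__ == '__main__':
        comb = (0, 1, 2)
        with Pool(processes=2) as pool:
            unpacked = pool.apply_async(show, args=comb)    # called as show(0, 1, 2)
            wrapped = pool.apply_async(show, args=(comb,))  # called as show((0, 1, 2))
            print(unpacked.get())  # -> (0, 1, 2)
            print(wrapped.get())   # -> ((0, 1, 2),)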

Tested with Python: 3.4.2

stovfl