
I'm working on a scientific project where I have a method that takes a long time to terminate and is called more than 20 times. The method could also be easily parallelized. The problem is that the parallelized code takes much more time than the non-parallelized one (commented out in the code).

Here is a piece of my code just to show how I am doing this:

import copy_reg
import math as mt
import types
from itertools import product
from multiprocessing import Pool

def _pickle_method(method):
    """
    Author: Steven Bethard (author of argparse)
    http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    """
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)


def _unpickle_method(func_name, obj, cls):
    """
    Author: Steven Bethard
    http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    """
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class ImageData(object):

    def __init__(self, width=60, height=60):
        self.width = width
        self.height = height
        self.data = []
        for i in range(width):
            self.data.append([0] * height)      

def parallel_orientation_uncertainty_calculus(x, y, mean_gradient, mean_gradient_direction, gradient_covariance,
                                               gradient_correlation, bins):
    v = mean_gradient_direction.data[x][y]
    theta_sigma = Uts.Utils.translate_to_polar_coordinates(v[0].item(0), v[1].item(0))
    sigma_theta = 0.0
    for i in range(bins):
        n1 = mt.pow(-mt.pi / 2 + mt.pi * i / bins, 2)
        n2 = VariabilityOfGradients.calculate_gradient_orientation_probability_density_function(
            mean_gradient, gradient_covariance, gradient_correlation, x, y,
            (theta_sigma - mt.pi / 2 + mt.pi * i / bins))
        sigma_theta += n1 * n2
    return [x, y, sigma_theta]

class VariabilityOfGradients(object):
    parallel_orientation_uncertainty_calculus = staticmethod(parallel_orientation_uncertainty_calculus)

    @staticmethod
    def calculate_orientation_uncertainty(mean_gradient, mean_gradient_direction, gradient_covariance, gradient_correlation, bins):
        output = ImageData()

        results = []
        pool = Pool()
        for x, y in product(range(1, output.width - 1), range(1, output.height - 1)):
            print "Iteration ", x, y
            result = pool.apply_async(VariabilityOfGradients.parallel_orientation_uncertainty_calculus,
                                  args=[x, y, mean_gradient, mean_gradient_direction, gradient_covariance,
                                        gradient_correlation, bins])
            results.append(result.get())
        pool.close()
        pool.join()        
        for result in results:
            print result
            output.data[result[0]][result[1]] = result[2]

        # for x, y in product(range(1, output.width - 1), range(1, output.height - 1)):
        #     print "Iteration ", x, y
        #     v = mean_gradient_direction.data[x][y]
        #     theta_sigma = Uts.Utils.translate_to_polar_coordinates(v[0].item(0), v[1].item(0))
        #     sigma_theta = 0.0
        #     for i in range(bins):
        #         n1 = mt.pow(-mt.pi / 2 + mt.pi * i / bins, 2)
        #         n2 = VariabilityOfGradients.calculate_gradient_orientation_probability_density_function(
        #             mean_gradient, gradient_covariance, gradient_correlation, x, y,
        #             (theta_sigma - mt.pi / 2 + mt.pi * i / bins))
        #         sigma_theta += n1 * n2
        #     output.data[x][y] = sigma_theta

        return output    

if __name__ == '__main__':  
    VariabilityOfGradients.calculate_orientation_uncertainty()  

I'm wondering what I'm doing wrong. Am I using multiprocessing incorrectly?

Thank you in advance.
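An aside on the `_pickle_method` / `_unpickle_method` workaround in the code above: it is only needed on Python 2, where `copy_reg` must be taught how to pickle bound instance methods before they can be sent to worker processes. On Python 3 the module is named `copyreg` and bound methods pickle natively, so the whole workaround disappears. A minimal sketch (Python 3, with a made-up `Greeter` class purely for illustration):

```python
import pickle


class Greeter(object):
    def __init__(self, name):
        self.name = name

    def greet(self):
        return "hello, " + self.name


# In Python 3 a bound method pickles out of the box: the pickle stores
# the instance plus the method name, and looks the method up on unpickling.
blob = pickle.dumps(Greeter("world").greet)
restored = pickle.loads(blob)
print(restored())  # hello, world
```

On Python 2 the `pickle.dumps` call above would raise a `PicklingError` without the `copy_reg.pickle(types.MethodType, ...)` registration, which is exactly why the snippet in the question needs it.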

pceccon
  • How many cores do you have on your machine? If you only have one core then making that core switch between multiple processes is going to be slower than just running one process. My desktop in front of me has 4 cores so can run 4 processes in a truly parallel fashion. – aychedee Jan 15 '14 at 11:45
  • Two, I have a Core 2 Duo. – pceccon Jan 15 '14 at 11:49
  • @pceccon The code waits (result.get()) for each pool task to complete before adding another one. – jwalker Jan 15 '14 at 12:36
  • Oh, thank you, @jwalker. Is there a way to get the result without this? – pceccon Jan 15 '14 at 12:40
  • @pceccon : results.append(result), then results = map(lambda x: x.get(), results) after the pool.join() – lucasg Jan 15 '14 at 12:47
  • http://stackoverflow.com/questions/8533318/python-multiprocessing-pool-when-to-use-apply-apply-async-or-map – jwalker Jan 15 '14 at 12:52
  • It's pretty weird, @georgesl. It never ends here... It's still taking more time than no multiprocessing at all (it hasn't finished yet). – pceccon Jan 15 '14 at 17:02
  • From my tests, it seems to be stuck between the join and the get. – pceccon Jan 15 '14 at 17:45
  • @georgesl, I put a print just before the pool.join() and before your suggestion, and it never reaches that print statement. This is pretty weird, because the problem doesn't seem to be in the get; it seems to be stuck at close() / join(). – pceccon Jan 15 '14 at 23:00

0 Answers