
I'm trying to run a computation over a large amount of data. The computation itself is just a simple correlation, but the data set is big, and I stared at my computer for more than 10 minutes with no output at all.

Then I tried to use multiprocessing.Pool. This is my code now:

import collections
import numpy

from multiprocessing import Pool
from haversine import haversine

def calculateCorrelation(data_1, data_2, dist):
    """
    Fill the correlation matrix between data_1 and data_2
    :param data_1: dictionary {key : [coordinates]}
    :param data_2: dictionary {key : [coordinates]}
    :param dist: minimum distance between coordinates to be considered, in kilometers.
    :return: numpy array containing the correlation between each complaint category.
    """
    pool = Pool(processes=20)

    data_1 = collections.OrderedDict(sorted(data_1.items()))
    data_2 = collections.OrderedDict(sorted(data_2.items()))
    data_1_size = len(data_1)                                          
    data_2_size = len(data_2)

    corr = numpy.zeros((data_1_size, data_2_size))

    for index_1, key_1 in enumerate(data_1):
        for index_2, key_2 in enumerate(data_2):  # Forming pairs
            type_1 = data_1[key_1]  # List of data in data_1 of type *i*
            type_2 = data_2[key_2]  # List of data in data_2 of type *j*
            result = pool.apply_async(correlation, args=[type_1, type_2, dist])
            corr[index_1, index_2] = result.get()
    pool.close()
    pool.join()
    return corr


def correlation(type_1, type_2, dist):
    in_range = 0
    for l1 in type_1:      # Coordinates of a point in data_1
        for l2 in type_2:  # Coordinates of a point in data_2
            p1 = (float(l1[0]), float(l1[1]))
            p2 = (float(l2[0]), float(l2[1]))
            if haversine(p1, p2) <= dist:  # Distance between two points of types *i* and *j*
                in_range += 1              # Number of data_2 points within range of a data_1 point
    total = float(len(type_1) * len(type_2))
    return in_range / total if total else 0.0  # Correlation between categories *i* and *j*

corr = calculateCorrelation(permiters_per_region, complaints_per_region, 20)

However, the speed hasn't improved, and it seems that no parallel processing is being done:

[Screenshot of a process monitor: one Python process at 100% CPU while the pool workers sit at 0.0%]

Just one process is doing almost all the work: at some point, all the Python workers are using 0.0% of the CPU and a single process is at 100%.

Am I missing something?

pceccon

1 Answer


In the loop where you generate the jobs, you call apply_async and then immediately wait for the result, which effectively serializes the work. You could instead collect the result objects in a list and wait only after all the dispatch work is done (see below), or even move to the map method.

def calculateCorrelation(data_1, data_2, dist):
    """
    Fill the correlation matrix between data_1 and data_2
    :param data_1: dictionary {key : [coordinates]}
    :param data_2: dictionary {key : [coordinates]}
    :param dist: minimum distance between coordinates to be considered, in kilometers.
    :return: numpy array containing the correlation between each complaint category.
    """
    pool = Pool(processes=20)
    results = []

    data_1 = collections.OrderedDict(sorted(data_1.items()))
    data_2 = collections.OrderedDict(sorted(data_2.items()))
    data_1_size = len(data_1)                                          
    data_2_size = len(data_2)

    corr = numpy.zeros((data_1_size, data_2_size))

    for index_1, key_1 in enumerate(data_1):
        for index_2, key_2 in enumerate(data_2):  # Forming pairs
            type_1 = data_1[key_1]  # List of data in data_1 of type *i*
            type_2 = data_2[key_2]  # List of data in data_2 of type *j*
            result = pool.apply_async(correlation, args=[type_1, type_2, dist])
            results.append((result, index_1, index_2))
    for result, index_1, index_2 in results:
        corr[index_1, index_2] = result.get()
    pool.close()
    pool.join()
    return corr
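
For the map route, here's a minimal sketch. The calculateCorrelationMap and _correlation_star names are mine, not from your code, and it assumes the `correlation` function above. `Pool.map` hands the whole batch of jobs to the workers up front and then blocks only once, for all the results:

import collections
import itertools

import numpy
from multiprocessing import Pool

def _correlation_star(args):
    # Pool.map passes a single argument to the worker, so unpack the
    # (type_1, type_2, dist) tuple here. This must live at module top
    # level so it can be pickled and sent to the worker processes.
    return correlation(*args)

def calculateCorrelationMap(data_1, data_2, dist):
    data_1 = collections.OrderedDict(sorted(data_1.items()))
    data_2 = collections.OrderedDict(sorted(data_2.items()))
    # One (type_1, type_2, dist) job per pair of categories, in the
    # same row-major order as the nested loops above.
    jobs = [(data_1[key_1], data_2[key_2], dist)
            for key_1, key_2 in itertools.product(data_1, data_2)]
    pool = Pool(processes=20)
    flat = pool.map(_correlation_star, jobs)  # dispatch everything, block once
    pool.close()
    pool.join()
    return numpy.array(flat).reshape(len(data_1), len(data_2))

On Python 3.3+ you can skip the wrapper and call pool.starmap(correlation, jobs) directly.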
tdelaney
  • when you refer to "wait for it to complete" is that the `result.get()` call? Is that a blocking call? – Garrett R Mar 02 '16 at 21:10
  • 1
    Yes! That's the problem. `result.get()` blocks until the job is complete then returns the result. – tdelaney Mar 02 '16 at 21:13
  • https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult.get Is it implied, because the docs just say: "Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get()." I'm just curious – Garrett R Mar 02 '16 at 21:17
  • I should have noted that. I followed an answer from a guy with > 30k reputation and thought it was correct (http://stackoverflow.com/questions/25888255/how-to-use-python-multiprocessing-pool-map-to-fill-numpy-array-in-a-for-loop) – pceccon Mar 02 '16 at 23:51