
I have a number of pickled pandas dataframes, each with a decent number of rows (~10k). One of the columns of each dataframe holds a numpy ndarray of floats (yes, I specifically chose to store array data inside a single cell - I've read this is usually not the right way to go, e.g. here, but in this case the individual values are meaningless; only the full list of values has meaning taken together, so I think it makes sense here). I need to calculate the Euclidean distance between each pair of rows in the frame. I have working code for this, but I am hoping I can improve its performance, as right now it is telling me that my smaller dataset is going to take more than a month, and I'm pretty sure it will exhaust my memory long before then.
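For concreteness, the data looks roughly like this minimal, made-up example (the index values and vector length are just illustrative; the 'val' column corresponds to the embeddings_column constant in the code below):

import numpy as np
import pandas as pd
from scipy.spatial import distance

# Hypothetical reconstruction of the layout described above: one row per id,
# with the whole embedding stored as a single ndarray cell.
rng = np.random.default_rng(0)
df = pd.DataFrame(
    {'val': [rng.random(8) for _ in range(4)]},
    index=pd.Index([101, 102, 103, 104], name='id'),
)

# The quantity I need for every pair of rows:
print(distance.euclidean(df.loc[101, 'val'], df.loc[102, 'val']))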

The code is as follows:

import pandas as pd
import sys
import getopt
import math
from scipy.spatial import distance
from timeit import default_timer as timer
from datetime import timedelta

id_column_1 = 'id1'
id_column_2 = 'id2'
distance_column = 'distance'
embeddings_column = 'val'

# where n is the size of the set
# and k is the number of elements per combination
def combination_count(n, k):
    if k > n:
        return 0
    else:
        # n! / (k! * (n - k)!)
        return math.factorial(n) // (math.factorial(k) * math.factorial(n - k))

def progress(start, current, total, id1, id2):
    if current == 0:
        print('Processing combination #%d of #%d, (%d, %d)' % (current, total, id1, id2))
    else:
        percent_complete = 100 * float(current)/float(total)
        elapsed_time = timer() - start
        avg_time = elapsed_time / current
        remaining = total - current
        remaining_time = timedelta(seconds=remaining * avg_time)
        print('Processing combination #%d of #%d, (%d, %d). %.2f%% complete, ~%.2f s/combination, ~%s remaining' % (current, total, id1, id2, percent_complete, avg_time, remaining_time))

def check_distances(df):
    indexes = df.index
    total_combinations = combination_count(len(indexes), 2)
    current_combination = 0
    print('There are %d possible inter-message relationships to compute' % total_combinations)
    distances = pd.DataFrame(columns=[id_column_1, id_column_2, distance_column])
    distances.set_index([id_column_1, id_column_2], inplace=True)
    start = timer()
    for id1 in indexes:
        for id2 in indexes:
            # only compute each unordered pair once (id1 is always < id2)
            if id1 >= id2:
                continue
            progress(start, current_combination, total_combinations, id1, id2)
            distances.loc[(id1, id2), distance_column] = distance.euclidean(
                df.loc[id1, embeddings_column], df.loc[id2, embeddings_column])
            current_combination += 1
    return distances

(I excluded the main() function which just pulls out args and loads in the pickled files based on them)

I've only really started working with Python recently for this task, so there's every chance I'm missing something simple. Is there a good way to deal with this?

Chris O'Kelly

2 Answers


There are some options for parallel computation on dataframes in pure Python.
The most complete is probably dask.
A simpler, though less complete, option would be pandaral-lel.
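For reference, this is roughly how pandaral-lel is usually wired up - the generic per-row parallel_apply pattern, not a solution tailored to the pairwise-distance problem above:

import pandas as pd
from pandarallel import pandarallel

# Start the worker pool once; progress_bar is optional.
pandarallel.initialize(progress_bar=True)

df = pd.DataFrame({'x': range(10)})

# parallel_apply mirrors DataFrame.apply, but spreads the rows across cores.
result = df.parallel_apply(lambda row: row['x'] ** 2, axis=1)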

edinho
  • Hi, thanks for answering - as I look at the docs for pandaral-lel, it doesn't seem like something that can be of help here, because the computation being done is not directly on the cell values themselves, but based on the indexes of the cells, which are not available to the function passed to `apply()`/`applymap()` - or am I missing something here? – Chris O'Kelly Apr 04 '19 at 04:52
  • Actually reading through dask it seems like the same applies there too – Chris O'Kelly Apr 04 '19 at 05:00
  • You need to calculate, for each row, a distance to all other rows. You can use an apply, as for each row there will be N-1 outputs (N = size of dataset); you just need to think about how to store them (and yes, you may need to change your code and remove the index). If it is still too difficult, you can use the multiprocessing lib, make a pool and do it by hand (but you would still need to change some of your code) – edinho Apr 04 '19 at 06:15
  • Yeah actually I've ended up using joblib in the meantime and it had a HUGE impact (I think it uses the multiprocessing lib under the hood in the way I configured it) – Chris O'Kelly Apr 04 '19 at 06:27

So the solution ended up being parallelization, but I was unable to get there with the pandas-specific parallelization libs, since the intended result was not a transformation of the existing cell contents but a new dataframe derived from pairs of rows in the original.

I grabbed the joblib library and took the following steps:

First, I created a function that, given two ids, returns the result row for that pair (as the separate workers cannot mutate the dataframe in the main process, we instead have to move to a paradigm of generating all the data first, THEN building the dataframe):

def get_distance(df, id1, id2):
    return [id1, id2, distance.euclidean(df.loc[id1, embeddings_column], df.loc[id2, embeddings_column])]

and applied joblib parallelization to it:

from itertools import combinations
from joblib import Parallel, delayed

def get_distances(df):
    indexes = df.index
    total_combinations = combination_count(len(indexes), 2)
    print('There are %d possible inter-message relationships to compute' % total_combinations)
    data = Parallel(n_jobs=-1)(delayed(get_distance)(df, min(ids), max(ids)) for ids in combinations(indexes, 2))
    distances = pd.DataFrame(data, columns=[id_column_1, id_column_2, distance_column])
    distances.set_index([id_column_1, id_column_2], inplace=True)
    return distances

This gave an immediate improvement from months to days in the expected time, but I suspected that passing the full dataframe to every task was creating significant overhead.
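As a rough way to see why, you can compare how much data each task has to serialize when the whole frame is shipped versus just the two vectors (a made-up illustration using the names from the code above, not something from my original run):

import pickle

# Each delayed(...) call pickles its arguments for the worker process, so the
# per-task payload is roughly the pickled size of whatever gets passed in.
whole_frame = len(pickle.dumps(df))
two_vectors = (len(pickle.dumps(df.loc[id1, embeddings_column]))
               + len(pickle.dumps(df.loc[id2, embeddings_column])))
print('whole frame: %d bytes, two vectors: %d bytes' % (whole_frame, two_vectors))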

After modifying the function to pass in only the required values, I got another immediate improvement, down to less than a day (~20 hours):

def get_distance(id1, id2, embed1, embed2):
    return [id1, id2, distance.euclidean(embed1, embed2)]

# ...later, in get_distances()...

data = Parallel(n_jobs=-1)(delayed(get_distance)(min(ids), max(ids), df.loc[ids[0], embeddings_column], df.loc[ids[1], embeddings_column]) for ids in combinations(indexes, 2))

Finally, based on joblib's docs and the fact that a significant amount of data is still being transferred to the workers, I swapped to the multiprocessing backend, and saw the expected time drop further to ~1.5 hours. (I also added the tqdm lib so I could get a nicer idea of progress than what joblib provides)

# requires: from tqdm import tqdm
data = Parallel(n_jobs=-1, backend='multiprocessing')(delayed(get_distance)(min(ids), max(ids), df.loc[ids[0], embeddings_column], df.loc[ids[1], embeddings_column]) for ids in tqdm(combinations(indexes, 2), total=total_combinations))

Hopefully this helps someone else with their first foray into Python parallelization!

Chris O'Kelly