1

I have two dictionaries dict1 and dict2, with the key value pairs being dictionary and dataframes.

dict1 = {"A" : df1,"B" : df2,"C" : df3}
dict2 = {"A" : df4,"B" : df5,"C" : df6}

I want compare every row of df1['Last_Name'] with df4['Last_Name] and create a new field df1['Match'] with the one with highest Levenstien distance. Similarly df2 with df5 and df3 with df6.

Now I want these 3 comparisons in parallel, I tried multiprocessing and concurrent.futures. But somehow it is not working.

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for key1, df1 in dict1.items():
        for key2, df2 in dict2.items():
            futures.append(executor.submit(add_flag, df1, df2))

    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        key1 = next(iter(filter(lambda x: x in result.columns, dict1.keys())))
        dict1[key1] = result
Pep_8_Guardiola
  • 5,002
  • 1
  • 24
  • 35
Vinayak
  • 45
  • 4
  • Where is `add_flag()` defined? – artemis May 25 '23 at 15:07
  • 1
    How are you calculating the levenshtein distance? With `rapidfuzz` you can `.cdist(df1['Last_Name'], df4['Last Name'], workers=-1)` which will score them all at once and use all CPU cores: https://stackoverflow.com/a/70437968 – jqurious May 25 '23 at 17:38

2 Answers2

1

So, a few things here:

  1. concurrent.futures.ThreadPoolExecutor is for I/O; you want ProcessPoolExecutor to do things that are bound to your CPU.
  2. Your analysis, which is asking to be done in parallel, can be done using zip, not a nested loop.

And a note, add_flag() needs to return a dataframe with a column name that matches some key in dict1.

with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = []
    for key1, df1 in dict1.items():
        df2 = dict2[key1]
        futures.append(executor.submit(add_flag, df1, df2))

    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        key1 = next(iter(filter(lambda x: x in result.columns, dict1.keys())))
        dict1[key1] = result
artemis
  • 6,857
  • 11
  • 46
  • 99
  • Yes add flag returns a dataframe df1 with the new field "Match". – Vinayak May 25 '23 at 15:14
  • Got it, thanks for the clarification. I hope my solution worked for you @Vinayak – artemis May 25 '23 at 15:20
  • Have one query, if I use ProcessPoolExecutor, the code keeps running and doesnt stop, whereas with ThreadPoolExecutor the execution gets completed. Not sure if I'm missing anything in code. – Vinayak May 26 '23 at 02:44
1

Basically what you need to do is to define a function that receives 2 data frames as parameters and compare it, and call this function using future, multithread, or whatever you prefer, like this: Compare Dfs function ( I am using fuzzywuzzy to calculate the Levenstien distance, but you can do it in your way:

def add_flag(df1, df2):
    for index, row in df1.iterrows():
        best_match = None
        best_distance = 0
        for _, row2 in df2.iterrows():
            distance = fuzz.ratio(row['Last_Name'], row2['Last_Name'])
            if distance > best_distance:
                best_match = row2
                best_distance = distance

        if best_match is not None:
            df1.loc[index, 'Match'] = best_match['Last_Name']

    return df1

Then you call the functions for example you can import concurrent.futures and run the following:

dict1 = {"A": df1, "B": df2, "C": df3}
dict2 = {"A": df4, "B": df5, "C": df6}

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for key1, df1 in dict1.items():
        df2 = dict2[key1]
        futures.append(executor.submit(add_flag, df1, df2))

    for future, (key1, df1) in zip(concurrent.futures.as_completed(futures), dict1.items()):
        result = future.result()
        dict1[key1] = result

It will iterate the dicts and call the function in an async way.

  • This will not work for parallel processing as op wants; see https://stackoverflow.com/questions/51828790/what-is-the-difference-between-processpoolexecutor-and-threadpoolexecutor – artemis May 25 '23 at 15:20
  • I got it, I didn't have idea future was more oriented to I/O. Thanks! – Anderson Dutra May 25 '23 at 18:07