I need to merge two large datasets based on string columns which don't perfectly match. I have wide datasets which can help me determine the best match more accurately than string distance alone, but I first need to return several 'top matches' for each string.
Reproducible example:
import pandas as pd
from fuzzywuzzy import process

def example_function(idx, string, comparisons, n):
    # return the top-n fuzzy matches for `string` among `comparisons` (a Series),
    # linking the df1 index to the matched df2 indices and their scores
    tup = process.extract(string, comparisons, limit = n)
    df2_index = [i[2] for i in tup]
    scores = [i[1] for i in tup]
    return pd.DataFrame({
        "df1_index": [idx] * len(df2_index),
        "df2_index": df2_index,
        "score": scores
    })

s1 = pd.Series(["two apples", "one orange", "my banana", "red grape", "huge kiwi"])
s2 = pd.Series(["a couple of apples", "old orange", "your bananas", "purple grape", "tropical fruit"])

pd.concat([example_function(index, value, s2, 2) for index, value in s1.items()]).reset_index(drop=True)
I've been unsuccessful at parallelizing this function. The closest approach to what I'm trying to do seems to be multiprocessing, but even with starmap I haven't been able to get results back. I'd imagine there's a simple way to achieve this, but I haven't yet found one that works.
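For reference, what I've been attempting looks roughly like this (a sketch rather than my exact code; the Pool size of 4 is arbitrary, and it reuses example_function, s1 and s2 from the example above):

from multiprocessing import Pool

if __name__ == "__main__":
    # one (idx, string, comparisons, n) tuple per row of s1
    args = [(index, value, s2, 2) for index, value in s1.items()]
    with Pool(4) as pool:
        # starmap unpacks each tuple into example_function's arguments
        frames = pool.starmap(example_function, args)
    result = pd.concat(frames).reset_index(drop=True)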
I'm open to any advice on how to optimize my code, but parallel processing seems like an appropriate solution here, since running it sequentially looks like it would take about 4-5 hours (in hindsight, that was a generous estimate).
UPDATE
Thank you for the solutions. My df1 has 7,000 rows and my df2 has 70,000 rows. For the results below, I searched all 70,000 rows of df2 for each of the first 20 rows of df1.
- dataframe concat method (original method): 96 sec
- dictionary chain method: 90 sec
- add parallel with dask (4 workers): 77 sec
- use rapidfuzz instead of fuzzywuzzy: 6.73 sec (see the sketch after this list)
- use rapidfuzz with dask (4 workers): 5.29 sec
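The rapidfuzz-only timing came from essentially swapping the import in the original function and specifying the scorer; a rough sketch (not my exact code):

from rapidfuzz import process, fuzz

def example_function(idx, string, comparisons, n):
    # same interface as before; rapidfuzz also returns (match, score, index) tuples
    tup = process.extract(string, comparisons, scorer = fuzz.WRatio, limit = n)
    return pd.DataFrame({
        "df1_index": [idx] * len(tup),
        "df2_index": [t[2] for t in tup],
        "score": [t[1] for t in tup]
    })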
Here is the optimized code:
import pandas as pd
from dask.distributed import Client
from dask import delayed
from rapidfuzz import process, fuzz
from itertools import chain

client = Client(n_workers = 4, processes = False)

def example_function(idx, string, comparisons, n):
    tup = process.extract(string, comparisons, scorer = fuzz.WRatio, limit = n)
    # return plain dicts instead of per-row DataFrames; one DataFrame is built at the end
    return [{'idx': idx, 'index2': t[2], 'score': t[1]} for t in tup]

# t1 and t3 are my real pandas Series (analogous to s1 and s2 above)
jobs = [delayed(example_function)(index, value, t3, 20) for index, value in t1.items()]
data = delayed(jobs)

df = pd.DataFrame.from_records(chain(*data.compute()))
print(df)
client.close()
Parallel processing didn't have quite the impact I was expecting. Perhaps it isn't set up ideally for this function, or perhaps the benefit will grow as I include more iterations. Either way it did make a difference, so I'm using it in my own solution. Thanks all.