
I need to merge two large datasets on string columns that don't match perfectly. I have wide datasets that can help me determine the best match more accurately than string distance alone, but to use them I first need to return several 'top matches' for each string.

Reproducible example:

import pandas as pd
from fuzzywuzzy import process

def example_function(idx, string, comparisons, n):
    # Return the top-n fuzzy matches of `string` against the `comparisons` Series
    tup = process.extract(string, comparisons, limit=n)
    df2_index = [i[2] for i in tup]  # Series index of each match
    scores = [i[1] for i in tup]     # similarity score of each match
    return pd.DataFrame({
        "df1_index": [idx] * n,
        "df2_index": df2_index,
        "score": scores
    })

s1 = pd.Series(["two apples", "one orange", "my banana", "red grape", "huge kiwi"])
s2 = pd.Series(["a couple of apples", "old orange", "your bananas", "purple grape", "tropical fruit"])

pd.concat([example_function(index, value, s2, 2) for index, value in s1.items()]).reset_index()

I've been unsuccessful at parallelizing this function. The closest fit to what I'm trying to do seems to be the multiprocessing module, but even with Pool.starmap I am not getting results. I'd imagine there's a simple way to achieve this, but I haven't yet found a method that works.
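For context, the shape of what I'm attempting with multiprocessing is roughly the following (a sketch rather than my exact code; it assumes `example_function` and the two Series are defined at module level so they can be pickled):

from multiprocessing import Pool

# Rough sketch of the multiprocessing attempt (not the exact failing code).
# The __main__ guard matters on platforms that use the spawn start method.
if __name__ == "__main__":
    args = [(index, value, s2, 2) for index, value in s1.items()]
    with Pool(processes=4) as pool:
        frames = pool.starmap(example_function, args)
    result = pd.concat(frames).reset_index()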

I'm open to any advice on how to optimize my code, but parallel processing would be an appropriate solution in this case since it looks like it will take about 4-5 hours (in hindsight this was a generous estimate) if done sequentially.

UPDATE

Thank you for the solutions. I have a df1 with 7,000 rows and a df2 with 70,000 rows. For the results below I searched all 70,000 rows of df2 for each of the first 20 rows of df1.

  • dataframe concat method (original method): 96 sec
  • dictionary chain method: 90 sec
  • add parallel with dask (4 workers): 77 sec
  • use rapidfuzz instead of fuzzywuzzy: 6.73 sec
  • use rapidfuzz with dask (4 workers): 5.29 sec

Here is the optimized code:

import pandas as pd
from dask.distributed import Client
from dask import delayed
from rapidfuzz import process, fuzz
from itertools import chain

client = Client(n_workers=4, processes=False)

def example_function(idx, string, comparisons, n):
    # rapidfuzz's process.extract with an explicit scorer, returning one dict per match
    tup = process.extract(string, comparisons, scorer=fuzz.WRatio, limit=n)
    return [{'idx': idx, 'index2': t[2], 'score': t[1]} for t in tup]

# t1 and t3 are my real Series (the analogues of s1 and s2 in the example above)
jobs = [delayed(example_function)(index, value, t3, 20) for index, value in t1.items()]
data = delayed(jobs)
df = pd.DataFrame.from_records(chain(*data.compute()))
print(df)

client.close()
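One variable I haven't benchmarked yet: `processes=False` runs the Dask workers as threads, which share the GIL, so a process-based client may behave differently. This is an untested guess on my part, not something I measured:

# Untested variation: process-based workers sidestep GIL contention, but each
# task's arguments (including the large comparison Series) then have to be
# serialized and shipped to the worker processes.
client = Client(n_workers=4, processes=True)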

Parallel processing didn't have quite the impact that I was expecting. Perhaps it is not set up ideally for this function, or perhaps it will continue to scale to a larger impact as I include more iterations. Either way it did make a difference, so I'm using it in my personal solution. Thanks all.

Chad S
  • Before parallelizing it would be good to try to make it more efficient. Have you done a [speed profile](https://docs.python.org/3/library/profile.html) to see where the CPU time is being spent? I suspect a lot of it might be in instantiating all the mini-dataframes inside the function. DataFrame creation overheads can be quite significant. – Bill Oct 21 '21 at 21:37
  • As a first step you can replace fuzzywuzzy with [RapidFuzz](https://github.com/maxbachmann/RapidFuzz) which provides a much faster implementation. @Bill I would assume most of the time to be spent in process.extract. The time to create the dataframes should be pretty minor in relation. – maxbachmann Oct 22 '21 at 14:31
  • Holy cow Max that is fast. Really appreciate the comment... wish I stumbled across your github page earlier. – Chad S Oct 22 '21 at 15:13
  • @ChadS would you mind reporting the speed improvements you got with all the various suggestions? This would be interesting to know. – Bill Oct 22 '21 at 16:18
  • Happy to report it, but I'm still trying to get dask to work. Currently throwing me into an endless loop of errors. – Chad S Oct 22 '21 at 16:34
  • Very interesting thanks. I am still learning Dask so not sure if we implemented it the best way. Maybe creating a delayed job for each `example_function` call is not the best, especially if it is now very fast. Try batching 100 function calls for each delayed task maybe? Perhaps others with more experience could advise. – Bill Oct 22 '21 at 18:26
  • Also, I noticed the `dask.delayed` decorator is only supposed to be applied to the function (not the function output!). See my modified answer below. Would like to know if this changes the speed up achieved. Thanks. – Bill Oct 22 '21 at 18:35
  • Updated - slight time improvement after fixing the parentheses. – Chad S Oct 22 '21 at 19:51

1 Answer


This doesn't answer your question but I'd be curious to know if it speeds things up. Just returning dictionaries instead of DataFrames should be much more efficient:

from itertools import chain

def example_function(idx, string, comparisons, n):
    tup = process.extract(string, comparisons, limit=n)
    # Build plain dicts instead of a mini-DataFrame per call
    return [{'idx': idx, 'index2': t[2], 'score': t[1]} for t in tup]

data = chain(*(example_function(index, value, s2, 2) for index, value in s1.items()))
df = pd.DataFrame.from_records(data)
print(df)

Output:

   idx  index2  score
0    0       0     86
1    0       3     43
2    1       1     80
3    1       0     45
4    2       2     76
5    2       1     32
6    3       3     76
7    3       1     53
8    4       0     30
9    4       3     29

UPDATE: Here is one answer to your question:

Dask provides a very convenient way to parallelize execution with Python:

from dask.distributed import Client
from dask import delayed
from itertools import chain

def example_function(idx, string, comparisons, n):
    tup = process.extract(string, comparisons, limit = n)
    return [{'idx': idx, 'index2': t[2], 'score': t[1]} for t in tup]

client = Client(n_workers=4)  # Choose number of cores
# One delayed task per row of s1; nothing executes until .compute() is called
jobs = [delayed(example_function)(index, value, s2, 2) for index, value in s1.items()]
data = delayed(jobs)
df = pd.DataFrame.from_records(chain(*data.compute()))
client.close()

Try this on a big batch of data and see if it speeds up execution (I tried it on this small sample and it was no faster).
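If each extract call becomes very cheap (for example after switching to RapidFuzz), the overhead of scheduling one delayed task per row can start to dominate. As a hedged sketch of the batching idea from the comments, the `jobs = ...` line above could be replaced with something like this (the batch size of 100 is an arbitrary assumption and `batch_function` is a hypothetical helper, not part of Dask):

# Hypothetical batching variant: each delayed task processes a chunk of rows,
# so Dask schedules far fewer (but heavier) tasks.
def batch_function(batch, comparisons, n):
    records = []
    for idx, string in batch:
        records.extend(example_function(idx, string, comparisons, n))
    return records

items = list(s1.items())
chunks = [items[i:i + 100] for i in range(0, len(items), 100)]  # assumed batch size
jobs = [delayed(batch_function)(chunk, s2, 2) for chunk in chunks]
df = pd.DataFrame.from_records(chain(*delayed(jobs).compute()))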

Bill
  • Thanks a ton for your detailed response, Bill- I'll test it out tomorrow. I've only been using python for a few weeks and am not yet familiar with all the data types. I appreciate the dictionary solution, I'll let you know how it goes. – Chad S Oct 21 '21 at 23:17
  • This is pretty advanced stuff so don't feel bad if you're struggling. Maybe focus on learning efficient Python code before getting into more advanced data processing tasks with Pandas and Dask etc. – Bill Oct 21 '21 at 23:31