This is related to the question "How to parallelize many (fuzzy) string comparisons using apply in Pandas?".
Consider this simple (but funny) example again:
import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
from fuzzywuzzy import fuzz
import pandas as pd

master = pd.DataFrame({'original': ['this is a nice sentence',
                                    'this is another one',
                                    'stackoverflow is nice']})

slave = pd.DataFrame({'name': ['hello world',
                               'congratulations',
                               'this is a nice sentence ',
                               'this is another one',
                               'stackoverflow is nice'],
                      'my_value': [1, 2, 3, 4, 5]})

def fuzzy_score(str1, str2):
    return fuzz.token_set_ratio(str1, str2)

def helper(orig_string, slave_df):
    slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x, orig_string))
    # return the my_value corresponding to the highest score
    return slave_df.loc[slave_df.score.idxmax(), 'my_value']
master
Out[39]:
original
0 this is a nice sentence
1 this is another one
2 stackoverflow is nice
slave
Out[40]:
my_value name
0 1 hello world
1 2 congratulations
2 3 this is a nice sentence
3 4 this is another one
4 5 stackoverflow is nice
What I need to do is simple:
- For every row in master, I look up the best match in the DataFrame slave using the string similarity score computed by fuzzywuzzy, and take the corresponding my_value (see the example call right after this list).
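As a concrete example, calling helper on the first master sentence should pick the near-identical third row of slave and return its my_value:

helper('this is a nice sentence', slave)
# expected: 3, since 'this is a nice sentence ' gets the top token_set_ratio score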
Now let's make these dataframes a bit bigger:
master = pd.concat([master] * 100, ignore_index = True)
slave = pd.concat([slave] * 10, ignore_index = True)
First, I tried dask:

# prepare the computation
dmaster = dd.from_pandas(master, npartitions=4)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave), meta=('x', 'f8'))
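For reference, the same work can also be pushed down one level so that the pandas apply runs once per partition; this is just a sketch of what I mean (same helper and slave as above), not something I have benchmarked:

# sketch: each partition is a regular pandas DataFrame, so the row-level
# apply happens inside pandas and dask only schedules one task per partition
dmaster['my_value'] = dmaster.map_partitions(
    lambda part: part.original.apply(lambda x: helper(x, slave)),
    meta=('my_value', 'f8'))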
and now here are the timings:
#multithreaded
%timeit dmaster.compute(get=dask.threaded.get)
1 loop, best of 3: 346 ms per loop
#multiprocess
%timeit dmaster.compute(get=dask.multiprocessing.get)
1 loop, best of 3: 1.93 s per loop
# good ol' pandas
%timeit master['my_value'] = master.original.apply(lambda x: helper(x,slave))
100 loops, best of 3: 2.18 ms per loop
Second, I tried with the good old multiprocessing package:
from multiprocessing import Pool, cpu_count
from datetime import datetime

def myfunc(df):
    return df.original.apply(lambda x: helper(x, slave))

if __name__ == '__main__':
    startTime = datetime.now()
    p = Pool(cpu_count() - 1)
    ret_list = p.map(myfunc, [master.iloc[1:100],
                              master.iloc[100:200],
                              master.iloc[200:300]])
    results = pd.concat(ret_list)
    print(datetime.now() - startTime)
which takes about the same time as the dask multiprocessing run:
runfile('C:/Users/john/untitled6.py', wdir='C:/Users/john')
0:00:01.927000
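For completeness, the hard-coded iloc slices can be replaced by an automatic split into one chunk per worker; this is only a sketch using np.array_split (which splits the DataFrame along its rows), and I have not benchmarked it separately:

import numpy as np

# sketch: one chunk of master per worker process instead of hand-written slices
chunks = np.array_split(master, cpu_count() - 1)
ret_list = p.map(myfunc, chunks)
results = pd.concat(ret_list)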
Question: why is multiprocessing, with both dask and the multiprocessing package, so much slower than plain pandas here? Assume my real data is much bigger than that. Could I get a better outcome?
After all, the problem I consider here is embarrassingly parallel (every row is an independent problem), so these packages should really shine.
Am I missing something here?
Thanks!