29

I have the following problem

I have a dataframe master that contains sentences, such as

master
Out[8]: 
                  original
0  this is a nice sentence
1      this is another one
2    stackoverflow is nice

For every row in Master, I lookup into another Dataframe slave for the best match using fuzzywuzzy. I use fuzzywuzzy because the matched sentences between the two dataframes could differ a bit (extra characters, etc).

For instance, slave could be

slave
Out[10]: 
   my_value                      name
0         2               hello world
1         1           congratulations
2         2  this is a nice sentence 
3         3       this is another one
4         1     stackoverflow is nice

Here is a fully-functional, wonderful, compact working example :)

from fuzzywuzzy import fuzz
import pandas as pd
import numpy as np
import difflib


master= pd.DataFrame({'original':['this is a nice sentence',
'this is another one',
'stackoverflow is nice']})


slave= pd.DataFrame({'name':['hello world',
'congratulations',
'this is a nice sentence ',
'this is another one',
'stackoverflow is nice'],'my_value': [2,1,2,3,1]})

def fuzzy_score(str1, str2):
    return fuzz.token_set_ratio(str1, str2)

def helper(orig_string, slave_df):
    #use fuzzywuzzy to see how close original and name are
    slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x,orig_string))
    #return my_value corresponding to the highest score
    return slave_df.ix[slave_df.score.idxmax(),'my_value']

master['my_value'] = master.original.apply(lambda x: helper(x,slave))

The 1 million dollars question is: can I parallelize my apply code above?

After all, every row in master is compared to all the rows in slave (slave is a small dataset and I can hold many copies of the data into the RAM).

I dont see why I could not run multiple comparisons (i.e. process multiple rows at the same time).

Problem: I dont know how to do that or if thats even possible.

Any help greatly appreciated!

ℕʘʘḆḽḘ
  • 18,566
  • 34
  • 128
  • 235

3 Answers3

36

You can parallelize this with Dask.dataframe.

>>> dmaster = dd.from_pandas(master, npartitions=4)
>>> dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave), name='my_value'))
>>> dmaster.compute()
                  original  my_value
0  this is a nice sentence         2
1      this is another one         3
2    stackoverflow is nice         1

Additionally, you should think about the tradeoffs between using threads vs processes here. Your fuzzy string matching almost certainly doesn't release the GIL, so you won't get any benefit from using multiple threads. However, using processes will cause data to serialize and move around your machine, which might slow things down a bit.

You can experiment between using threads and processes or a distributed system by managing the get= keyword argument to the compute() method.

import dask.multiprocessing
import dask.threaded

>>> dmaster.compute(get=dask.threaded.get)  # this is default for dask.dataframe
>>> dmaster.compute(get=dask.multiprocessing.get)  # try processes instead
MRocklin
  • 55,641
  • 23
  • 163
  • 235
  • genius! just a quick question: I have a 8 core xeon machine, will that work on it? I cannot use a distributed system as you suggest – ℕʘʘḆḽḘ Jun 22 '16 at 23:34
  • 3
    Multiprocessing will accelerate your computations but will slow down from inter-process data transfer. There's no way for me to know if things will speed up or not without knowing way more about your problem than I really want to get into. I recommend trying it out and profiling. – MRocklin Jun 22 '16 at 23:43
  • thanks @MRocklin ! I am sure many will find this post useful. I, myself, was still completely clueless about `dask` after skimming through http://dask.pydata.org/en/latest/install.html – ℕʘʘḆḽḘ Jun 23 '16 at 00:16
  • follow up if you still have 20 secs. should I play with `npartitions` as well? – ℕʘʘḆḽḘ Jun 23 '16 at 01:00
  • I have a lot of RAM (128GB), so should I use many npartitions? – ℕʘʘḆḽḘ Jun 23 '16 at 02:09
  • sorry for the follow up, but maybe you can give me a hint? I see now a lot of processes running for quite a long time, but the CPU usage remains very low with some spikes at 2% - 5% every few seconds or so. Is this expected? Thank you again for your time – ℕʘʘḆḽḘ Jul 01 '16 at 13:44
4

I'm working on something similar and I wanted to provide a more complete working solution for anyone else you might stumble upon this question. @MRocklin unfortunately has some syntax errors in the code snippets provided. I am no expert with Dask, so I can't comment on some performance considerations, but this should accomplish your task just as @MRocklin has suggested. This is using Dask version 0.17.2 and Pandas version 0.22.0:

import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
from fuzzywuzzy import fuzz
import pandas as pd

master= pd.DataFrame({'original':['this is a nice sentence',
'this is another one',
'stackoverflow is nice']})

slave= pd.DataFrame({'name':['hello world',
'congratulations',
'this is a nice sentence ',
'this is another one',
'stackoverflow is nice'],'my_value': [1,2,3,4,5]})

def fuzzy_score(str1, str2):
    return fuzz.token_set_ratio(str1, str2)

def helper(orig_string, slave_df):
    slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x,orig_string))
    #return my_value corresponding to the highest score
    return slave_df.loc[slave_df.score.idxmax(),'my_value']

dmaster = dd.from_pandas(master, npartitions=4)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave),meta=('x','f8'))

Then, obtain your results (like in this interpreter session):

In [6]: dmaster.compute(get=dask.multiprocessing.get)                                             
Out[6]:                                          
                  original  my_value             
0  this is a nice sentence         3             
1      this is another one         4             
2    stackoverflow is nice         5    
shellcat_zero
  • 1,027
  • 13
  • 20
2

These answers are based on an older API. Some newer code:

dmaster = dd.from_pandas(master, npartitions=4)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave),meta=('x','f8'))
dmaster.compute(scheduler='processes') 

Personally I'd ditch that apply call to fuzzy_score in the helper function and just perform the operation there.

You can alter the scheduler using these tips.

Union find
  • 7,759
  • 13
  • 60
  • 111