
This is related to the question "How to parallelize many (fuzzy) string comparisons using apply in Pandas?"

Consider this simple (but funny) example again:

import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
from fuzzywuzzy import fuzz
import pandas as pd

master = pd.DataFrame({'original': ['this is a nice sentence',
                                    'this is another one',
                                    'stackoverflow is nice']})

slave = pd.DataFrame({'name': ['hello world',
                               'congratulations',
                               'this is a nice sentence ',
                               'this is another one',
                               'stackoverflow is nice'],
                      'my_value': [1, 2, 3, 4, 5]})

def fuzzy_score(str1, str2):
    return fuzz.token_set_ratio(str1, str2)

def helper(orig_string, slave_df):
    # score every candidate name against orig_string
    # (note: this writes a 'score' column into slave_df in place)
    slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x, orig_string))
    # return the my_value corresponding to the highest score
    return slave_df.loc[slave_df.score.idxmax(), 'my_value']

master
Out[39]: 
                  original
0  this is a nice sentence
1      this is another one
2    stackoverflow is nice

slave
Out[40]: 
   my_value                      name
0         1               hello world
1         2           congratulations
2         3  this is a nice sentence 
3         4       this is another one
4         5     stackoverflow is nice

What I need to do is simple:

  • For every row in master, look up the best match in the DataFrame slave using the string similarity score computed by fuzzywuzzy, and return the corresponding my_value.

Now let's make these dataframes a bit bigger:

master = pd.concat([master] * 100, ignore_index=True)
slave = pd.concat([slave] * 10, ignore_index=True)

First, I tried dask:

#prepare the computation
dmaster = dd.from_pandas(master, npartitions=4)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave),meta=('x','f8'))

and now here are the timings:

#multithreaded
%timeit dmaster.compute(get=dask.threaded.get) 
1 loop, best of 3: 346 ms per loop

#multiprocess
%timeit dmaster.compute(get=dask.multiprocessing.get) 
1 loop, best of 3: 1.93 s per loop

#good 'ol pandas
%timeit master['my_value'] = master.original.apply(lambda x: helper(x,slave))
100 loops, best of 3: 2.18 ms per loop

Second, I tried the good old multiprocessing package:

from multiprocessing import Pool, cpu_count
from datetime import datetime

def myfunc(df):
    return df.original.apply(lambda x: helper(x, slave))

if __name__ == '__main__':
    startTime = datetime.now()
    p = Pool(cpu_count() - 1)
    ret_list = p.map(myfunc, [master.iloc[0:100], master.iloc[100:200],
                              master.iloc[200:300]])
    results = pd.concat(ret_list)
    print(datetime.now() - startTime)

which gives about the same time

runfile('C:/Users/john/untitled6.py', wdir='C:/Users/john')
0:00:01.927000

Question: why is parallel execution, with both dask and multiprocessing, so much slower than plain Pandas here? Assume my real data is much bigger than this. Could I get a better outcome?

After all, the problem I consider here is embarrassingly parallel (every row is an independent problem), so these packages should really shine.

Am I missing something here?

Thanks!

ℕʘʘḆḽḘ
  • 18,566
  • 34
  • 128
  • 235
  • I'd assume it's because you'd need more compute (horizontal scale out) power to justify all the extra overhead that dask has. – cs95 Apr 15 '18 at 00:56
  • I mean, I already have a decent computer with 12GB ram and 4-core i7... – ℕʘʘḆḽḘ Apr 15 '18 at 00:59
  • Not enough! numpy does it just as well, you'll need more _computers_ :D – cs95 Apr 15 '18 at 01:00
  • no no but here we're talking about multiprocessing. I have 4 cores, so I should expect at least some improvement relative to the 1-core scenario in Pandas... I mean, all the examples with the `multiprocessing` package in Python show an improvement for even simpler examples. – ℕʘʘḆḽḘ Apr 15 '18 at 01:02
  • The first call to multiprocessing will need time to spin up the additional processes before even beginning to compute anything. *If* all your data fit comfortably into memory, dask will not necessarily be the right choice. Oh, and you should use the distributed scheduler, even on a single machine. – mdurant Apr 15 '18 at 01:46
  • thanks @mdurant, I have updated my question to make it even broader. please let me know what you think! – ℕʘʘḆḽḘ Apr 15 '18 at 01:48
  • Next you need to consider the speed of your task (2 ms / 4 chunks, i.e., very fast) compared to the time it takes to serialise the data, send it to a process, deserialise it, and return the output. Overheads to managing the execution of the task are in addition to this. – mdurant Apr 15 '18 at 01:51
  • so you think there is a threshold in terms of size where I could get better performance? and why is the `multiprocessing` example even worse here? – ℕʘʘḆḽḘ Apr 15 '18 at 01:53
  • (and I think you are discovering that dask's serialisation of dataframes is much faster than the default pickle one) – mdurant Apr 15 '18 at 01:53
  • Yes, tasks must be big enough to be worth the overhead; but more generally, dask has ways to load the data directly into workers (e.g., the dataframe read_* methods), rather than *passing* the data from the controlling process. – mdurant Apr 15 '18 at 01:55
  • Does any of this sound like an answer? Some discussion at http://distributed.readthedocs.io/en/latest/efficiency.html – mdurant Apr 15 '18 at 13:46

1 Answer


Let me summarize the comments I made into something like an answer. I hope this information proves useful, as there are a number of issues rolled into one here.

First, I would like to point you to distributed.readthedocs.io/en/latest/efficiency.html, where a number of dask performance topics are discussed. Note that this is all in terms of the distributed scheduler, but since that can be started in-process, with threads or processes, or a combination of these, it really does supersede the previous dask schedulers and is generally recommended in all cases.

1) It takes time to create processes. This is always true, and particularly so on Windows. If you are interested in real-life performance, you will want to create the processes only once, paying their fixed overhead once, and then run many tasks against them. In dask there are many ways of making your cluster, even locally.
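
A minimal sketch of that pattern (the worker and thread counts below are arbitrary examples, not recommendations):

# Sketch: pay the process start-up cost once, then reuse the cluster.
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=3, threads_per_worker=1)  # processes are created here, once
client = Client(cluster)   # becomes the default scheduler for subsequent .compute() calls

# many computations can now run against the same pool of workers
result = dmaster.compute()  # dmaster as defined in the question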

2) Every task that dask (or any other dispatcher) handles incurs some overhead. In the case of the distributed scheduler, this is <1 ms, but it can be significant when the runtime of the task itself is very short.
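
For illustration only, a sketch that keeps each task coarse by scoring one whole partition per task; note that dask's Series.apply already executes partition-wise internally, so this just makes the granularity explicit, reusing helper, master and slave from the question:

# Sketch: one task per partition; each task scores all rows of its partition.
def score_partition(df):
    return df.assign(my_value=df.original.apply(lambda x: helper(x, slave)))

dmaster = dd.from_pandas(master, npartitions=4)
result = dmaster.map_partitions(score_partition,
                                meta={'original': 'object', 'my_value': 'i8'}).compute()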

3) It is an anti-pattern in dask to load the whole dataset in the client and pass it to the worker(s). You want, instead, to use functions like dask.dataframe.read_csv, where the data is loaded by the workers, avoiding expensive serialization and inter-process communication. Dask is really good at moving the computation to where the data is, minimizing communication.
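
To make that concrete with a sketch (the file name is hypothetical, and this assumes helper and the small slave table from the question are importable by the workers):

# Sketch: the large frame is read by the workers themselves instead of being
# built in the client process and shipped out. 'master.csv' is a hypothetical
# file holding the 'original' column.
import dask.dataframe as dd

dmaster = dd.read_csv('master.csv')   # each worker loads its own chunk(s)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave),
                                             meta=('my_value', 'f8'))
result = dmaster.compute()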

4) When communicating between processes, the method of serialization matters, which is my guess at why non-dask multiprocessing is so slow for you.
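
A rough sketch of how one might gauge what each multiprocessing task has to ship (this is my illustration, not part of the original timings):

# Sketch: measure the pickled size of one chunk handed to Pool.map.
import pickle

chunk = master.iloc[0:100]     # one of the chunks passed to a worker
payload = pickle.dumps(chunk)  # roughly what gets serialised and sent (results come back the same way)
print(len(payload), 'bytes per chunk')  # compare with the ~2 ms the whole pandas apply took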

5) Finally, not all jobs will find gains in performance under dask. This depends on a number of things, but often the main one is: does the data comfortably fit in memory? If yes, it may be hard to match the well-optimized methods in numpy and pandas. As always, you should profile your code...

mdurant
  • 27,272
  • 5
  • 45
  • 74
  • very nice indeed. one thing you may try to make your answer even more informative is to find such a threshold. that is, can you try to run my code on your machine with a large enough dataframe so that either dask or multiprocessing is faster? that would be fantastic – ℕʘʘḆḽḘ Apr 15 '18 at 18:06
  • I don't have your data :| Also, the "threshold" will depend on a large number of variables, so it's best to abide by the rules of thumb described here. – mdurant Apr 15 '18 at 23:20
  • no but I mean with the working example above. you can vary the size of the example dataframe – ℕʘʘḆḽḘ Apr 15 '18 at 23:20
  • See 3) above: you should NOT create the data in the one process and pass it to the workers; serializing/communication takes longer than processing in this case, and you will never win. – mdurant Apr 15 '18 at 23:29
  • I see, thanks indeed. but this is very specific to `dask`. Would this apply to using `multiprocessing` as well in your opinion? Thanks again! – ℕʘʘḆḽḘ Apr 16 '18 at 00:00
  • It certainly applies to multiprocessing for exactly the same reason. – mdurant Apr 16 '18 at 00:17