
I recently found the dask module, which aims to be an easy-to-use Python parallel processing module. A big selling point for me is that it works with pandas.

After reading a bit of its documentation, I can't find a way to do this trivially parallelizable task:

ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply

At the moment, to achieve this in dask, AFAIK, you need

ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame

which is ugly syntax and is actually slower than a plain

df.apply(func, axis = 1) # for pandas DF row apply

Any suggestion?

Edit: Thanks @MRocklin for the map function. It seems to be slower than plain pandas apply. Is this related to the pandas GIL-releasing issue, or am I doing it wrong?

import numpy as np
import pandas as pd
import dask.dataframe as dd

s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)

def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s

s.apply(slow_func) # 0.43 sec
ds.map(slow_func).compute() # 2.04 sec
jf328
  • I am not familiar with the `dask` module. For multiprocessing, the Python `multiprocessing` module works well for me when I have to process a big dataframe row by row. The idea is also very simple: use `np.array_split` to split the big dataframe into 8 pieces and process them simultaneously using `multiprocessing`; once it's done, use `pd.concat` to concatenate them back to the original length (a sketch of this pattern follows these comments). For a related post with a full code example, see http://stackoverflow.com/questions/30904354/pandas-optimizing-my-code-groupby-apply – Jianxun Li Jul 11 '15 at 21:11
  • Thanks, very nice. The problem with the multiprocessing module is that you need a named function (not a lambda) and have to put it outside the `if __name__ == "__main__"` block. That makes research code badly structured. – jf328 Jul 11 '15 at 23:00
  • If you just want to use a better multiprocessing you could look at [multiprocess](https://github.com/uqfoundation/multiprocess) by @mike-mckerns . You could also try out dask core rather than dask.dataframe and just build dictionaries or use something like https://github.com/ContinuumIO/dask/pull/408 – MRocklin Jul 12 '15 at 20:41
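A minimal sketch of the split/process/concatenate pattern described in the first comment above; the chunk count, the example frame, and the per-chunk function are made up for illustration:

import numpy as np
import pandas as pd
from multiprocessing import Pool

def process_chunk(chunk):
    # `chunk` is an ordinary pandas DataFrame; apply any row-wise function here
    return chunk.apply(lambda row: row.sum(), axis=1)

if __name__ == "__main__":
    df = pd.DataFrame(np.random.randn(1000, 4))
    chunks = np.array_split(df, 8)               # 8 roughly equal pieces
    with Pool(processes=8) as pool:
        parts = pool.map(process_chunk, chunks)  # processed simultaneously
    result = pd.concat(parts)                    # back to the original length and order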

2 Answers


map_partitions

You can apply your function to all of the partitions of your dataframe with the map_partitions function.

df.map_partitions(func, columns=...)

Note that func will be given only part of the dataset at a time, not the entire dataset as with pandas apply (which presumably you wouldn't want if your goal is parallelism).
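For example, a minimal sketch (the frame, the column names a and b, and the per-partition function are made up for illustration; meta describes the expected output on recent dask versions):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'a': range(8), 'b': range(8)})
ddf = dd.from_pandas(pdf, npartitions=2)

def per_partition(part):
    # `part` is an ordinary pandas DataFrame holding one partition
    return part.apply(lambda row: row['a'] * row['b'], axis=1)

result = ddf.map_partitions(per_partition, meta=(None, 'i8')).compute()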

map / apply

You can map a function element-wise across a series with map

df.mycolumn.map(func)

You can map a function row-wise across a dataframe with apply

df.apply(func, axis=1)
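A small sketch of both, again with made-up column names and assuming a recent dask version where meta can be passed to describe the output:

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [10, 20, 30, 40]})
ddf = dd.from_pandas(pdf, npartitions=2)

# element-wise map over one column
doubled = ddf.a.map(lambda x: x * 2, meta=('a', 'i8')).compute()

# row-wise apply across the dataframe
row_sums = ddf.apply(lambda row: row['a'] + row['b'], axis=1, meta=(None, 'i8')).compute()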

Threads vs Processes

As of version 0.6.0, dask.dataframe parallelizes with threads. Custom pure-Python functions will not receive much benefit from thread-based parallelism. You could try processes instead:

df = dd.read_csv(...)

df.map_partitions(func, columns=...).compute(scheduler='processes')
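Tying this back to the series in the question, a sketch that runs the same map under the multiprocessing scheduler (assuming a dask version that accepts scheduler='processes'; the function must be a picklable top-level function and the call guarded by the __main__ check):

import numpy as np
import pandas as pd
import dask.dataframe as dd

def slow_func(k):
    # equivalent to the loop-based slow_func in the question
    A = np.random.normal(size=k)
    return int((A > 0).sum() - (A <= 0).sum())

if __name__ == "__main__":
    s = pd.Series([10000] * 120)
    ds = dd.from_pandas(s, npartitions=3)
    result = ds.map(slow_func).compute(scheduler='processes')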

But avoid apply

However, you should really avoid apply with custom Python functions, both in Pandas and in Dask; it is often a source of poor performance. If you can find a way to do your operation in a vectorized manner, your Pandas code may well be 100x faster and you may not need dask.dataframe at all.
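To illustrate what vectorization buys you here: the loop in slow_func just adds 1 for each positive draw and subtracts 1 otherwise, so it can be written with NumPy alone (one possible sketch, not the only way):

import numpy as np
import pandas as pd

def vectorized_func(k):
    A = np.random.normal(size=k)
    # +1 for every positive draw, -1 otherwise, summed without a Python-level loop
    return int(np.where(A > 0, 1, -1).sum())

s = pd.Series([10000] * 120)
s.apply(vectorized_func)  # same statistic as slow_func, far less Python overhead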

Consider numba

For your particular problem you might consider numba. This significantly improves your performance.

In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)

In [4]: %paste
def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s
## -- End pasted text --

In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms

In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)

In [8]: %time _ = s.apply(fast_func)  # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms

In [9]: %time _ = s.apply(fast_func)  # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms

Disclaimer: I work for the company that makes both numba and dask and employs many of the pandas developers.

MRocklin
  • Thanks! I tried the map method and it seems to be slower than pandas apply. Could you comment on the edit of original post please? – jf328 Jul 12 '15 at 07:53
  • I already use numba a lot! Thanks for the work. What I normally do is that each row of DataFrame specifies a configuration of a simulation (parameters for a complex/slow func). I do multiprocessing already, just keep an eye on better ways of doing this – jf328 Jul 12 '15 at 09:15
  • @MRocklin Slightly off topic regarding pandas; i try to use map over apply because I've heard it's faster, but I'm not sure why it's faster. Any clarification or links to clarification would be greatly appreciated. – Bob Haffner Jul 12 '15 at 17:05
  • @BobHaffner no clue. Suggest doing a small experiment and posting a "why is this the case" style stackoverflow question. – MRocklin Jul 12 '15 at 20:39

As of more recent versions, dask.dataframe.apply delegates responsibility to map_partitions:

@insert_meta_param_description(pad=12)
def apply(self, func, convert_dtype=True, meta=no_default, args=(), **kwds):
    """ Parallel version of pandas.Series.apply
    ...
    """
    if meta is no_default:
        msg = ("`meta` is not specified, inferred from partial data. "
               "Please provide `meta` if the result is unexpected.\n"
               "  Before: .apply(func)\n"
               "  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result\n"
               "  or:     .apply(func, meta=('x', 'f8'))            for series result")
        warnings.warn(msg)

        meta = _emulate(M.apply, self._meta_nonempty, func,
                        convert_dtype=convert_dtype,
                        args=args, **kwds)

    return map_partitions(M.apply, self, func,
                          convert_dtype, args, meta=meta, **kwds)
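So on recent dask versions you can call apply on the dask series directly and pass meta to avoid the inference warning shown above. A sketch, using a compact stand-in equivalent to the slow_func from the question:

import numpy as np
import pandas as pd
import dask.dataframe as dd

def slow_func(k):
    # compact stand-in equivalent to the loop in the question
    A = np.random.normal(size=k)
    return int((A > 0).sum() - (A <= 0).sum())

s = pd.Series([10000] * 120)
ds = dd.from_pandas(s, npartitions=3)

# meta=('x', 'i8') declares an int64 series result, so nothing has to be inferred
result = ds.apply(slow_func, meta=('x', 'i8')).compute()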
Shubham Chaudhary