
I found this question on parallelizing groupby. However, unless I'm mistaken, it can't be translated one-to-one to the case where the function takes multiple arguments.

Is the following a correct way of doing it? Is there a better way? In particular, building the index with a second pass over the groups seems quite inefficient.

from itertools import repeat
from multiprocessing import Pool, cpu_count

import pandas as pd

def applyParallel(dfGrouped, func, *args):
    with Pool(cpu_count() - 2) as p:
        ret_list = p.starmap(func, zip([group for name, group in dfGrouped], repeat(*args)))

    index = [name for name, group in dfGrouped]
    return pd.Series(index=index, data=ret_list)

which one would call using applyParallel(df.groupby(foo), someFunc, someArgs).

FooBar

2 Answers


First, a caveat: unless your data is fairly large, you may not see much (or any) benefit from parallelization.

Rather than working directly with a multiprocessing pool, the easiest approach now is probably to try dask, which provides a pandas-like API and manages most of the parallelism for you.

import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.randn(10000000, 10), columns=list('qwertyuiop'))

df['key'] = np.random.randint(0, 100, size=len(df))

import dask.dataframe as dd

# want a partition size small enough to easily fit into memory
# but large enough to make the overhead worth it
ddf = dd.from_pandas(df, npartitions=4)

%timeit df.groupby('key').sum()
1 loop, best of 3: 1.05 s per loop

# calculated in parallel on the 4 partitions
%timeit ddf.groupby('key').sum().compute()
1 loop, best of 3: 695 ms per loop

Note that by default dask uses a thread-based scheduler for dataframes, which is faster for functions like sum that release the GIL. If you are applying custom Python functions (which need the GIL), you may see better performance with the multiprocessing scheduler. In current dask releases it is selected with dask.config.set; older releases used dask.set_options(get=dask.multiprocessing.get).

import dask
dask.config.set(scheduler='processes')
chrisb

You can use the following version. Python's functools module provides partial, which binds the extra arguments ahead of time so that the pool only has to map over the groups.

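from functools import partial
from multiprocessing import Pool, cpu_count

import pandas as pd

def applyParallel(dfGrouped, func, *args):
    # partial(func, *args) binds args as the leading positional arguments,
    # so func is called as func(*args, group), with the group last
    with Pool(cpu_count()) as p:
        result = p.map(partial(func, *args), [group for name, group in dfGrouped])
    # return the results indexed by group name, as in the question
    return pd.Series(result, index=[name for name, group in dfGrouped])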
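
If func takes the group as its first argument, as in the question, another option is to build one argument tuple per group and hand those to starmap. Note that itertools.repeat(obj, times) treats a second argument as a repeat count, so repeat(*args) only behaves as intended for a single extra argument. The following is a minimal sketch under assumptions, not a drop-in replacement: apply_parallel, scaled_sum, use_threads and workers are hypothetical names, and the function accepts any iterable of (name, group) pairs (which is what iterating over a pandas groupby yields), so it runs without pandas.

```python
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ThreadPool


def apply_parallel(grouped, func, *args, use_threads=False, workers=None):
    """Call func(group, *args) for every (name, group) pair in grouped.

    Returns a dict mapping each group name to its result.
    """
    # Materialize the pairs once so names and groups come from a single pass.
    pairs = list(grouped)
    names = [name for name, _ in pairs]
    # One tuple per call: (group, arg1, arg2, ...). This keeps working
    # when more than one extra argument is passed.
    star_args = [(group, *args) for _, group in pairs]
    if workers is None:
        workers = max(1, cpu_count() - 2)
    # Threads avoid pickling but only help when func releases the GIL;
    # processes suit pure-Python functions.
    pool_cls = ThreadPool if use_threads else Pool
    with pool_cls(workers) as pool:
        results = pool.starmap(func, star_args)
    return dict(zip(names, results))


def scaled_sum(group, factor, offset):
    # Hypothetical worker; defined at module level so a process pool can pickle it.
    return factor * sum(group) + offset
```

With pandas this might be called as pd.Series(apply_parallel(df.groupby(foo), someFunc, arg1, arg2)). Materializing the pairs once also avoids iterating over the groupby a second time just to collect the index.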
Pushkar