A short comment to accompany JD Long's answer. I've found that if the number of groups is very large (say, hundreds of thousands) and your apply function is doing something fairly simple and quick, then breaking your dataframe into chunks and assigning each chunk to a worker to carry out a groupby-apply in serial can be much faster than a parallel groupby-apply in which the workers read off a queue containing a multitude of groups. Example:
```python
import pandas as pd
import numpy as np
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

nrows = 15000
np.random.seed(1980)
df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))})
```
So our dataframe looks like:
```
      a
0  3425
1  1016
2  8141
3  9263
4  8018
```
Note that column 'a' has many groups (think customer ids):
```python
len(df.a.unique())
```
```
15000
```
A function to operate on our groups:
```python
def f1(group):
    # Simulate a small, fast operation on each group.
    time.sleep(0.0001)
    return group
```
Start a pool:
```python
ppe = ProcessPoolExecutor(12)
futures = []
results = []
```
Do a parallel groupby-apply:
```python
%%time

for name, group in df.groupby('a'):
    p = ppe.submit(f1, group)
    futures.append(p)

for future in as_completed(futures):
    r = future.result()
    results.append(r)

df_output = pd.concat(results)
del ppe
```
```
CPU times: user 18.8 s, sys: 2.15 s, total: 21 s
Wall time: 17.9 s
```
Let's now add a column which partitions the df into many fewer groups:
```python
df['b'] = np.random.randint(0, 12, nrows)
```
Now instead of 15000 groups there are only 12:
```python
len(df.b.unique())
```
```
12
```
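(As an aside: the partition here is random. If you'd rather deal the groups out deterministically and evenly, something like the sketch below would also work. Since 'a' is just a permutation of the integers 0 to 14999, taking it modulo 12 is a quick trick; for real customer ids you'd want to hash first. This is my own variation, not the partition used in the timings below.)

```python
# Deterministic alternative to the random partition (my own sketch):
# each group key lands in exactly one of the 12 chunks.
df['b'] = df['a'] % 12
```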
We'll partition our df and do a groupby-apply on each chunk.
```python
ppe = ProcessPoolExecutor(12)
futures = []   # reset from the first run so results aren't duplicated
results = []
```
A wrapper function:
```python
def f2(df):
    # Run the groupby-apply in serial within this chunk
    # and return the applied result.
    return df.groupby('a').apply(f1)
```
Send out each chunk to be operated on in serial:
```python
%%time

for i in df.b.unique():
    p = ppe.submit(f2, df[df.b == i])
    futures.append(p)

for future in as_completed(futures):
    r = future.result()
    results.append(r)

df_output = pd.concat(results)
```
```
CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s
Wall time: 12.4 s
```
Note that the amount of time spent per group has not changed. What has changed is the length of the queue from which the workers read. I suspect that the workers cannot access the shared memory simultaneously, and because they keep returning to read off the queue, they are stepping on each other's toes. With larger chunks to operate on, the workers return less frequently, so this contention is ameliorated and the overall execution is faster.
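If you want to reuse this pattern, it can be wrapped up in a small helper. This is just a sketch of the chunking approach above, not a canonical implementation: the names chunked_groupby_apply and _apply_chunk are mine, np.array_split deals the group keys out into roughly equal chunks, and the chunk worker has to be a module-level function so that ProcessPoolExecutor can pickle it (the same goes for func).

```python
def _apply_chunk(chunk, key, func):
    # Serial groupby-apply within a single chunk of groups.
    return chunk.groupby(key).apply(func)

def chunked_groupby_apply(df, key, func, n_workers=12):
    # Deal the unique group keys out into n_workers chunks, then
    # hand each chunk to a worker for a serial groupby-apply.
    key_chunks = np.array_split(df[key].unique(), n_workers)
    with ProcessPoolExecutor(n_workers) as ppe:
        futures = [ppe.submit(_apply_chunk, df[df[key].isin(keys)], key, func)
                   for keys in key_chunks]
        return pd.concat(f.result() for f in as_completed(futures))
```

Usage then becomes a one-liner:

```python
df_output = chunked_groupby_apply(df, 'a', f1, n_workers=12)
```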