To implement nunique in dask groupby you have to use an aggregate function.
import pandas as pd
import dask.dataframe as dd
def chunk(s):
'''
The function applied to the
individual partition (map)
'''
return s.apply(lambda x: list(set(x)))
def agg(s):
'''
The function whic will aggrgate
the result from all the partitions(reduce)
'''
s = s._selected_obj
return s.groupby(level=list(range(s.index.nlevels))).sum()
def finalize(s):
'''
The optional functional that will be
applied to the result of the agg_tu functions
'''
return s.apply(lambda x: len(set(x)))
tunique = dd.Aggregation('tunique', chunk, agg,finalize)
df = pd.DataFrame({
'col': [0, 0, 1, 1, 2, 3, 3] * 10,
'g0': ['a', 'a', 'b', 'a', 'b', 'b', 'a'] * 10,
})
ddf = dd.from_pandas(df, npartitions=10)
res = ddf.groupby(['col']).agg({'g0': tunique}).compute()
print(res)