So dask has now been updated to support custom aggregation functions for groupby. (Thanks to the dev team and @chmp for working on this!) I am currently trying to construct a mode function and a corresponding count function. What I envision is that mode returns, for each grouping, a list of the most common values for a specific column (e.g. [4, 1, 2]), and that the corresponding count function returns the number of instances of those values (e.g. 3).
Now I am trying to implement this in code. As per the docstring in groupby.py, the parameters for custom aggregations are as follows:
Parameters
----------
name : str
    the name of the aggregation. It should be unique, since intermediate
    results will be identified by this name.
chunk : callable
    a function that will be called with the grouped column of each
    partition. It can either return a single series or a tuple of series.
    The index has to be equal to the groups.
agg : callable
    a function that will be called to aggregate the results of each chunk.
    Again the argument(s) will be grouped series. If ``chunk`` returned a
    tuple, ``agg`` will be called with all of them as individual positional
    arguments.
finalize : callable
    an optional finalizer that will be called with the results from the
    aggregation.
Here is the provided code for mean:
custom_mean = dd.Aggregation(
    'custom_mean',
    lambda s: (s.count(), s.sum()),
    lambda count, sum: (count.sum(), sum.sum()),
    lambda count, sum: sum / count,
)
df.groupby('g').agg(custom_mean)
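To check my understanding of the data flow, I simulated the three stages with plain pandas (the frames df1/df2 and columns 'g'/'x' are made up for illustration, with each frame standing in for one partition):

import pandas as pd

df1 = pd.DataFrame({'g': ['a', 'a', 'b'], 'x': [1, 2, 3]})
df2 = pd.DataFrame({'g': ['a', 'b', 'b'], 'x': [4, 5, 6]})

# chunk: called once per partition on the grouped column
count1, sum1 = df1.groupby('g')['x'].count(), df1.groupby('g')['x'].sum()
count2, sum2 = df2.groupby('g')['x'].count(), df2.groupby('g')['x'].sum()

# agg: the per-partition results are concatenated and re-grouped by the
# original key; in dask the lambda arguments arrive already grouped, so
# the agg lambda only has to call .sum() on each of them
count = pd.concat([count1, count2]).groupby(level=0).sum()
total = pd.concat([sum1, sum2]).groupby(level=0).sum()

# finalize: plain series arithmetic on the combined results
mean = total / count
print(mean)  # a -> 7/3, b -> 14/3

If that mental model is right, chunk produces per-partition partial results, agg collapses the re-grouped partials, and finalize turns them into the final answer.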
I am trying to think of the best way to do this. Currently I have the following functions:
from collections import Counter

def custom_count(x):
    # Count occurrences of each value, then return every value that is
    # tied for the highest count, as (value, count) pairs.
    count = Counter(x)
    freq_list = list(count.values())  # dict_values has no .count(), so make it a list
    max_cnt = max(freq_list)
    total = freq_list.count(max_cnt)
    return count.most_common(total)
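On a plain list this does what I want, e.g.:

custom_count([4, 4, 1, 1, 2])  # -> [(4, 2), (1, 2)]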
custom_mode = dd.Aggregation(
    'custom_mode',
    lambda s: custom_count(s),
    lambda s1: s1.extend(),
    lambda s2: ......
)
However, I am getting stuck on understanding how exactly the agg step should work. Any help with this problem would be appreciated.
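For reference, here is the direction I have been experimenting with. This is only an untested sketch of my current guess (the helper names chunk/agg/finalize are mine), and it only returns a single mode per group rather than the list of tied values and their count that I ultimately want:

import dask.dataframe as dd

def chunk(s):
    # s is the grouped column of one partition; value_counts gives a
    # series indexed by (group, value) with the per-partition counts
    return s.value_counts()

def agg(s):
    # s is the concatenated chunk output, re-grouped by the original key;
    # inside apply each sub-series is indexed by (group, value), so sum
    # the counts per value
    return s.apply(lambda part: part.groupby(level=-1).sum())

def finalize(s):
    # s maps (group, value) -> total count; per group, drop the group
    # levels and pick the value with the highest count
    level = list(range(s.index.nlevels - 1))
    return s.groupby(level=level).apply(
        lambda part: part.reset_index(level=level, drop=True).idxmax()
    )

custom_mode = dd.Aggregation('custom_mode', chunk, agg, finalize)
df.groupby('g').agg(custom_mode)

If that shape is right, the remaining question is how to extend finalize so it returns all tied modes plus their count instead of a single idxmax.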
Thanks!