
A custom Dask GroupBy Aggregation is very handy, but I am having trouble defining one that returns the most frequent value in a column.

What I have so far:

So from the example here, we can define custom aggregate functions like this:

custom_sum = dd.Aggregation('custom_sum', lambda s: s.sum(), lambda s0: s0.sum())
my_aggregate = {
    'A': custom_sum,
    'B': custom_most_often_value, ### <<< This is the goal.
    'C': ['max','min','mean'],
    'D': ['max','min','mean']
}
col_name = 'Z'
ddf_agg = ddf.groupby(col_name).agg(my_aggregate).compute()

While this works for custom_sum (as on the example page), an adaptation for the most frequent value could look like this (from the example here):

custom_most_often_value = dd.Aggregation('custom_most_often_value', lambda x:x.value_counts().index[0], lambda x0:x0.value_counts().index[0])

but it yields

ValueError: Metadata inference failed in `_agg_finalize`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

Then I tried to find a meta keyword in the dd.Aggregation implementation so I could define it, but could not find one. And the fact that it is not needed in the custom_sum example makes me think that the error is somewhere else.

So my question is: how do I get the most frequently occurring value of a column in a df.groupby(..).agg(..)? Thanks!

gies0r
  • The most often value (or rather, most common), is called a `mode`. Try to simply write .mode() instead of sum! Aside from that, consider not using lambda functions. Not entirely certain how that is implemented in dask, but in "normal" pandas simply writing "sum" or "mode" does the trick. – Dustin Aug 10 '20 at 17:28
  • No @Dustin, `mode` yields `AttributeError("'SeriesGroupBy' object has no attribute 'mode'")` when calling it via `custom_mode = dd.Aggregation('custom_mode', lambda s: s.mode(), lambda s0: s0.mode())` or via `my_aggregate={'A': 'mode'}` – gies0r Aug 10 '20 at 19:33

2 Answers


A quick clarification rather than an answer: the meta parameter is used in the .agg() method to specify the column data types you expect, best expressed as a zero-length pandas DataFrame. Otherwise, Dask will supply dummy data to your function to try to guess those types, but this doesn't always work.

mdurant

The issue you're running into is that the separate stages of the aggregation can't all be the same function applied recursively, as they are in the custom_sum example you're looking at.
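
To see why, here is a minimal pure-Python sketch (no Dask involved; the partition data and the helper name `most_common_value` are made up for illustration). Summing partial sums gives the overall sum, but taking the mode of per-partition modes can miss the true mode entirely:

```python
from collections import Counter

def most_common_value(values):
    # Return the most frequent value (ties go to the value seen first).
    return Counter(values).most_common(1)[0][0]

# Hypothetical data for one group, split across three partitions.
partitions = [[5, 5, 1], [7, 7, 1], [1, 9, 9]]
all_values = [v for p in partitions for v in p]

# Sum is decomposable: the sum of partial sums equals the overall sum.
partial_sums = [sum(p) for p in partitions]
sum_matches = sum(partial_sums) == sum(all_values)

# Mode is not: each partition reports a different local winner...
partial_modes = [most_common_value(p) for p in partitions]
mode_of_modes = most_common_value(partial_modes)

# ...while the true mode only shows up once the counts are combined.
true_mode = most_common_value(all_values)

print(sum_matches, partial_modes, mode_of_modes, true_mode)
```

Here the value 1 never wins inside any single partition, yet it is the most frequent value overall, so the per-partition step has to pass along counts rather than winners.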

I've modified code from this answer, leaving the comments from @user8570642 because they are very helpful. Note that this method also works for a list of groupby keys: https://stackoverflow.com/a/46082075/3968619

def chunk(s):
    # for the comments, assume only a single grouping column, the 
    # implementation can handle multiple group columns.
    #
    # s is a grouped series. value_counts creates a multi-series like 
    # (group, value): count
    return s.value_counts()


def agg(s):
    # s is a grouped multi-index series. In .apply the full sub-df will be
    # passed, multi-index and all. Group on the value level and sum the counts.
    # The result of the lambda function is a series. Therefore, the result of
    # the apply is a multi-index series like (group, value): count
    return s.apply(lambda s: s.groupby(level=-1).sum())

    # Faster alternative using pandas internals. To use it, replace the
    # return statement above with these two lines:
    #     s = s._selected_obj
    #     return s.groupby(level=list(range(s.index.nlevels))).sum()


def finalize(s):
    # s is a multi-index series of the form (group, value): count. First
    # manually group on the group part of the index. The lambda will receive a
    # sub-series with multi index. Next, drop the group part from the index.
    # Finally, determine the index with the maximum value, i.e., the mode.
    level = list(range(s.index.nlevels - 1))
    return (
        s.groupby(level=level)
        .apply(lambda s: s.reset_index(level=level, drop=True).idxmax())
    )

# Bound to the name `mode` because the test case passes `mode` to .agg()
mode = dd.Aggregation('mode', chunk, agg, finalize)

chunk counts the values for the groupby object in each partition. agg takes the results from chunk, groups them by the original groupby keys, and sums the value counts, so that we have the value counts for every group. finalize takes the multi-index series produced by agg and returns the most frequently occurring value of B for each group in Z.
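
The same three stages can be sketched in plain Python with collections.Counter. This is only a rough mental model under simplifying assumptions, not what Dask does internally; the function names `chunk_counts`, `combine_counts`, and `pick_mode` and the sample rows are hypothetical.

```python
from collections import Counter

def chunk_counts(rows):
    # Stage 1 (chunk): count (group, value) pairs within one partition.
    return Counter(rows)

def combine_counts(partials):
    # Stage 2 (agg): add up the per-partition counts for each (group, value).
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total

def pick_mode(total):
    # Stage 3 (finalize): for each group, keep the value with the highest count.
    best = {}
    for (group, value), count in total.items():
        if group not in best or count > best[group][1]:
            best[group] = (value, count)
    return {group: value for group, (value, _) in best.items()}

# Hypothetical (Z, B) rows split across two partitions.
partition_1 = [("amy", 5), ("amy", 1), ("chris", 1)]
partition_2 = [("amy", 5), ("chris", 1), ("chris", 2), ("amy", 1), ("amy", 5)]

partials = [chunk_counts(p) for p in (partition_1, partition_2)]
modes = pick_mode(combine_counts(partials))
print(modes)
```

Only the last stage picks a winner; the first two stages just carry counts forward, which is what lets the result stay correct no matter how the rows are partitioned.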

Here's a test case:

import pandas as pd
import dask.dataframe as dd

df = dd.from_pandas(
    pd.DataFrame({"A": [1, 1, 1, 1, 2, 2, 3] * 10,
                  "B": [5, 5, 5, 5, 1, 1, 1] * 10,
                  "Z": ['mike', 'amy', 'amy', 'amy', 'chris', 'chris', 'sandra'] * 10}),
    npartitions=10)
res = df.groupby(['Z']).agg({'B': mode}).compute()
print(res)
EMiller