
So dask has now been updated to support custom aggregation functions for groupby (thanks to the dev team and @chmp for working on this!). I am currently trying to construct a mode function and a corresponding count function. Basically, what I envision is that mode returns, for each group, a list of the most common values of a specific column (e.g., [4, 1, 2]). Additionally, a corresponding count function returns the number of occurrences of those values (e.g., 3).
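For reference, on an in-memory pandas frame the envisioned per-group result can be computed directly: `Series.mode` returns every tied value (sorted), and the largest `value_counts` entry is their shared count. The data below is made up to match the `[4, 1, 2]` / `3` example; this is only a non-distributed baseline, not a dask aggregation.

```python
import pandas as pd

# Made-up data: in group 0, the values 4, 1 and 2 each occur 3 times.
df = pd.DataFrame({
    'g':   [0] * 9 + [1] * 4,
    'col': [4, 1, 2] * 3 + [7, 7, 8, 9],
})

# For each group: all tied modes (Series.mode keeps every tied value, sorted)
# and how often each of them occurs.
target = {
    key: (sub.mode().tolist(), int(sub.value_counts().max()))
    for key, sub in df.groupby('g')['col']
}
print(target)
```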

Now I am trying to implement this in code. According to the docstring in groupby.py, the parameters for custom aggregations are as follows:

    Parameters
    ----------
    name : str
        the name of the aggregation. It should be unique, since intermediate
        result will be identified by this name.
    chunk : callable
        a function that will be called with the grouped column of each
        partition. It can either return a single series or a tuple of series.
        The index has to be equal to the groups.
    agg : callable
        a function that will be called to aggregate the results of each chunk.
        Again the argument(s) will be grouped series. If ``chunk`` returned a
        tuple, ``agg`` will be called with all of them as individual positional
        arguments.
    finalize : callable
        an optional finalizer that will be called with the results from the
        aggregation.

Here is the provided code for mean:

    custom_mean = dd.Aggregation(
        'custom_mean',
        lambda s: (s.count(), s.sum()),
        lambda count, sum: (count.sum(), sum.sum()),
        lambda count, sum: sum / count,
    )
    df.groupby('g').agg(custom_mean)
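One way to see how the three callables fit together is to emulate the stages with plain pandas on two hand-made partitions. The sketch below assumes, based only on the docstring above, that the per-partition chunk results are concatenated and regrouped by the group index before `agg` is called, and that the tuple returned by `agg` is unpacked into `finalize`. It is not dask's actual code path (which also handles tree reductions and metadata); the data and the function names are illustrative.

```python
import pandas as pd

# Two hypothetical partitions of one frame (illustration only).
parts = [
    pd.DataFrame({'g': ['a', 'a', 'b'], 'x': [1.0, 2.0, 3.0]}),
    pd.DataFrame({'g': ['a', 'b', 'b'], 'x': [3.0, 5.0, 7.0]}),
]

def chunk(s):
    # s: grouped column of one partition; return a tuple of series indexed by group
    return s.count(), s.sum()

def agg(count, total):
    # each argument: one chunk output from all partitions, grouped by group
    return count.sum(), total.sum()

def finalize(count, total):
    return total / count

chunked = [chunk(p.groupby('g')['x']) for p in parts]
counts = pd.concat([c for c, _ in chunked])
totals = pd.concat([t for _, t in chunked])
result = finalize(*agg(counts.groupby(level=0), totals.groupby(level=0)))
print(result)  # a -> 2.0, b -> 5.0, same as pd.concat(parts).groupby('g')['x'].mean()
```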

I am trying to think of the best way to do this. Currently I have the following functions:

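from collections import Counter

import dask.dataframe as dd

def custom_count(x):
    count = Counter(x)
    freq_list = list(count.values())  # list() so .count() also works on Python 3
    max_cnt = max(freq_list)
    total = freq_list.count(max_cnt)  # number of values tied for the top count
    return count.most_common(total)

custom_mode = dd.Aggregation(
    'custom_mode',
    lambda s: custom_count(s),
    lambda s1: ...,  # agg: combine the chunk results (s1.extend()?), unclear
    lambda s2: ...,  # finalize: unclear
)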

However, I am stuck on understanding how exactly the agg part should work. Any help with this problem would be appreciated.

Thanks!

user48944

1 Answer


Admittedly, the docs are currently somewhat light on detail. Thanks for bringing this issue to my attention. Please let me know if this answer helps, and I will contribute an updated version of the docs to dask.

To your question: when `chunk` returns a single series (not a tuple), the different steps of the aggregation are equivalent to:

res = chunk(df.groupby('g')['col'])
res = agg(res.groupby(level=[0]))
res = finalize(res)
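The three lines above can be run on plain pandas data to inspect the intermediates. `run_stages` below is a hypothetical helper (not part of dask) that applies them to a list of pandas DataFrames standing in for partitions; it assumes a list of grouping columns and that the group levels come first in the index of the chunk output. It is exercised here with a trivial custom sum.

```python
import pandas as pd


def run_stages(parts, by, col, chunk, agg, finalize):
    # Hypothetical helper mirroring chunk -> agg -> finalize on pandas
    # "partitions"; by is a list of grouping column names.
    levels = list(range(len(by)))
    # chunk: called on the grouped column of every partition
    res = pd.concat([chunk(p.groupby(by)[col]) for p in parts])
    # agg: called on the concatenated chunk results, regrouped on the group levels
    res = agg(res.groupby(level=levels))
    # finalize: called once on the aggregated result
    return finalize(res)


df = pd.DataFrame({'g': [0, 0, 1, 1, 1, 0], 'col': [1, 2, 3, 4, 5, 6]})
parts = [df.iloc[:3], df.iloc[3:]]

# A trivial custom sum: partial sums per partition, then summed again.
res = run_stages(parts, ['g'], 'col',
                 chunk=lambda s: s.sum(),
                 agg=lambda s: s.sum(),
                 finalize=lambda s: s)
print(res.tolist())  # [9, 12], same as df.groupby('g')['col'].sum()
```

Swapping in other callables lets you print each intermediate while debugging; dask's real implementation differs in details such as tree reductions and metadata inference.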

In these terms, the mode function could be implemented as follows:

def chunk(s):
    # For the comments, assume a single grouping column; the
    # implementation can handle multiple group columns.
    #
    # s is a grouped series. value_counts creates a multi-index series like
    # (group, value): count
    return s.value_counts()


def agg(s):
    # s is a grouped multi-index series. In .apply, the full sub-series is
    # passed, multi-index and all. Group on the value level and sum the counts.
    # The result of the lambda function is a series. Therefore, the result of
    # the apply is a multi-index series like (group, value): count
    return s.apply(lambda s: s.groupby(level=-1).sum())

    # A faster alternative relies on the pandas-internal attribute
    # _selected_obj (private API, may change between pandas versions):
    #     s = s._selected_obj
    #     return s.groupby(level=list(range(s.index.nlevels))).sum()


def finalize(s):
    # s is a multi-index series of the form (group, value): count. First
    # manually group on the group part of the index. The lambda will receive a
    # sub-series with multi index. Next, drop the group part from the index.
    # Finally, determine the index with the maximum value, i.e., the mode.
    level = list(range(s.index.nlevels - 1))
    return (
        s.groupby(level=level)
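        # idxmax returns the index label (the value) of the maximum count;
        # in current pandas, Series.argmax returns an integer position instead.
        .apply(lambda s: s.reset_index(level=level, drop=True).idxmax())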
    )

mode = dd.Aggregation('mode', chunk, agg, finalize)

Note that this implementation does not match the DataFrame `.mode` method in case of ties: it returns one of the tied values instead of all of them.
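If all tied values are needed, as the question asks, one option is a different finalize step. The sketch below is a hypothetical `finalize_all_modes` that takes a (group..., value): count series, the shape produced by `agg` above, and returns per group a sorted list of every value reaching the maximum count together with that count. It is exercised only on a hand-built pandas Series; whether dask's metadata inference accepts a DataFrame with list-valued object columns as a finalize result is not verified here.

```python
import pandas as pd


def finalize_all_modes(s):
    # Hypothetical finalize variant. s is a multi-index series of the form
    # (group..., value): count.
    level = list(range(s.index.nlevels - 1))
    # Use a scalar level for a single group column so that keys are scalars.
    group_level = level[0] if len(level) == 1 else level

    keys, modes, counts = [], [], []
    for key, sub in s.groupby(level=group_level):
        per_value = sub.droplevel(level)  # index is now just the value
        top = per_value.max()
        keys.append(key)
        # keep every value that reaches the maximum count, not just one
        modes.append(sorted(per_value[per_value == top].index.tolist()))
        counts.append(int(top))

    if len(level) == 1:
        index = pd.Index(keys, name=s.index.names[0])
    else:
        index = pd.MultiIndex.from_tuples(keys, names=s.index.names[:-1])
    return pd.DataFrame({'modes': modes, 'count': counts}, index=index)


# (group, value): count, e.g. as produced after summing value_counts
idx = pd.MultiIndex.from_tuples(
    [(0, 4), (0, 1), (0, 2), (0, 5), (1, 7), (1, 8)], names=['g', 'col'])
s = pd.Series([3, 3, 3, 1, 2, 1], index=idx)
out = finalize_all_modes(s)
print(out)
```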

The mode aggregation can then be used as follows:

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({
    'col': [0, 1, 1, 2, 3] * 10,
    'g0': [0, 0, 0, 1, 1] * 10,
    'g1': [0, 0, 0, 1, 1] * 10,
})
ddf = dd.from_pandas(df, npartitions=10)

res = ddf.groupby(['g0', 'g1']).agg({'col': mode}).compute()
print(res)
  • I'm getting this error: raise Exception("cannot handle a non-unique multi-index!") – user48944 Sep 06 '17 at 19:43
  • At which step? Can you post a minimal example (e.g., as a gist)? Did you try running the code manually as in the first code block? –  Sep 06 '17 at 19:53
  • It's failing with ValueError: Metadata inference failed in `_agg_finalize`. The traceback is: Traceback (most recent call last): File "C:\ProgramData\Anaconda2\lib\site-packages\dask-0.15.2+14.g8d906032-py2.7.egg\dask\dataframe\groupby.py", line 1130, in agg return self.aggregate(arg, split_every=split_every, split_out=split_out) ...... ValueError: Metadata inference failed in `_agg_finalize`. I'll try to get an MCV example. – user48944 Sep 06 '17 at 20:01
  • Added MCV example in edits. Not the same error but ya. – user48944 Sep 06 '17 at 20:18
  • Hm. OK, I didn't think of that. The issue seems to be that the intermediates of both aggregate functions are passed around in a single dataframe. However, mode adds the value to the index, whereas count sticks to the group columns. If you use mode for a single column, it should work as expected, but this implementation does not play nicely with other aggregation functions or other columns. I'm a bit swamped for the next few days, but will take a look at this issue afterwards. You could store the intermediate counts in object columns; this idea, however, does interact badly with dask's metadata inference. –  Sep 06 '17 at 20:33
  • Ok no worries. Thanks for looking at this, I'll try to work on an implementation myself, but I'm pretty novice at this stuff. Thank You! – user48944 Sep 06 '17 at 20:39
  • Any chance this can be revisited? – user48944 Sep 18 '17 at 17:44
  • Sorry, somewhat busy atm. You can always bypass pandas and use pure python objects, by returning lists as intermediates. This way you prevent pandas from interpreting the result. However, the code will be slow-ish and has some rough edges. I created an [issue](https://github.com/dask/dask/issues/2708) for the latter that contains an alternative implementation. Hope that helps. –  Sep 24 '17 at 07:02
  • here's the pandas version for reference `df.groupby(['g0', 'g1']).agg({'col': lambda x: pd.Series.mode(x)[0]})` – Ray Bell Jun 17 '20 at 03:55
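The pure-Python route mentioned in the comments can be sketched independently of dask: each partition produces per-group `Counter` objects, the counters are merged across partitions, and a final step picks every value tied for the top count. This swaps in `Counter`/dict intermediates rather than the lists mentioned above and is not the implementation from the linked issue; `chunk_counts`, `merge_counts` and `finalize_modes` are hypothetical names, and wiring them into `dd.Aggregation` is left open.

```python
from collections import Counter
from functools import reduce


def chunk_counts(rows):
    # rows: iterable of (group_key, value) pairs from one partition
    out = {}
    for key, value in rows:
        out.setdefault(key, Counter())[value] += 1
    return out


def merge_counts(a, b):
    # combine two per-partition results without mutating the inputs
    merged = {key: counter.copy() for key, counter in a.items()}
    for key, counter in b.items():
        merged.setdefault(key, Counter()).update(counter)  # adds counts
    return merged


def finalize_modes(counts):
    # per group: sorted list of all values tied for the top count, plus that count
    result = {}
    for key, counter in counts.items():
        top = max(counter.values())
        result[key] = (sorted(v for v, c in counter.items() if c == top), top)
    return result


partitions = [
    [(0, 4), (0, 1), (0, 2), (1, 7)],
    [(0, 4), (0, 1), (0, 2), (1, 7), (1, 8)],
    [(0, 4), (0, 1), (0, 2)],
]
merged = reduce(merge_counts, (chunk_counts(p) for p in partitions))
print(finalize_modes(merged))  # {0: ([1, 2, 4], 3), 1: ([7], 2)}
```

Because merging is just Counter addition, the merge step is associative, which is what a tree-style reduction across partitions needs.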