
Building off of this post, I implemented the custom mode aggregation below, but I'm seeing poor performance with it. When this aggregation runs, my cluster only uses a single thread, which is not great. I'm computing the mode for over 150 attributes (mostly categorical data) across 16k rows, and I think I could split that work up into individual threads/processes and stitch the results back into a single dataframe afterwards. Note that the aggregation has to group on two columns, so I may also be losing some performance by not being able to use a single column as the index.

Is there a way to incorporate dask futures or parallel processing into the aggregate calculation?

import dask.dataframe as dd
from dask.distributed import Client
from pandas import DataFrame

def chunk(s):
    # Per-partition step: count occurrences of each value within each group
    return s.value_counts()

def agg(s):
    # Combine step: sum the per-partition counts for each (group, value) pair
    s = s._selected_obj
    return s.groupby(level=list(range(s.index.nlevels))).sum()

def finalize(s):
    # s is a multi-index series of the form (group, value): count. First
    # manually group on the group part of the index. The lambda will receive a
    # sub-series with multi index. Next, drop the group part from the index.
    # Finally, determine the index with the maximum value, i.e., the mode.
    level = list(range(s.index.nlevels - 1))
    return (
        s.groupby(level=level)
        .apply(lambda s: s.reset_index(level=level, drop=True).idxmax())
    )
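
# Illustration with hypothetical data: after `agg`, `s` is a series whose
# multi-index is (group..., value) and whose data is the count, e.g.
#   a  b  value
#   1  x  red      5
#   1  x  blue     2
#   2  y  red      3
# `finalize` then returns, for each (a, b) group, the value label with
# the highest count, i.e. the mode.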

def main() -> DataFrame:
    client = Client('scheduler:8786')

    ddf = dd.read_csv('/sample/data.csv')
    custom_mode = dd.Aggregation('custom mode', chunk, agg, finalize)
    result = ddf.groupby(['a','b']).agg(custom_mode).compute()
    return result

Side note: I am using Docker to spin up my scheduler and workers with the daskdev/dask (2.18.1) image.

  • Does section 2 of this article, `Using dask ‘delayed’ in a loop`, help? https://pythonhealthcare.org/2018/11/25/99-parallel-processing-functions-and-loops-with-dask-delayed-method/ – David Erickson Jun 13 '20 at 00:21
  • Yes, thanks David. I split the aggregation up to loop through the columns in the dataframe, then used the delayed functionality to parallelize it (see the sketch after these comments). – Brendon Gallagher Jun 17 '20 at 22:10
  • Thanks @BrendonGallagher, would you be able to answer your own question with updated code? I might like to do something similar to this in the future and this would be a good reference point. – David Erickson Jun 18 '20 at 04:19
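
For reference, here is a minimal sketch of the delayed-in-a-loop pattern discussed in the comments above. This is an illustration rather than the poster's actual code: it assumes the 16k rows fit comfortably in memory as a plain pandas dataframe, and `mode_for_column` is a hypothetical helper name.

import dask
import pandas as pd

@dask.delayed
def mode_for_column(pdf: pd.DataFrame, groupby, col):
    # Pure-pandas mode of one column per group; mode() can return several
    # values on ties, so take the first (assumes each group has at least
    # one non-null value in `col`)
    return pdf.groupby(groupby)[col].agg(lambda s: s.mode().iloc[0])

def all_modes(pdf: pd.DataFrame, groupby) -> pd.DataFrame:
    value_cols = [c for c in pdf.columns if c not in groupby]
    # One delayed task per column; dask.compute runs them in parallel
    tasks = [mode_for_column(pdf, groupby, c) for c in value_cols]
    results = dask.compute(*tasks)
    return pd.concat(results, axis=1)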

1 Answer


In the end, I used futures to parallelize the aggregation column by column. Since I had so many columns, sending each column's aggregation to its own worker saved me a bunch of time. Thanks to David for his comments, as well as to the article on parallel workloads in the dask documentation!

import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client
from pandas import DataFrame

def chunk(s):
    return s.value_counts()

def agg(s):
    s = s._selected_obj
    return s.groupby(level=list(range(s.index.nlevels))).sum()

def finalize(s):
    # For each group, pick the value label with the highest count: the mode
    level = list(range(s.index.nlevels - 1))
    return (
        s.groupby(level=level)
        .apply(lambda s: s.reset_index(level=level, drop=True).idxmax())
    )

def delayed_mode(ddf, groupby, col, custom_agg):
    # Blocking single-column aggregation; each call runs as its own future
    return ddf.groupby(groupby).agg({col: custom_agg}).compute()

def main() -> DataFrame:
    client = Client('scheduler:8786')

    ddf = dd.read_csv('/sample/data.csv')
    custom_mode = dd.Aggregation('custom mode', chunk, agg, finalize)

    futures = []

    # Submit one single-column aggregation per attribute column, skipping
    # the groupby keys themselves
    for col in ddf.columns.drop(["a", "b"]):
        future = client.submit(delayed_mode, ddf, ["a", "b"], col, custom_mode)
        futures.append(future)

    # Gather the per-column pandas results and stitch them back together
    results = client.gather(futures)
    result = pd.concat(results, axis=1)
    return result
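
One caveat with this pattern: `delayed_mode` calls `.compute()` inside a task that is itself submitted to the cluster, which is what the dask docs call launching tasks from tasks. A possible refinement (a sketch based on the distributed docs, not part of the code above) is to open a `worker_client()` inside the function so the worker thread secedes from its pool while it waits on the nested computation:

from dask.distributed import worker_client

def delayed_mode(ddf, groupby, col, custom_agg):
    # Secede from the worker's thread pool while the nested compute runs,
    # so these long-waiting tasks don't starve the cluster of threads
    with worker_client() as client:
        return ddf.groupby(groupby).agg({col: custom_agg}).compute()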