Building off of this post, I implemented the custom mode aggregation, but I've run into performance issues with it. When I run this aggregation, my cluster uses only one of my threads, which is not great for performance. I am doing calculations on over 150 attributes (mostly categorical data) across 16k rows, which I think I could split into individual threads/processes and reassemble into a single dataframe later on. Note that this aggregation has to group on two columns, so I might be losing performance by not being able to use a single column as the index.
Is there a way to incorporate dask futures or parallel processing into the aggregate calculation?
    import dask.dataframe as dd
    from dask.distributed import Client
    from pandas import DataFrame

    def chunk(s):
        # Count occurrences of each value within each partition.
        return s.value_counts()

    def agg(s):
        # Sum the per-partition counts over the full (group, value) index.
        s = s._selected_obj
        return s.groupby(level=list(range(s.index.nlevels))).sum()

    def finalize(s):
        # s is a multi-index series of the form (group, value): count. First
        # manually group on the group part of the index. The lambda will receive a
        # sub-series with multi index. Next, drop the group part from the index.
        # Finally, determine the index label with the maximum count, i.e., the mode.
        level = list(range(s.index.nlevels - 1))
        return (
            s.groupby(level=level)
            .apply(lambda s: s.reset_index(level=level, drop=True).idxmax())
        )

    def main() -> DataFrame:
        client = Client('scheduler:8786')
        ddf = dd.read_csv('/sample/data.csv')
        custom_mode = dd.Aggregation('custom mode', chunk, agg, finalize)
        result = ddf.groupby(['a', 'b']).agg(custom_mode).compute()
        return result
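For reference, this is the kind of per-column split-and-recombine I had in mind, as a plain-pandas sketch (untested against my real data; `column_mode`, `parallel_mode`, and the column names are placeholders I made up, and a process pool rather than threads may be needed for a real speedup since pandas groupby is largely GIL-bound):

    from concurrent.futures import ThreadPoolExecutor

    import pandas as pd

    def column_mode(df, group_cols, value_col):
        # Mode of one categorical column within each group: count occurrences
        # of each value, then pick the most frequent value per group.
        counts = df.groupby(group_cols + [value_col]).size()
        return (
            counts.groupby(level=group_cols)
            .idxmax()                    # full (a, b, value) label of the max count
            .map(lambda t: t[-1])       # keep only the value part
            .rename(value_col)
        )

    def parallel_mode(df, group_cols, value_cols, max_workers=4):
        # Submit one mode computation per attribute column, then stitch the
        # per-column results back together into a single dataframe.
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = [
                pool.submit(column_mode, df[group_cols + [c]], group_cols, c)
                for c in value_cols
            ]
            results = [f.result() for f in futures]
        return pd.concat(results, axis=1)

I'm unsure whether something equivalent can be pushed into the dask aggregation itself, or whether it has to live outside it like this.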
Side note: I am using Docker to spin up my scheduler and workers, with the daskdev/dask (2.18.1) image.