
Similar to the pandas GroupBy to List post, we are trying to run the same process in Dask.

Our current solution uses dataframe.apply. Since this is a bottleneck in our process, are there any other options?
Below is sample code using the dask.datasets.timeseries data.

import dask
import dask.dataframe as dd
import pandas as pd

def set_list_att2(x: pd.Series):
    # Collect the unique values of each group into a list
    return list(set(x.values))

df = dask.datasets.timeseries()
df_gb = df.groupby(df.name)
gp_col = ['x', 'y', 'id']
# One lazy series of lists per column
list_ser_gb = [df_gb[att_col_gr].apply(set_list_att2,
                                       meta=pd.Series(dtype='object', name=f'{att_col_gr}_att'))
               for att_col_gr in gp_col]
# Start from the group sizes, then join each list-valued series
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.compute().to_frame(), how='left')
df_edge_att.head()

[screenshot: resulting dataframe]

Note that in the line

df_edge_att = df_edge_att.join(ser.compute().to_frame(), how='left')

we added the compute; otherwise the sample code returned only 1 row in the final dataframe.
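
As an aside, one possible way to avoid calling compute inside the loop is to materialize the group sizes and all the grouped series in a single pass with dask.compute. This is only a sketch and not necessarily faster, but it executes the task graph once instead of once per column:

import dask

# Compute the weights and every list-valued series in one graph execution,
# then join the resulting (now concrete) pandas objects eagerly.
weights, *computed_series = dask.compute(df_gb.size(), *list_ser_gb)

df_edge_att = weights.to_frame(name="Weight")
for ser in computed_series:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head()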

  • I am also facing a similar issue. I tried to create a vectorized function, but don't know where to specify the datatype of the new column. – Lijo Jose Apr 18 '19 at 09:27
  • [This post](https://stackoverflow.com/questions/46375382/aggregate-a-dask-dataframe-and-produce-a-dataframe-of-aggregates/49252330#49252330) may also help – skibee Apr 23 '19 at 11:29

1 Answer


I ran some tests, and you should definitely use dd.Aggregation rather than apply. See the results below:

%%timeit
df = dask.datasets.timeseries()
df_gb = df.groupby(df.name)
gp_col = ['x', 'y', 'id']
# set_list_att2 as defined in the question
list_ser_gb = [df_gb[att_col_gr].apply(set_list_att2,
                                       meta=pd.Series(dtype='object', name=f'{att_col_gr}_att'))
               for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head()

The results are:
5min 44s ± 11.2 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

However, running with dd.Aggregation gives a considerable improvement:

%%timeit
import itertools

df = dask.datasets.timeseries()
custom_agg = dd.Aggregation(
    'custom_agg',
    lambda s: s.apply(set),  # chunk: unique values per group within each partition
    lambda s: s.apply(lambda chunks:  # agg: union the per-partition sets
                      list(set(itertools.chain.from_iterable(chunks)))),
)
df_gb = df.groupby(df.name)
gp_col = ['x', 'y', 'id']
list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head()

The results are:
2min ± 1.13 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

Update
This method has now also been added to Dask's documentation.
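
For completeness, here is a minimal sketch of the same pattern spelled out with all three dd.Aggregation callbacks (chunk, agg, and the optional finalize), assuming the same timeseries data; the name collect_list is illustrative, not part of Dask:

import itertools
import dask
import dask.dataframe as dd

# Hypothetical aggregation for illustration: collect the unique values
# of each group into a list.
collect_list = dd.Aggregation(
    name='collect_list',
    chunk=lambda s: s.apply(set),  # per partition: set of values per group
    agg=lambda s: s.apply(lambda chunks:  # across partitions: union the sets
                          set(itertools.chain.from_iterable(chunks))),
    finalize=lambda s: s.apply(list),  # final step: set -> list
)

df = dask.datasets.timeseries()
result = df.groupby(df.name)['x'].agg(collect_list).compute()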
