
I am trying to calculate a moving average over a very large data set of approximately 30M rows. To illustrate with pandas:

df = pd.DataFrame({'cust_id':['a', 'a', 'a', 'b', 'b'], 'sales': [100, 200, 300, 400, 500]})
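# group_keys=False keeps the original row index, so the result aligns with df on recent pandas versions
df['mov_avg'] = df.groupby("cust_id", group_keys=False)["sales"].apply(lambda x: x.ewm(alpha=0.5, adjust=False).mean())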

Here I am using pandas to calculate the moving average. With the approach above, it takes around 20 minutes on the 30M-row data set. Is there a way to leverage Dask here?

sushmit

1 Answer


You can use dask.delayed for your calculation. In the example below, a standard Python function containing the pandas moving-average command is turned into a Dask function with the @delayed decorator.

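import pandas as pd
from dask import delayed

@delayed
def mov_average(x):
    # Work on a copy so the caller's frame is not modified in place.
    x = x.copy()
    # group_keys=False keeps the original row index so the result aligns with x.
    x['mov_avg'] = x.groupby("cust_id", group_keys=False)["sales"].apply(
                            lambda s: s.ewm(alpha=0.5, adjust=False).mean())
    return x

df = pd.DataFrame({'cust_id':['a', 'a', 'a', 'b', 'b'],
                   'sales': [100, 200, 300, 400, 500]})

# Plain pandas result, for comparison.
df['mov_avg'] = df.groupby("cust_id", group_keys=False)["sales"].apply(
                            lambda s: s.ewm(alpha=0.5, adjust=False).mean())

# The same calculation, run as a single Dask task.
df_1 = mov_average(df).compute()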

Output

df
Out[22]: 
  cust_id  sales  mov_avg
0       a    100    100.0
1       a    200    150.0
2       a    300    225.0
3       b    400    400.0
4       b    500    450.0

df_1
Out[23]: 
  cust_id  sales  mov_avg
0       a    100    100.0
1       a    200    150.0
2       a    300    225.0
3       b    400    400.0
4       b    500    450.0
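
To make the numbers above easier to check by hand: with adjust=False, pandas documents the exponentially weighted mean as the recursion y_0 = x_0, y_t = (1 - alpha) * y_(t-1) + alpha * x_t. The pure-Python sketch below reproduces the mov_avg column for each customer; the helper name ewm_no_adjust is made up for illustration and is not part of pandas or Dask.

```python
def ewm_no_adjust(values, alpha=0.5):
    """Exponentially weighted mean using the adjust=False recursion."""
    out = []
    prev = None
    for x in values:
        # The first value seeds the average; later values blend with the running average.
        prev = float(x) if prev is None else (1 - alpha) * prev + alpha * x
        out.append(prev)
    return out


print(ewm_no_adjust([100, 200, 300]))  # customer 'a' -> [100.0, 150.0, 225.0]
print(ewm_no_adjust([400, 500]))       # customer 'b' -> [400.0, 450.0]
```

Because each value depends only on earlier rows of the same customer, the customers can be processed independently of one another.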

Alternatively, you could convert your data (or read your file directly) into a Dask data frame. The visualization of the scheduler tasks shows how the calculations are parallelized, so if your data frame is large enough you might see a reduction in computation time. You could also try tuning the number of data frame partitions.

from dask import dataframe

ddf = dataframe.from_pandas(df, npartitions=3)
ddf['emv'] = ddf.groupby('cust_id')['sales'].apply(lambda x: x.ewm(alpha=0.5, adjust=False).mean()).compute().sort_index()
ddf.visualize()

[Image: task graph produced by ddf.visualize()]

ddf.compute()

  cust_id  sales    emv
0       a    100  100.0
1       a    200  150.0
2       a    300  225.0
3       b    400  400.0
4       b    500  450.0
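
Note that wrapping the whole data frame in one delayed call creates a single task, so nothing runs in parallel. Since each customer's average is independent of the others, one option is to split the rows into chunks by customer and create one delayed task per chunk. The following is only a rough sketch under that assumption: the names ewm_per_customer, parallel_mov_avg and n_chunks are made up for illustration, and whether it actually speeds up a 30M-row frame depends on the scheduler, the number of cores, and the cost of moving data between workers.

```python
import dask
import pandas as pd
from dask import delayed


def ewm_per_customer(chunk):
    """Compute the per-customer EWM for one chunk (hypothetical helper)."""
    out = chunk.copy()
    # transform returns a Series aligned with the chunk's original index.
    out["mov_avg"] = out.groupby("cust_id")["sales"].transform(
        lambda s: s.ewm(alpha=0.5, adjust=False).mean()
    )
    return out


def parallel_mov_avg(df, n_chunks=4, scheduler="threads"):
    """Split df by customer into n_chunks buckets and process them as Dask tasks."""
    # Map each customer to a bucket so all rows of a customer stay together.
    codes, _ = pd.factorize(df["cust_id"])
    buckets = codes % n_chunks
    tasks = [delayed(ewm_per_customer)(part) for _, part in df.groupby(buckets)]
    results = dask.compute(*tasks, scheduler=scheduler)
    # Restore the original row order.
    return pd.concat(list(results)).sort_index()


df = pd.DataFrame({"cust_id": ["a", "a", "a", "b", "b"],
                   "sales": [100, 200, 300, 400, 500]})
result = parallel_mov_avg(df, n_chunks=2, scheduler="threads")
print(result)
```

On real data, scheduler="processes" may be the more useful setting, because a groupby with a Python lambda can be limited by the GIL under threads. In that case the calling code should sit under an if __name__ == "__main__": guard.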
KRKirov
  • There is no performance gain using mov_average(df).compute() or mov_average(dask_df).compute(scheduler='processes'). Any idea what can improve the performance? – sushmit Jan 17 '20 at 14:25
  • You need to run processes in parallel; the calculations for your customers 'a' and 'b' are independent. – ps0604 Mar 13 '21 at 19:18