I'm trying to replicate the pandas groupby rolling-mean logic below in Dask, but I'm stuck on (1) how to specify the window as a time period in days and (2) how to assign the result back to the original frame.
df['avg3d']=df.groupby('g')['v'].transform(lambda x: x.rolling('3D').mean())
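For context, `rolling('3D')` is a time-based window. For each timestamp t it averages the rows in (t - 3 days, t], so it only matches a row-count window such as `rolling(3)` when there are no missing dates. The pandas-only sketch below uses made-up data with a gap to show the difference:

```python
import pandas as pd

# Four observations with a gap between Jan 2 and Jan 5 (made-up data).
idx = pd.to_datetime(["2020-01-01", "2020-01-02", "2020-01-05", "2020-01-06"])
s = pd.Series([1.0, 2.0, 3.0, 4.0], index=idx)

# Time-based window: average the values in (t - 3 days, t].
# min_periods defaults to 1 for offset windows, so early rows are not NaN.
by_time = s.rolling("3D").mean()

# Count-based window: the last 3 rows, whatever dates they fall on.
# min_periods defaults to the window size, so the first two rows are NaN.
by_rows = s.rolling(3).mean()

print(pd.DataFrame({"rolling('3D')": by_time, "rolling(3)": by_rows}))
```

On 2020-01-05 the time-based window contains only that day (3.0), while the row-based window still reaches back to 2020-01-01 (2.0).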
In Dask I get errors like:
ValueError: index must be monotonic
ValueError: Not all divisions are known, can't align partitions
ValueError: cannot reindex from a duplicate axis
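Two of these errors can be reproduced in plain pandas, which hints at where they come from. An offset window like '3D' needs a monotonic DatetimeIndex. In the example frame every date appears once per group, so a result whose index lists the same dates in a different order cannot be assigned back by label. The sketch below uses made-up data. It is only an assumption that Dask's groupby hands each group over in unsorted order; this is not verified here.

```python
import pandas as pd

# One group's rows, deliberately out of time order (hypothetical data).
idx = pd.to_datetime(["2020-01-03", "2020-01-01", "2020-01-02"])
grp = pd.Series([3.0, 1.0, 2.0], index=idx)

# Offset-based rolling needs a monotonic index, so this raises ValueError
# (older pandas says "index must be monotonic"; newer versions word it slightly differently).
try:
    grp.rolling("3D").mean()
    monotonic_error = None
except ValueError as exc:
    monotonic_error = str(exc)
print("unsorted:", monotonic_error)

# Sorting the group's index first lets the same call succeed.
fixed = grp.sort_index().rolling("3D").mean()
print(fixed)

# Duplicate labels: each date appears once per group, as in the question's df.
dates = pd.date_range("2020-01-01", periods=2)
df = pd.DataFrame({"g": ["a", "b", "a", "b"], "v": [0, 0, 1, 1]},
                  index=dates.repeat(2))

# A result with the same duplicated dates in a different order (grouped by 'g')
# cannot be aligned back by label, so the assignment raises ValueError.
result = df.sort_values("g")["v"].astype(float)
try:
    df["avg"] = result
    reindex_error = None
except ValueError as exc:
    reindex_error = str(exc)
print("duplicate labels:", reindex_error)
```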
Full example
import pandas as pd
import dask.dataframe
df1 = pd.DataFrame({'g':['a']*10,'v':range(10)},index=pd.date_range('2020-01-01',periods=10))
df2=df1.copy()
df2['g']='b'
df = pd.concat([df1,df2]).sort_index()
df['avg3d']=df.groupby('g')['v'].transform(lambda x: x.rolling('3D').mean())
ddf = dask.dataframe.from_pandas(df, npartitions=4)
# works (window of 3 rows)
ddf.groupby('g')['v'].apply(lambda x: x.rolling(3).mean(), meta=('avg3d', 'f8')).compute()
# fails (time-based window of 3 days)
ddf.groupby('g')['v'].apply(lambda x: x.rolling('3D').mean(), meta=('avg3d', 'f8')).compute()
# How do I add the result back to the rest of the data?
# None of these work:
ddf['avg3d']=ddf.groupby('g')['v'].apply(lambda x: x.rolling('3D').mean(), meta=('x', 'f8'))
ddf['avg3d']=ddf.groupby('g')['v'].transform(lambda x: x.rolling(3).mean(), meta=('x', 'f8'))
ddft = ddf.merge(ddf3d)
ddf.assign(avg3d=ddf.groupby('g')['v'].transform(lambda x: x.rolling(3).mean(), meta=('x', 'f8')))
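One way around both problems might be to have the per-group function sort its rows and return the whole group with the new column, so nothing has to be aligned back afterwards. The sketch below checks that idea in plain pandas on the question's data, feeding each group in shuffled order to mimic unsorted input. `add_avg3d` is a hypothetical helper, and the Dask call in the final comment is an untested assumption, not a confirmed recipe.

```python
import pandas as pd

def add_avg3d(group: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: add a 3-day time-based rolling mean of 'v' to one group.

    Sorting first guards against rows arriving out of time order; returning every
    column means the result never has to be aligned back to the original frame.
    """
    group = group.sort_index()
    return group.assign(avg3d=group["v"].rolling("3D").mean())

# Same data as in the question.
df1 = pd.DataFrame({"g": ["a"] * 10, "v": range(10)},
                   index=pd.date_range("2020-01-01", periods=10))
df2 = df1.copy()
df2["g"] = "b"
df = pd.concat([df1, df2]).sort_index()

# Emulate a groupby-apply in pandas, passing each group in shuffled row order.
pieces = [add_avg3d(grp.sample(frac=1, random_state=0)) for _, grp in df.groupby("g")]
out = pd.concat(pieces)

# Reference: the pandas transform from the question.
expected = df.groupby("g")["v"].transform(lambda x: x.rolling("3D").mean())

print(out[out["g"] == "a"])

# Untested assumption: in Dask the same helper might be used as
#   ddf.groupby("g", group_keys=False).apply(add_avg3d, meta=add_avg3d(df.head(0)))
# which would return a new frame that already contains avg3d.
```

The trade-off is that the output is ordered group by group rather than in the original row order. If the original order matters, a sort on the index afterwards would restore it.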
I've already looked at:
dask groupby apply then merge back to dataframe
Dask rolling function by group syntax
Compute the rolling mean over the last n days in Dask
ValueError: Not all divisions are known, can't align partitions error on dask dataframe