
I'm trying to replicate the pandas groupby rolling mean logic below in dask, but I'm stuck on 1) how to specify the rolling window as a time period in days and 2) how to assign the result back to the original frame.

df['avg3d'] = df.groupby('g')['v'].transform(lambda x: x.rolling('3D').mean())

I get errors like:

ValueError: index must be monotonic
ValueError: Not all divisions are known, can't align partitions
ValueError: cannot reindex from a duplicate axis

Full example

import pandas as pd
import dask.dataframe

df1 = pd.DataFrame({'g': ['a'] * 10, 'v': range(10)},
                   index=pd.date_range('2020-01-01', periods=10))
df2 = df1.copy()
df2['g'] = 'b'
df = pd.concat([df1, df2]).sort_index()
df['avg3d'] = df.groupby('g')['v'].transform(lambda x: x.rolling('3D').mean())

ddf = dask.dataframe.from_pandas(df, npartitions=4)

# works
ddf.groupby('g')['v'].apply(lambda x: x.rolling(3).mean(), meta=('avg3d', 'f8')).compute()

# rolling by time period fails
ddf.groupby('g')['v'].apply(lambda x: x.rolling('3D').mean(), meta=('avg3d', 'f8')).compute()

# how do I add it back to the rest of the data?
# none of these work
ddf['avg3d'] = ddf.groupby('g')['v'].apply(lambda x: x.rolling('3D').mean(), meta=('x', 'f8'))
ddf['avg3d'] = ddf.groupby('g')['v'].transform(lambda x: x.rolling(3).mean(), meta=('x', 'f8'))
ddf3d = ddf.groupby('g')['v'].apply(lambda x: x.rolling('3D').mean(), meta=('avg3d', 'f8'))
ddft = ddf.merge(ddf3d)
ddf.assign(avg3d=ddf.groupby('g')['v'].transform(lambda x: x.rolling(3).mean(), meta=('x', 'f8')))

Looked at

dask groupby apply then merge back to dataframe
Dask rolling function by group syntax
Compute the rolling mean over the last n days in Dask
ValueError: Not all divisions are known, can't align partitions error on dask dataframe

citynorman

1 Answer


This problem arises from the current implementation of .groupby in dask. The answer below is not a complete solution, but it will hopefully explain why the errors happen.

First, let's make sure we get a true_result against which we can compare the dask results:

import dask.dataframe
import pandas as pd

df1 = pd.DataFrame(
    {"g": ["a"] * 10, "v": range(10)}, index=pd.date_range("2020-01-01", periods=10)
)
df = pd.concat([df1, df1.assign(g="b")]).sort_index()

df["avg3d"] = df.groupby("g")["v"].transform(lambda x: x.rolling("3D").mean())
true_result = df["avg3d"].array

Now, running the code from the question that is commented with # works generates different values on every run, even though neither the data nor the computation has any source of randomness:

ddf = dask.dataframe.from_pandas(df, npartitions=4)

# this doesn't work
dask_result_1 = (
    ddf.groupby("g")["v"]
    .apply(lambda x: x.rolling(3).mean(), meta=("avg3d", "f8"))
    .compute()
    .array
)

# this will fail, every time for a different reason
assert all(dask_result_1 == true_result)

Why is this happening? Under the hood, dask shuffles the data so that all rows with the same value of the groupby key end up in a single partition. The order of this shuffle appears to be random, so when the per-group results are stitched back together they can be out of their original order.
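
One quick way to see this on the ddf defined above (a sketch; whether it actually prints False depends on how the shuffle happened to interleave the partitions on a given run):

res = (
    ddf.groupby("g")["v"]
    .apply(lambda x: x.rolling(3).mean(), meta=("avg3d", "f8"))
    .compute()
)
# res has a MultiIndex of (g, timestamp); within a single group the
# timestamps can come back out of order after the shuffle
print(res.loc["a"].index.is_monotonic_increasing)  # can print False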

So a quick fix is to sort each group by its index before the rolling computation:

# rolling time period works
avg3d_dask = (
    ddf.groupby("g")["v"]
    .apply(lambda x: x.sort_index().rolling("3D").mean(), meta=("avg3d", "f8"))
    .compute()
    .droplevel(0)
    .sort_index()
)

# this will always pass
assert all(avg3d_dask == true_result)

Now, how do we add this back to the original dataframe? I don't know a simple way of doing this, but one of the hard ways would be to compute the divisions of the original dask dataframe, split the computed series into matching chunks, and assign them partition by partition. This approach, however, is not very robust (or at least requires a lot of use-case-specific fine-tuning), so hopefully someone can provide a better solution for this part.
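
A rough sketch of that idea, assuming avg3d_dask from the snippet above: because rows with equal timestamps never straddle partition boundaries, and both objects were sorted with the same stable sort, each partition's index range selects exactly its matching rows of the precomputed series. That holds for this toy data, but would need verification on real data:

# not robust: relies on positional alignment between each partition and the
# matching slice of the precomputed pandas series
def assign_chunk(part, series):
    # slice the precomputed series to this partition's index range
    chunk = series.loc[part.index.min() : part.index.max()]
    # row order matches because both were sorted identically
    return part.assign(avg3d=chunk.to_numpy())

# in this example ddf already carries the pandas-computed avg3d column,
# so this simply overwrites it with the dask-computed values
ddf_out = ddf.map_partitions(
    assign_chunk, avg3d_dask, meta=ddf._meta.assign(avg3d=0.0)
)
assert all(ddf_out.compute()["avg3d"].array == true_result)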

SultanOrazbayev