
I am working with panel data (i.e., a panel of IDs and time periods) in Dask and wish to resample the frequency from microseconds to 30 seconds. Sample data looks like this:

                            size     price       ID
datetime                                           
2018-09-26 13:50:00.000600   300   17.8185       AR
2018-09-26 13:50:00.004797    25   37.1165     BCOR
2018-09-26 13:50:00.005955   300   17.8185       AR
2018-09-26 13:50:00.006066   100   78.6200      XLI
2018-09-26 13:50:00.006862   100   73.0600      ABT
2018-09-26 13:50:00.007164   100   73.0600      ABT
2018-09-26 13:50:00.008643   100   73.3332      FAS
2018-09-26 13:50:00.008762   100   73.0600      ABT
2018-09-26 13:50:00.008793     2  114.4950     MSFT
2018-09-26 13:50:00.008978   100   20.6350      NWL

where ID is a string, datetime is a datetime (currently set as the index), size is int64, and price is float64. I want to:

  1. groupby ID
  2. resample onto a 30-second frequency
  3. while aggregating price by its mean and aggregating size by its sum. Essentially, aggregate columns by different functions.

I understand that Dask doesn't support groupby-resample operations, but based on an excellent post here, it seems doable using a mix of dask and pandas.
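
For reference, the plain-pandas equivalent of steps 1–3 would look roughly like this (a sketch, assuming `df` is a pandas DataFrame with the datetime index shown above):

# Pure-pandas sketch of the target result: group by ID, resample to 30 s,
# and aggregate price by mean and size by sum.
out = df.groupby('ID').resample('30S').agg({'price': 'mean', 'size': 'sum'})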

My current attempt (based on the linked post above) is:

def per_group(blk):
    return blk.resample('30S').agg({blk['price']: np.mean, blk['size']: np.sum})

ddf.groupby('ID').apply(per_group, meta=ddf).compute() 

but it returns TypeError: 'Series' objects are mutable, thus they cannot be hashed. My sense is that it has something to do with the 'ID' column but I can't figure it out. I also tried supplying meta={'size': np.int64, 'price': np.float64, 'ID': 'object'} instead but to no avail.

Would love to see any other way this could be done more efficiently! Thanks.

AYA
  • In your `per_group` function, you are using series objects (dataframe columns) as dictionary keys in the `.agg()` method, which produces the error. The dictionary keys just need to be the [labels](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.agg.html). So just use `'price'` and `'size'` as dictionary keys. – AlexK May 24 '22 at 06:07
  • @AlexK, it returns the following error: ```ValueError: The columns in the computed data do not match the columns in the provided metadata Extra: [] Missing: ['ID'] ``` – AYA May 24 '22 at 06:15
  • For `meta=`, if you read the [documentation](https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.apply.html), the dataframe passed as argument needs to match the dtypes and column names of the output. – AlexK May 24 '22 at 06:21
  • Yup. But it is strange because ID should be returned as an index (since it is grouped-by) and not a column. I tried removing meta altogether and it works somehow (maybe it infers it automatically). Thanks for help! – AYA May 24 '22 at 06:23
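
Putting the two comments together, the attempt from the question becomes something like the following (a sketch: column labels as the `.agg()` keys, and `meta=` omitted so dask infers the output schema, which it does with a warning):

import numpy as np

def per_group(blk):
    # Use the column labels, not the Series objects, as .agg() keys.
    return blk.resample('30S').agg({'price': np.mean, 'size': np.sum})

# Omitting meta= lets dask infer the output metadata (it emits a warning).
ddf.groupby('ID').apply(per_group).compute()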

1 Answer


To use `.resample`, the index must be datetime-like (or another dtype that supports resampling). One solution is to set the datetime index inside the aggregation function (another is to index by datetime in advance):

def per_group(df):
    return (
        df
        .set_index("datetime")
        .resample("30S")
        .agg({"price": "mean", "size": "mean"})
    )

ddf.groupby("ID").apply(per_group).compute()

The full reproducible snippet:

from io import StringIO

from dask.dataframe import from_pandas
from pandas import read_fwf, to_datetime

data = StringIO(
    """
datetime                    size     price       ID
2018-09-26 13:50:00.000600   300   17.8185       AR
2018-09-26 13:50:00.004797    25   37.1165     BCOR
2018-09-26 13:50:00.005955   300   17.8185       AR
2018-09-26 13:50:00.006066   100   78.6200      XLI
2018-09-26 13:50:00.006862   100   73.0600      ABT
2018-09-26 13:50:00.007164   100   73.0600      ABT
2018-09-26 13:50:00.008643   100   73.3332      FAS
2018-09-26 13:50:00.008762   100   73.0600      ABT
2018-09-26 13:50:00.008793     2  114.4950     MSFT
2018-09-26 13:50:00.008978   100   20.6350      NWL
"""
)

df = read_fwf(data)
# read_fwf splits the timestamp across two columns: the date lands in
# "datetime" and the time in "Unnamed: 1", so recombine and parse them.
df["datetime"] = df["datetime"] + " " + df["Unnamed: 1"]
df["datetime"] = to_datetime(df["datetime"])

ddf = from_pandas(df, npartitions=2)


def per_group(df):
    return (
        df.set_index("datetime").resample("30S").agg({"price": "mean", "size": "mean"})
    )


ddf.groupby("ID").apply(per_group).compute()
SultanOrazbayev