I am working with panel data (i.e., a panel of IDs and time periods) in Dask and wish to downsample it from microsecond timestamps to a 30-second frequency. Sample data looks like this:
size price ID
datetime
2018-09-26 13:50:00.000600 300 17.8185 AR
2018-09-26 13:50:00.004797 25 37.1165 BCOR
2018-09-26 13:50:00.005955 300 17.8185 AR
2018-09-26 13:50:00.006066 100 78.6200 XLI
2018-09-26 13:50:00.006862 100 73.0600 ABT
2018-09-26 13:50:00.007164 100 73.0600 ABT
2018-09-26 13:50:00.008643 100 73.3332 FAS
2018-09-26 13:50:00.008762 100 73.0600 ABT
2018-09-26 13:50:00.008793 2 114.4950 MSFT
2018-09-26 13:50:00.008978 100 20.6350 NWL
where ID is a string, datetime is a datetime object (currently set as the index), size is int64, and price is float64. I want to:
- groupby ID
- resample onto a 30-second frequency
- aggregate price by its mean and size by its sum, i.e., apply a different aggregation function to each column (a plain-pandas sketch of the target is below).
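For reference, here is roughly what I am after in plain pandas, as a small self-contained sketch built from a few rows of the sample above (pd.Grouper on the index is just one way to express the groupby-resample):

import pandas as pd

# Toy frame mirroring the structure above (a few rows copied from the sample)
df = pd.DataFrame(
    {
        "size": [300, 25, 300, 100],
        "price": [17.8185, 37.1165, 17.8185, 78.62],
        "ID": ["AR", "BCOR", "AR", "XLI"],
    },
    index=pd.DatetimeIndex(
        [
            "2018-09-26 13:50:00.000600",
            "2018-09-26 13:50:00.004797",
            "2018-09-26 13:50:00.005955",
            "2018-09-26 13:50:00.006066",
        ],
        name="datetime",
    ),
)

# Group by ID and by 30-second bins of the datetime index,
# aggregating price by its mean and size by its sum
expected = df.groupby(["ID", pd.Grouper(freq="30S")]).agg(
    {"price": "mean", "size": "sum"}
)
print(expected)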
I understand that Dask doesn't support groupby-resample operations, but based on an excellent post here, it seems doable using a mix of dask and pandas.
My current attempt (based on the linked post above) is:
import numpy as np

def per_group(blk):
    # resample each ID's block to 30-second bins, aggregating price by mean and size by sum
    return blk.resample('30S').agg({blk['price']: np.mean, blk['size']: np.sum})

ddf.groupby('ID').apply(per_group, meta=ddf).compute()
but it returns TypeError: 'Series' objects are mutable, thus they cannot be hashed. My sense is that it has something to do with the 'ID' column, but I can't figure it out. I also tried supplying meta={'size': np.int64, 'price': np.float64, 'ID': 'object'} instead, but to no avail.
Would love to see any other way this could be done more efficiently! Thanks.