I have a ragged (i.e., not at a regular frequency), time-indexed DataFrame on which I would like to perform a time-weighted rolling average that maintains the original index of the DataFrame. It is assumed that a recorded value is valid until superseded by another value. One way to achieve this is to up-sample the ragged DataFrame to a uniform frequency and then take a rolling mean:

import pandas as pd
import numpy as np


def time_weighted_average_using_upsampling(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:
    # Leads to high memory usage
    original_index = df.index.copy()
    avg = (
        df.resample("1s")
        .ffill()
        .rolling(avg_window, closed="left", min_periods=int(avg_window[0]))
        .mean()
        .reindex(original_index)
    )
    return avg


if __name__ == "__main__":
    df = pd.DataFrame(
        {"A": [0, 1, 2, 3, 4, 5]},
        index=[
            pd.Timestamp("20130101 09:00:00"),
            pd.Timestamp("20130101 09:00:02"),
            pd.Timestamp("20130101 09:00:03"),
            pd.Timestamp("20130101 09:00:05"),
            pd.Timestamp("20130101 09:00:06"),
            pd.Timestamp("20130101 09:00:10"),
        ],
    )

    expected_avg = pd.DataFrame(
        {"A": [np.nan, np.nan, 1 / 3, 5 / 3, 7 / 3, 4]},
        index=[
            pd.Timestamp("20130101 09:00:00"),
            pd.Timestamp("20130101 09:00:02"),
            pd.Timestamp("20130101 09:00:03"),
            pd.Timestamp("20130101 09:00:05"),
            pd.Timestamp("20130101 09:00:06"),
            pd.Timestamp("20130101 09:00:10"),
        ],
    )

    pd.testing.assert_frame_equal(
        time_weighted_average_using_upsampling(df=df, avg_window="3s"), expected_avg
    )

The issue with this is that the up-sampling defeats the purpose of the sparse representation that the ragged df offers. A sparse representation is memory efficient, while the up-sampled version is not. This raises the question: how does one achieve the result shown above without up-sampling the entire df?
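
To make the memory cost concrete, here is a small illustration with hypothetical data: even ten observations spread over a single day explode to a row per second once up-sampled.

# Hypothetical data: 10 observations spread evenly over one day.
idx = pd.date_range("2013-01-01", "2013-01-02", periods=10)
sparse_df = pd.DataFrame({"A": range(10)}, index=idx)
# Up-sampling to 1s resolution yields one row per second of the span.
print(len(sparse_df.resample("1s").ffill()))  # 86401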

Harald Husum
  • This is an interesting question: if your problem is to not upsample the full dataframe at once, then maybe you can consider upsampling only the last 3 rows and doing the calculation like `df.rolling(3, closed='left').apply(lambda x: x.resample("1s").ffill()[-4:-1].mean())`, but this won't be really efficient, I guess, because you perform many up-sampling operations in the end – Ben.T Aug 19 '20 at 12:20
  • I thought about that approach as well. It would deal with memory constraints, but there are practical challenges, one of which I don't see how to solve. `df.rolling().apply()` does not give the callable anything but an array of samples, hence you cannot use df operations. – Harald Husum Aug 19 '20 at 12:43
  • 1
    maybe I misunderstand when you say array, but `df.rolling().apply()` does pass a Series into it, not an numpy array now. the parameter [raw has been added in v0.23](https://pandas.pydata.org/pandas-docs/stable/whatsnew/v0.23.0.html#rolling-expanding-apply-accepts-raw-false-to-pass-a-series-to-the-function), it seems it was set to True by default in this earlier version, but since it seems that the default is [False](https://pandas.pydata.org/docs/reference/api/pandas.core.window.rolling.Rolling.apply.html) that passes a Serie, hence the index is available – Ben.T Aug 19 '20 at 13:00
  • You are absolutely right! My mistake. I used a slightly older version of pandas. – Harald Husum Aug 20 '20 at 06:28
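
A minimal illustration of the `raw` behaviour discussed in the comments above (a sketch, assuming pandas ≥ 1.0, where `raw` defaults to False):

s = pd.Series([0.0, 1.0, 2.0], index=pd.date_range("2013-01-01", periods=3, freq="s"))
s.rolling(2).apply(lambda x: print(type(x)) or x.mean(), raw=False)  # windows arrive as pd.Series
s.rolling(2).apply(lambda x: print(type(x)) or x.mean(), raw=True)   # windows arrive as np.ndarray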

2 Answers


Here is an alternative: instead of up-sampling the whole dataframe, you can first check where the time diff between two consecutive rows is at least as big as the window. Then create a timestamp 3s before each row that follows such a gap, and reindex df with the union of these new timestamps. Once these rows are created, you can group by the cumulative count of new indexes, resample at 1s within each group, and finally do the rolling mean the way you did. Reindex with df's index at the end.

rule = 3
rolling_win = f'{rule}s'

sparse = df.index.to_series().diff().dt.total_seconds().ge(rule)
new_timestamps = df.index[sparse] - pd.Timedelta(seconds=rule)
print(new_timestamps) 
#DatetimeIndex(['2013-01-01 09:00:07'], dtype='datetime64[ns]', freq=None)

#reindex with the new timestamps
df_ = df.reindex(df.index.union(new_timestamps))

#first resample at 1s within each group, then clean up the index and ffill
#before the rolling mean; finally reindex like the original df
df_ = (df_.groupby(df_.index.isin(new_timestamps).cumsum())
          .resample("1s").ffill()
          .reset_index(level=0, drop=True).ffill()
          .rolling(rolling_win, closed="left", min_periods=rule)
          .mean()
          .reindex(df.index)
      )
print(df_)
                            A
2013-01-01 09:00:00       NaN
2013-01-01 09:00:02       NaN
2013-01-01 09:00:03  0.333333
2013-01-01 09:00:05  1.666667
2013-01-01 09:00:06  2.333333
2013-01-01 09:00:10  4.000000

In this case it is not really interesting because the gap is actually small, but if the gaps are huge it becomes useful, as the sketch below illustrates.
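
For instance (hypothetical data, reusing `rule` from above): with a one-hour gap between two samples, only a handful of helper rows are created, where a global 1s resample would create roughly 3,600.

wide = pd.DataFrame(
    {"A": [0, 1]},
    index=[pd.Timestamp("20130101 09:00:00"), pd.Timestamp("20130101 10:00:00")],
)
sparse = wide.index.to_series().diff().dt.total_seconds().ge(rule)
new_timestamps = wide.index[sparse] - pd.Timedelta(seconds=rule)
wide_ = wide.reindex(wide.index.union(new_timestamps))
per_group = wide_.groupby(wide_.index.isin(new_timestamps).cumsum()).resample("1s").ffill()
print(len(per_group))  # 5 rows, versus 3601 for a global 1s resample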

EDIT: or another option, probably better: union all the indexes made from the original index minus 1s, 2s, 3s, ... (depending on the rule). Now you have only the necessary indexes for the rolling, so reindex, ffill and rolling.mean. Same result at the end:

from functools import reduce

rule = 3
rolling_win = f'{rule}s'

idx = df.index
df_ = (df.reindex(reduce(lambda x, y: x.union(y), 
                         [idx - pd.Timedelta(seconds=i) 
                          for i in range(0, rule+1)]))
         .ffill()
         .rolling(rolling_win, closed="left", min_periods=rule)
         .mean()
         .reindex(df.index)
        )
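
As a quick check (reusing `df` and `expected_avg` from the question), this variant reproduces the expected time-weighted averages:

pd.testing.assert_frame_equal(df_, expected_avg)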
Ben.T
  • Another approach I developed based on your comments from yesterday: the `time_weighted_average_using_local_upsampling` function, posted in full in my answer below. – Harald Husum Aug 20 '20 at 06:58

Two possible solutions inspired by @Ben.T:

def time_weighted_average_using_local_upsampling(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:
    """Uses second resolution up-sampling only on smaller windows at a time."""
    original_index = df.index.copy()
    avg = (
        # Ensure every window has a (forward-filled) point at its start.
        df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")
        .rolling(avg_window, closed="both", min_periods=2)
        # Up-sample only the current window to 1s resolution, then average.
        .apply(lambda x: x.resample("1s").ffill()[:-1].mean(skipna=False))
        .reindex(original_index)
    )
    return avg


def time_weighted_average_using_index_weighting(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:
    """Uses weighting by duration, by ensuring every window has a point at the start."""
    original_index = df.index.copy()
    avg = (
        # Ensure every window has a (forward-filled) point at its start.
        df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")
        .rolling(avg_window, closed="both", min_periods=2)
        # Weight each value by how long it remains valid within the window.
        .apply(lambda x: np.average(x[:-1], weights=x.index.to_series().diff()[1:].dt.seconds))
        .reindex(original_index)
    )
    return avg

The first one up-samples single rolling windows at a time, while the latter does truly ragged time-weighted averaging, by ensuring that there is always a point available at the start of each window we care about. This is achieved by including the original index shifted back by the window length.
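
Both can be verified against the expected output from the question (assuming `df` and `expected_avg` are still in scope):

pd.testing.assert_frame_equal(
    time_weighted_average_using_local_upsampling(df=df, avg_window="3s"), expected_avg
)
pd.testing.assert_frame_equal(
    time_weighted_average_using_index_weighting(df=df, avg_window="3s"), expected_avg
)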

I have yet to measure performance on relevant cases.

EDIT: I decided to test the functions on a second-resolution dataset with around 100,000 rows and 20-minute windows(!). Both variants were unbearably slow, but I think I have a new winner:

def time_weighted_average_using_index_weighting2(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:
    """Uses weighting by duration, by ensuring every window has a point at the start."""
    original_index = df.index.copy()
    # Insert a point at the start of every window, forward-filled from the last value.
    avg = df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")
    avg = (
        # Weight each value by the duration it remains valid, as a fraction of the window.
        avg.multiply(avg.index.to_series().diff().dt.seconds.shift(-1), axis=0)
        .divide(pd.Timedelta(avg_window).seconds)
        # A plain rolling sum of the weighted values is the time-weighted mean.
        .rolling(avg_window, closed="left")
        .sum()
        .reindex(original_index)
    )
    # Mask rows whose look-back window extends past the start of the data.
    avg[~((avg.index - pd.Timedelta(avg_window)) >= original_index[0])] = np.nan
    return avg

This one does the weighting up front, before rolling: each value is multiplied by the duration it stays valid and divided by the window length, so a plain rolling `.sum()` of these terms yields the time-weighted mean. Avoiding `apply()` translates to an enormous speed increase. We also get away with at most a doubling of the number of index entries, regardless of the size of the averaging window.
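
For reference, a minimal sketch of the kind of benchmark described above; the synthetic data (a ragged, roughly second-resolution series of 100,000 rows) and the timing harness are assumptions, not the exact setup used:

import time

rng = np.random.default_rng(0)
# Hypothetical ragged index: ~100,000 rows, 1-2 seconds between samples.
ts = pd.Timestamp("2020-01-01") + pd.to_timedelta(rng.integers(1, 3, size=100_000).cumsum(), unit="s")
big = pd.DataFrame({"A": rng.normal(size=100_000)}, index=ts)

start = time.perf_counter()
time_weighted_average_using_index_weighting2(big, avg_window="20min")
print(f"elapsed: {time.perf_counter() - start:.2f}s")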

Harald Husum