Grouping by 'ts_code' is just a trivial groupby() function. DataFrame.rolling() function is for single columns, so it's a tricky to apply it if you need data from multiple columns. You can use "from numpy_ext import rolling_apply as rolling_apply_ext" as in this example: Pandas rolling apply using multiple columns. However, I just created a function that manually groups the dataframe into n length sub-dataframes, then applies the function to calculate the value. idxmax() finds the index value of the peak of the low column, then we find the min() of the values that follow. The rest is pretty straightforward.
import numpy as np
import pandas as pd
df = pd.DataFrame([['A', 20, 10],
['A', 30, 5],
['A', 40, 20],
['A', 50, 10],
['A', 20, 30],
['B', 50, 10],
['B', 30, 5],
['B', 40, 20],
['B', 10, 10],
['B', 20, 30]],
columns=['ts_code', 'high', 'low']
)
def custom_f(df, n):
s = pd.Series(np.nan, index=df.index)
def sub_f(df_):
high_peak_idx = df_['high'].idxmax()
min_low_after_peak = df_.loc[high_peak_idx:]['low'].min()
max_high = df_['high'].max()
return 1 - min_low_after_peak / max_high
for i in range(df.shape[0] - n + 1):
df_ = df.iloc[i:i + n]
s.iloc[i + n - 1] = sub_f(df_)
return s
df['l3_high_low_pct_chg'] = df.groupby("ts_code").apply(custom_f, 3).values
df['l4_high_low_pct_chg'] = df.groupby("ts_code").apply(custom_f, 4).values
print(df)
If you prefer to use the rolling function, this method gives the same output:
def rolling_f(rolling_df):
df_ = df.loc[rolling_df.index]
high_peak_idx = df_['high'].idxmax()
min_low_after_peak = df_.loc[high_peak_idx:]["low"].min()
max_high = df_['high'].max()
return 1 - min_low_after_peak / max_high
df['l3_high_low_pct_chg'] = df.groupby("ts_code").rolling(3).apply(rolling_f).values[:, 0]
df['l4_high_low_pct_chg'] = df.groupby("ts_code").rolling(4).apply(rolling_f).values[:, 0]
print(df)
Finally, if you want to do a true rolling window calculation that avoids any index lookup, you can use the numpy_ext (https://pypi.org/project/numpy-ext/)
from numpy_ext import rolling_apply
def np_ext_f(rolling_df, n):
def rolling_apply_f(high, low):
return 1 - low[np.argmax(high):].min() / high.max()
try:
return pd.Series(rolling_apply(rolling_apply_f, n, rolling_df['high'].values, rolling_df['low'].values), index=rolling_df.index)
except ValueError:
return pd.Series(np.nan, index=rolling_df.index)
df['l3_high_low_pct_chg'] = df.groupby('ts_code').apply(np_ext_f, n=3).sort_index(level=1).values
df['l4_high_low_pct_chg'] = df.groupby('ts_code').apply(np_ext_f, n=4).sort_index(level=1).values
print(df)
output:
ts_code high low l3_high_low_pct_chg l4_high_low_pct_chg
0 A 20 10 NaN NaN
1 A 30 5 NaN NaN
2 A 40 20 0.50 NaN
3 A 50 10 0.80 0.80
4 A 20 30 0.80 0.80
5 B 50 10 NaN NaN
6 B 30 5 NaN NaN
7 B 40 20 0.90 NaN
8 B 10 10 0.75 0.90
9 B 20 30 0.75 0.75
For large datasets, the speed of these operations becomes an issue. So, to compare the speed of these different methods, I created a timing function:
import time
def timeit(f):
def timed(*args, **kw):
ts = time.time()
result = f(*args, **kw)
te = time.time()
print ('func:%r took: %2.4f sec' % \
(f.__name__, te-ts))
return result
return timed
Next, let's make a large DataFrame, just by copying the existing dataframe 500 times:
df = pd.concat([df for x in range(500)], axis=0)
df = df.reset_index()
Finally, we run the three tests under a timing function:
@timeit
def method_1():
df['l52_high_low_pct_chg'] = df.groupby("ts_code").apply(custom_f, 52).values
method_1()
@timeit
def method_2():
df['l52_high_low_pct_chg'] = df.groupby("ts_code").rolling(52).apply(rolling_f).values[:, 0]
method_2()
@timeit
def method_3():
df['l52_high_low_pct_chg'] = df.groupby('ts_code').apply(np_ext_f, n=52).sort_index(level=1).values
method_3()
Which gives us this output:
func:'method_1' took: 2.5650 sec
func:'method_2' took: 15.1233 sec
func:'method_3' took: 0.1084 sec
So, the fastest method is to use the numpy_ext, which makes sense because that's optimized for vectorized calculations. The second fastest method is the custom function I wrote, which is somewhat efficient because it does some vectorized calculations while also doing some Pandas lookups. The slowest method by far is using Pandas rolling function.