I have been using the following function for time series feature creation in pandas. It returns the ordinary least squares (OLS) best-fit slope of a given range of points:
def best_fit(X, Y):
    xbar = sum(X)/len(X)
    ybar = sum(Y)/len(Y)
    n = len(X)
    numer = sum([xi*yi for xi,yi in zip(X, Y)]) - n * xbar * ybar
    denum = sum([xi**2 for xi in X]) - n * xbar**2
    b = numer / denum
    return b
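As a quick sanity check that this closed-form expression behaves like a least-squares slope, here is a minimal sketch that feeds it points lying exactly on a line. The function is repeated so the snippet runs on its own, and the test data (y = 2x + 1) is made up purely for illustration:

```python
def best_fit(X, Y):
    # Same closed-form slope as above, repeated so this snippet is self-contained.
    n = len(X)
    xbar = sum(X) / n
    ybar = sum(Y) / n
    numer = sum(xi * yi for xi, yi in zip(X, Y)) - n * xbar * ybar
    denum = sum(xi ** 2 for xi in X) - n * xbar ** 2
    return numer / denum

# Illustrative data: points lying exactly on y = 2x + 1,
# so the fitted slope should come out as 2.
X = [0, 1, 2, 3, 4]
Y = [2 * x + 1 for x in X]
slope = best_fit(X, Y)
print(slope)
```

Note that the function divides by zero if all X values in the range are identical, which cannot happen with the strictly increasing x_vals used here.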
Here is a simple example showing the results (see final df below):
import pandas as pd
import numpy as np
import random
cols = ['x_vals','y_vals']
df = pd.DataFrame(columns=cols)
for i in range(0,20):
    df.loc[i,'x_vals'] = i
    df.loc[i,'y_vals'] = 0.05 * i**2 + 0.1 * i + random.uniform(-1,1) # noisy points along a parabola
I then apply the best_fit function to get the slope of the preceding 5 points:
for i,row in df.iterrows():
    if i>=5:
        X = df['x_vals'][i-5:i]
        Y = df['y_vals'][i-5:i]
        df.loc[i,'slope'] = best_fit(X, Y)
df
Which gives me this:
x_vals y_vals slope
0 -0.648205 NaN
1 0.282729 NaN
2 0.785474 NaN
3 1.48546 NaN
4 0.408165 NaN
5 1.61244 0.331548
6 2.60868 0.228211
7 3.77621 0.377338
8 4.08937 0.678201
9 4.34625 0.952618
10 5.47554 0.694832
11 7.90902 0.630377
12 8.83912 0.965180
13 9.01195 1.306227
14 11.8244 1.269497
15 13.3199 1.380057
16 15.2751 1.380692
17 15.3959 1.717981
18 18.454 1.621861
19 20.0773 1.533528
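To pin down the semantics I need reproduced: the slope on row i is computed from rows i-5 through i-1 and excludes row i itself, which is why the first five rows are NaN. Below is a minimal pandas-free sketch of the same computation. `rolling_slopes` is a hypothetical helper name used only for this illustration, and the noise-free parabola is made-up data chosen so the result is deterministic:

```python
def rolling_slopes(xs, ys, window=5):
    """Slope of the `window` points preceding each row; None where fewer exist."""
    slopes = []
    for i in range(len(xs)):
        if i < window:
            slopes.append(None)  # corresponds to the NaN rows in the table
            continue
        X = xs[i - window:i]  # rows i-window .. i-1, current row excluded
        Y = ys[i - window:i]
        n = len(X)
        xbar = sum(X) / n
        ybar = sum(Y) / n
        numer = sum(x * y for x, y in zip(X, Y)) - n * xbar * ybar
        denom = sum(x * x for x in X) - n * xbar ** 2
        slopes.append(numer / denom)
    return slopes

# Illustrative data: a noise-free parabola, so the output is reproducible.
xs = list(range(8))
ys = [x ** 2 for x in xs]
slopes = rolling_slopes(xs, ys)
print(slopes)
```

Whatever approach I end up with on the Spark side should reproduce this trailing, current-row-excluded window.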
I need to compute the same slope column on a PySpark DataFrame instead of a pandas one, but I am struggling to find a starting point. Should I use a PySpark window function, a built-in OLS function, or a UDF?