0

I have been using this function for time series feature creation in Pandas that returns the (OLS?) best-fit slope of a given range of points:

def best_fit(X, Y):
    xbar = sum(X)/len(X)
    ybar = sum(Y)/len(Y)
    n = len(X) 
    numer = sum([xi*yi for xi,yi in zip(X, Y)]) - n * xbar * ybar
    denum = sum([xi**2 for xi in X]) - n * xbar**2
    b = numer / denum
    return b

Here is a simple example showing the results (see final df below):

import pandas as pd
import numpy as np
import random
cols = ['x_vals','y_vals']
df = pd.DataFrame(columns=cols)
for i in range(0,20):
  df.loc[i,'x_vals'] = i
  df.loc[i,'y_vals'] = 0.05 * i**2 + 0.1 * i + random.uniform(-1,1) #some random parabolic points

I then apply the best_fit function to get the slope of the preceding 5 points:

for i,row in df.iterrows():
  if i>=5:
    X = df['x_vals'][i-5:i]
    Y = df['y_vals'][i-5:i]
    df.loc[i,'slope'] = best_fit(X, Y)
df

Which gives me this:

x_vals  y_vals  slope
0   -0.648205   NaN
1   0.282729    NaN
2   0.785474    NaN
3   1.48546     NaN
4   0.408165    NaN
5   1.61244     0.331548
6   2.60868     0.228211
7   3.77621     0.377338
8   4.08937     0.678201
9   4.34625     0.952618
10  5.47554     0.694832
11  7.90902     0.630377
12  8.83912     0.965180
13  9.01195     1.306227
14  11.8244     1.269497
15  13.3199     1.380057
16  15.2751     1.380692
17  15.3959     1.717981
18  18.454      1.621861
19  20.0773     1.533528

I need to get the same slope column out of a pyspark dataframe instead of Pandas, only I am struggling to find a starting point on this (pyspark window?, OLS built-in function?, udf?).

GivenX
  • 495
  • 1
  • 8
  • 17
  • It should be combination of using pyspark window and udf, This ref (based on scala) will help you to implement https://stackoverflow.com/questions/23402303/apache-spark-moving-average – Ranga Vure Mar 19 '19 at 07:45

2 Answers2

1

Use Pyspark window, collect the previous 5 col values as list and call the best_fit_udf

#moodified this function to handle 0 division and size of elements
def best_fit(X, Y):
    xbar = sum(X)/len(X)
    ybar = sum(Y)/len(Y)
    n = len(X)
    if n < 6 :
       return None
    numer = sum([xi*yi for xi,yi in zip(X, Y)]) - n * xbar * ybar
    denum = sum([xi**2 for xi in X]) - n * xbar**2
    if denum == 0:
       return None
    else:
       return numer / denum

best_fit_udf = udf(best_fit, DoubleType())

cols = ['x_vals','y_vals']
df = pd.DataFrame(columns=cols)
for i in range(0,20):
  df.loc[i,'x_vals'] = i
  df.loc[i,'y_vals'] = 0.05 * i**2 + 0.1 * i + random.uniform(-1,1) #some random parabolic points

spark_df = spark.createDataFrame(df)

w = Window.orderBy("x_vals").rowsBetween(-5, 0)

df = spark_df.select("x_vals","y_vals",(F.collect_list('x_vals')).over(w).alias("x_list"), (F.collect_list('y_vals')).over(w).alias("y_list"))

df.withColumn("slope", best_fit_udf('x_list','y_list') ).drop('x_list','y_list').show()

which gives me this

+------+--------------------+------------------+
|x_vals|              y_vals|             slope|
+------+--------------------+------------------+
|     0|-0.05626232194330516|              null|
|     1|  1.0626613654187942|              null|
|     2|-0.18870622421238525|              null|
|     3|  1.7106172105001147|              null|
|     4|  1.9398571272258158|              null|
|     5|  2.3632022124308474| 0.475092382628695|
|     6|  1.7264493731921893|0.3201115790149247|
|     7|   3.298712278452215|0.5116552596172641|
|     8|  4.3179382280764305|0.4707547914949186|
|     9|    4.00691449276564|0.5077645079970263|
|    10|   6.085792506183289|0.7563877936316236|
|    11|   7.272669055040746|1.0223232959178614|
|    12|    8.70598472345308| 1.085126649123283|
|    13|  10.141576882812515|1.2686365861314373|
|    14|  11.170519757896672| 1.411962717827295|
|    15|  11.999868557507794|1.2199864149871311|
|    16|   14.86294824152797|1.3960568659909833|
|    17|  16.698964370210007| 1.570238888844051|
|    18|   18.71951724368806|1.7810890092953742|
|    19|  20.428078271618062|1.9509358501665701|
+------+--------------------+------------------+
Ranga Vure
  • 1,922
  • 3
  • 16
  • 23
0

Thanks @Ranga Vure. I tested your function against the original best_fit function (with your values of course) and I get different values for the slope. This is what the best_fit function gives (the y_vals values look rounded but they are not):

x_vals  y_vals      slope_py
0       -0.0562623  NaN
1       1.06266     NaN
2       -0.188706   NaN
3       1.71062     NaN
4       1.93986     NaN
5       2.3632      0.464019
6       1.72645     0.472965
7       3.29871     0.448290
8       4.31794     0.296278
9       4.00691     0.569167
10      6.08579     0.587891
11      7.27267     0.942689
12      8.70598     0.971577
13      10.1416     1.204185
14      11.1705     1.488952
15      11.9999     1.303836
16      14.8629     1.191893
17      16.699      1.417222
18      18.7195     1.680720
19      20.4281     1.979709

I translated the best_fit function into sqlContext and that gave me the same values as the original best_fit function:

spark_df.createOrReplaceTempView('tempsql')
df_sql = sqlContext.sql("""
SELECT *,
(((sum(x_vals*y_vals) over (order by x_vals rows between 5 preceding and 1 preceding)) - 5 * (sum(x_vals) over (order by x_vals rows between 5 preceding and 1 preceding))/5 * (sum(y_vals) over (order by x_vals rows between 5 preceding and 1 preceding))/5) /
((sum(x_vals*x_vals) over (order by x_vals rows between 5 preceding and 1 preceding)) - 5 * (sum(x_vals) over (order by x_vals rows between 5 preceding and 1 preceding))/5 * (sum(x_vals) over (order by x_vals rows between 5 preceding and 1 preceding))/5)) as slope_sql
from tempsql 
""")

This gives the same values as the original best_fit function, apart from points 3-5 as it calculates the slope before the intended start (i.e. the 6th point) - a small quirk but one that I can live with:

+------+-------------------+-------------------+--------------------+
|x_vals|             y_vals|           slope_py|           slope_sql|
+------+-------------------+-------------------+--------------------+
|     0|-0.0562623219433051|                NaN|                null|
|     1|   1.06266136541879|                NaN|                null|
|     2| -0.188706224212385|                NaN|  1.0767269459046163|
|     3|   1.71061721050011|                NaN|0.060822882948800006|
|     4|   1.93985712722581|                NaN|  0.4092836048203674|
|     5|   2.36320221243084| 0.4640194743419549|  0.4640194743419549|
|     6|   1.72644937319218| 0.4729645045462295|  0.4729645045462295|
|     7|   3.29871227845221|0.44828961967398656|  0.4482896196739862|
|     8|   4.31793822807643| 0.2962782381870575| 0.29627823818705823|
|     9|   4.00691449276564|  0.569167226772261|  0.5691672267722595|
GivenX
  • 495
  • 1
  • 8
  • 17
  • @Ranga Vure Ah, the discrepancy in the slope outputs is due to the fact that your rowsBetween includes the current row, whereas I would like it to exclude the current row (i.e. up to but not including the observation point). If I modify the original function to include the current row I get your numbers, hence my accepting your answer, thanks! How do I exclude the current row, if I do .rowsBetween(-5, -1), it falls over. – GivenX Mar 20 '19 at 10:12