
I presently have a Spark DataFrame with two columns: 1) a column where each row contains a vector of predictive features, and 2) a column containing the value to be predicted.
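For context, a toy DataFrame with this shape might look like the following (the data is made up; the column names match the code below):

from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Each row pairs a feature vector with the label to predict
scoresDf = spark.createDataFrame(
    [(Vectors.dense([1.0, 0.5, 3.2]), 7.1),
     (Vectors.dense([0.3, 2.1, 0.9]), 4.4)],
    ["filtered_features", "averageScore"])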

To discern the most predictive features for use in a later model, I am using backwards elimination by p-value, as outlined in this article. Below is my code:

import numpy as np
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import lit, udf

# Number of features currently in each row's vector
num_vars = len(scoresDf.select("filtered_features").take(1)[0][0])
for i in range(num_vars):
    # Re-fit an OLS model on the current feature set
    model = LinearRegression(featuresCol="filtered_features", labelCol="averageScore")
    model = model.fit(scoresDf)
    p_values = model.summary.pValues
    max_p = np.max(p_values)
    if max_p > 0.05:
        max_index = p_values.index(max_p)
        # UDF that drops the feature with the highest p-value from each vector
        drop_max_index_udf = udf(lambda elem, drop_index, var_count:
                                 Vectors.dense([elem[j] for j in range(var_count) if j != drop_index]), VectorUDT())
        scoresDfs = scoresDf.withColumn("filtered_features", drop_max_index_udf(scoresDf["filtered_features"],
                                                                                lit(max_index), lit(num_vars)))
        num_vars = len(scoresDf.select("filtered_features").take(1)[0][0])

The code runs, but the problem is that every iteration takes drastically longer than the last. Based on the answer to this question, it appears that Spark is re-evaluating the entire chain of prior transformations on every iteration.
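For concreteness, the workaround I have seen suggested elsewhere is to truncate the lineage each iteration with checkpoint(), which materializes the DataFrame and cuts its logical plan (the checkpoint directory below is illustrative), though I am not sure it is the idiomatic fix:

spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  # illustrative path

# ...inside the loop, after the feature column has been rewritten:
scoresDf = scoresDf.checkpoint()  # materialize and cut the plan so the next
                                  # fit() does not replay earlier iterations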

Ideally, I would like to feed the entire logic into some Pipeline structure that would store it all lazily and then execute sequentially, with no repeated work, when called upon. However, I am unsure whether that is even possible, since none of Spark's estimator/transformer functions seem to fit this use case.
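For reference, this is the Pipeline pattern I had in mind; as far as I can tell, its stages are fixed when the Pipeline is constructed, which is why a data-dependent elimination loop does not seem to fit:

from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="filtered_features", labelCol="averageScore")
pipeline = Pipeline(stages=[lr])        # stages are declared up front
pipelineModel = pipeline.fit(scoresDf)  # runs the declared stages once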

Any guidance would be appreciated, thanks!

Michael

2 Answers


You are creating and fitting the model repeatedly inside the loop. Fitting is a time-consuming process and only needs to be done once per training data set and set of parameters. Try the following:


# Create and fit the model once, outside the loop
num_vars = len(scoresDf.select("filtered_features").take(1)[0][0])
modelAlgo = LinearRegression(featuresCol="filtered_features", labelCol="averageScore")
model = modelAlgo.fit(scoresDf)

for i in range(num_vars):
    p_values = model.summary.pValues
    max_p = np.max(p_values)
    if max_p > 0.05:
        max_index = p_values.index(max_p)
        drop_max_index_udf = udf(lambda elem, drop_index, var_count:
                                 Vectors.dense([elem[j] for j in range(var_count) if j != drop_index]), VectorUDT())
        scoresDfs = scoresDf.withColumn("filtered_features", drop_max_index_udf(scoresDf["filtered_features"],
                                                                                lit(max_index), lit(num_vars)))
        num_vars = len(scoresDf.select("filtered_features").take(1)[0][0])

Once you are happy with the model, save it. When you need to evaluate new data, just load the model and predict with it.
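A minimal sketch of that save-and-reuse cycle (the path is illustrative):

from pyspark.ml.regression import LinearRegressionModel

# Persist the fitted model once
model.write().overwrite().save("/models/backward_elimination_lr")

# Later: load it and predict without re-fitting
saved_model = LinearRegressionModel.load("/models/backward_elimination_lr")
predictions = saved_model.transform(scoresDf)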

Salim
  • Thanks for the swift response Salim! The only caveat is that I have to re-fit the model every time the "filtered_features" column is updated, and then use the new p values in the for loop. The above solution re-uses the p-values from the first fitting for the entire loop. Is there a way to efficiently re-fit the model in every iteration? – Michael Jan 06 '20 at 11:54
  • Iteratively training the model will take time. – Salim Jan 06 '20 at 15:15
  • But is there a way for the logic to not have to re-compute every prior iteration of scoresDf and the regression in every new iteration? Because that seems to be the reason every iteration is longer than the prior. – Michael Jan 06 '20 at 17:17

Why are you doing

model = model.fit(scoresDf)

when scoresDfs contains your new DataFrame with one less independent variable?


If you change your code to the following:

import numpy as np
import pandas as pd
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import lit, udf

independent_vars = ['x0', 'x1', 'x2', 'x3', 'x4']

def remove_element(array, index):
    # Drop one entry from the feature vector and rebuild a dense vector
    return Vectors.dense(np.delete(array, index, 0))

remove_element_udf = udf(lambda a, i: remove_element(a, i), VectorUDT())

max_p = 1
i = 0
while max_p > 0.05:
    # Re-fit on the current (possibly reduced) feature set each iteration
    model = LinearRegression(featuresCol="filtered_features",
                             labelCol="averageScore",
                             fitIntercept=False)
    model = model.fit(scoresDf)

    print('iteration: ', i)
    summary = model.summary
    summary_df = pd.DataFrame({
        'var': independent_vars,
        'coeff': model.coefficients,
        'se': summary.coefficientStandardErrors,
        'p_value': summary.pValues
    })
    print(summary_df)
    print("r2: %f" % summary.r2)

    p_values = summary.pValues
    max_p = np.max(p_values)
    if max_p > 0.05:
        max_index = p_values.index(max_p)
        max_var = independent_vars[max_index]
        print('-> max_index {max_index}, corresponding to var {var}'.format(max_index=max_index, var=max_var))
        # Remove the offending feature from the vector column and the name list
        scoresDf = scoresDf.withColumn("filtered_features", remove_element_udf(scoresDf["filtered_features"],
                                                                               lit(max_index)))
        independent_vars = np.delete(independent_vars, max_index, 0)

    print()
    i += 1

you will get

iteration:  0
  var     coeff        se   p_value
0  x0  0.174697  0.207794  0.402616
1  x1 -0.448982  0.203421  0.029712
2  x2 -0.452940  0.233972  0.055856
3  x3 -3.213578  0.209935  0.000000
4  x4  3.790730  0.212917  0.000000
r2: 0.870330
-> max_index 0, corresponding to var x0

iteration:  1
  var     coeff        se   p_value
0  x1 -0.431835  0.202087  0.035150
1  x2 -0.460711  0.233432  0.051297
2  x3 -3.218725  0.209525  0.000000
3  x4  3.768661  0.210970  0.000000
r2: 0.869365
-> max_index 1, corresponding to var x2

iteration:  2
  var     coeff        se   p_value
0  x1 -0.479803  0.203592  0.020449
1  x3 -3.344830  0.202501  0.000000
2  x4  3.669419  0.207925  0.000000
r2: 0.864065

In the first and second iterations, the two independent variables with p-values greater than 0.05 (x0, then x2) are removed; the loop stops once every remaining p-value is at or below 0.05.

SchwarzeHuhn
  • Thanks for the response! The method only removes the index with the highest p-value in each iteration, regardless of how many of them are > 0.05. – Michael Jan 06 '20 at 17:14
  • The variable with highest p-value is going to be the same at each iteration, given that the fitted df is not changing. I believe your code is not doing what it was meant for (backward elimination article) – SchwarzeHuhn Jan 07 '20 at 19:06