I currently have a Spark DataFrame with two columns: 1) a column where each row contains a vector of predictive features, and 2) a column containing the value to be predicted.
To identify the most predictive features for use in a later model, I am using backward elimination by p-value, as outlined in this article. Below is my code:
import numpy as np
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import lit, udf

# Number of features currently in the feature vector
num_vars = len(scoresDf.select("filtered_features").take(1)[0][0])

for i in range(num_vars):
    # Fit a regression on the current feature set and inspect the p-values
    model = LinearRegression(featuresCol="filtered_features", labelCol="averageScore")
    model = model.fit(scoresDf)
    p_values = model.summary.pValues
    max_p = np.max(p_values)
    if max_p > 0.05:
        # Drop the least significant feature (highest p-value) from the vector
        max_index = p_values.index(max_p)
        drop_max_index_udf = udf(
            lambda elem, drop_index, var_count:
                Vectors.dense([elem[j] for j in range(var_count) if j != drop_index]),
            VectorUDT())
        scoresDf = scoresDf.withColumn(
            "filtered_features",
            drop_max_index_udf(scoresDf["filtered_features"], lit(max_index), lit(num_vars)))
    num_vars = len(scoresDf.select("filtered_features").take(1)[0][0])
The code runs, but the problem is that every iteration takes drastically longer than the last. Based on the answer to this question, it appears that the DataFrame's growing lineage causes Spark to re-evaluate all of the prior iterations' transformations every time.
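To illustrate, I suspect the slowdown could be avoided by explicitly cutting the lineage after each iteration, along the lines of the rough sketch below (the checkpoint() call and the checkpoint directory path are assumptions on my part that I have not verified, and the sketch reuses the same drop_max_index_udf defined above):

# Sketch only: materialize the DataFrame each iteration so earlier steps are not replayed
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  # placeholder path

while True:
    model = LinearRegression(featuresCol="filtered_features", labelCol="averageScore").fit(scoresDf)
    p_values = model.summary.pValues
    max_p = np.max(p_values)
    if max_p <= 0.05:
        break  # every remaining feature is significant
    max_index = p_values.index(max_p)
    num_vars = len(scoresDf.select("filtered_features").take(1)[0][0])
    scoresDf = scoresDf.withColumn(
        "filtered_features",
        drop_max_index_udf(scoresDf["filtered_features"], lit(max_index), lit(num_vars)))
    # Truncate the query plan so the next fit() starts from the checkpointed data
    # (eager=True should force materialization immediately rather than on the next action)
    scoresDf = scoresDf.checkpoint(eager=True)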
Ideally, though, I would like to feed the entire loop into some Pipeline structure that stores everything lazily and then executes it sequentially, with no repeated work, when called upon, but I am unsure whether that is even possible given that none of Spark's estimators / transformers seem to fit this use case.
Any guidance would be appreciated, thanks!