With the org.apache.spark.mllib learning algorithms, we used to set up the pipeline without the training algorithm:
val stages: Array[org.apache.spark.ml.PipelineStage] = index_transformers :+ assembler
val pipeline = new Pipeline().setStages(stages)
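For context, index_transformers and assembler are not defined above; a minimal sketch of how they might be built (the column names cat_a, cat_b, and numeric_col are hypothetical placeholders):
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
// Hypothetical categorical columns: one StringIndexer per column
val categorical_cols = Array("cat_a", "cat_b")
val index_transformers: Array[org.apache.spark.ml.PipelineStage] =
  categorical_cols.map(c => new StringIndexer().setInputCol(c).setOutputCol(c + "_indexed"))
// Assemble the indexed columns plus a hypothetical numeric column into a single feature vector
val assembler = new VectorAssembler()
  .setInputCols(categorical_cols.map(_ + "_indexed") :+ "numeric_col")
  .setOutputCol("features")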
Then we used LabeledPoint to get the data ready for the training algorithm, and finally we trained the model with something like
val model = GradientBoostedTrees.train(sc.parallelize(trainingData.collect()), boostingStrategy)
We must note that if we don't use sc.parallelize, the training seems to never end.
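Putting the old mllib flow together, a minimal sketch (here df_transformed, the label column, and the feature columns f1 and f2 are hypothetical placeholders):
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
// Default regression boosting strategy with 10 iterations
val boostingStrategy = BoostingStrategy.defaultParams("Regression")
boostingStrategy.numIterations = 10
// Turn each transformed row into a LabeledPoint (label + feature vector)
val trainingData = df_transformed.rdd.map { row =>
  LabeledPoint(row.getAs[Double]("label"),
    Vectors.dense(row.getAs[Double]("f1"), row.getAs[Double]("f2")))
}
val model = GradientBoostedTrees.train(sc.parallelize(trainingData.collect()), boostingStrategy)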
Now, with the org.apache.spark.ml learning algorithms (thanks to setLabelCol and setFeaturesCol), we can also include the training algorithm in the pipeline:
val model = new GBTRegressor().setLabelCol(target_col_name).setFeaturesCol("features").setMaxIter(10)
val stages: Array[org.apache.spark.ml.PipelineStage] = index_transformers :+ assembler :+ model
val pipeline = new Pipeline().setStages(stages)
But now the pipeline expects a DataFrame rather than an RDD of rows like the one produced by sc.parallelize, so the following code
val model = pipeline.fit(sc.parallelize(df_train))
throws the following error:
<console>:57: error: type mismatch;
found : org.apache.spark.sql.DataFrame
required: Seq[?]
while this
val model = pipeline.fit(df_train)
never ends.
What is the solution to this problem?