
With org.apache.spark.mllib learning algorithms, we used to set up the pipeline without the training algorithm:

var stages: Array[org.apache.spark.ml.PipelineStage] = index_transformers :+ assembler
val pipeline = new Pipeline().setStages(stages)

After that, we used LabeledPoint to get the data ready for the training algorithm, and finally we trained the model with something like:

val model = GradientBoostedTrees.train(sc.parallelize(trainingData.collect()), boostingStrategy)

Note that if we don't use "sc.parallelize", the training never seems to end.

Now, with org.apache.spark.ml learning algorithms (thanks to setLabelCol & setFeaturesCol), we can also include the training algorithm in the pipeline:

val model = new GBTRegressor().setLabelCol(target_col_name).setFeaturesCol("features").setMaxIter(10)

var stages: Array[org.apache.spark.ml.PipelineStage] = index_transformers :+ assembler :+ model
val pipeline = new Pipeline().setStages(stages)

But now, when we pass the data, the pipeline expects a DataFrame and not the rows produced by sc.parallelize, so the code below

val model = pipeline.fit(sc.parallelize(df_train))

throws the following error:

<console>:57: error: type mismatch;
 found   : org.apache.spark.sql.DataFrame
 required: Seq[?]

while this

val model = pipeline.fit(df_train)

never ends.

What is the solution to this problem?

Abhishek

1 Answer


The main problem with your code is that you are using the driver as a bridge for the data, i.e. you are collecting all the distributed data to your driver and then sending it back to all your nodes. The other problem is that you are actually using ML functionality, which means you have to use DataFrames instead of RDDs. Therefore, what you need to do is convert your RDD into a DataFrame. There are many ways to achieve this: you can check out "How to convert RDD Object to DataFrame in Spark", or you can use the toDF method.
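As a rough sketch of what this could look like (not your exact code): the toy data, the column names `category`, `x` and `label`, and the single `StringIndexer` standing in for your `index_transformers` are all assumptions made up for illustration. It also assumes Spark 2.x or later with a `SparkSession`; on Spark 1.6, `import sqlContext.implicits._` plays the same role for `toDF`. The point is that the DataFrame goes straight into `pipeline.fit`, with no `collect` and no `sc.parallelize` in between.

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.sql.SparkSession

object GbtPipelineSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: local session for experimentation only.
    val spark = SparkSession.builder()
      .appName("gbt-pipeline-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._ // brings toDF into scope for RDDs of tuples

    // Hypothetical toy data: (categorical column, numeric column, target).
    val rdd = spark.sparkContext.parallelize(Seq(
      ("a", 1.0, 10.0),
      ("b", 2.0, 20.0),
      ("a", 3.0, 30.0),
      ("b", 4.0, 40.0)
    ))

    // If the data starts out as an RDD, convert it once with toDF.
    // If it is already a DataFrame (like df_train), skip this step.
    val dfTrain = rdd.toDF("category", "x", "label")

    // Stand-in for index_transformers: one indexer for one column.
    val indexer = new StringIndexer()
      .setInputCol("category")
      .setOutputCol("category_idx")

    val assembler = new VectorAssembler()
      .setInputCols(Array("category_idx", "x"))
      .setOutputCol("features")

    val gbt = new GBTRegressor()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setMaxIter(10)

    val stages: Array[PipelineStage] = Array(indexer, assembler, gbt)
    val pipeline = new Pipeline().setStages(stages)

    // The DataFrame is passed directly; the data never round-trips
    // through the driver.
    val model = pipeline.fit(dfTrain)
    model.transform(dfTrain).select("label", "prediction").show()

    spark.stop()
  }
}
```

The estimator is named `gbt` here rather than `model` so that the fitted `PipelineModel` returned by `fit` does not clash with the unfitted regressor.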

Alberto Bonsanto
  • ":type df_train" gives me "org.apache.spark.sql.DataFrame". The error above also says "df_train" is a DataFrame and not an RDD. Sorry if I am missing something here, can you help me a bit more? – Abhishek May 31 '16 at 15:19
  • @Abhishek yes, but when you `collect` it you transform it into a `Seq`, and then into an `RDD` when you parallelize it. – Alberto Bonsanto May 31 '16 at 15:22
  • Hey I got that, but I am still not sure how to fix this, can you help me with the code suggestion? Sorry I am new to Scala and ML algorithms. – Abhishek May 31 '16 at 15:26
  • I still believe you have not understood my question. My doubt is about using sc.parallelize with ML algorithms, which take a DataFrame as input, while sc.parallelize gives an RDD as output. – Abhishek May 31 '16 at 19:26
  • Yes, but ML functions receive DataFrames and only them as arguments, therefore you have to convert your RDD into a DataFrame, maybe by using toDF. – Alberto Bonsanto May 31 '16 at 19:28
  • Yes, sorry to bug you, this may be my last try to get help from you. When I run "val model = pipeline.fit(df_train)" it does not end. And please note that "df_train" is a DataFrame and not an RDD. Also, the "pipeline" includes the ML algorithm. – Abhishek May 31 '16 at 19:31
  • @Abhishek how big is the data and where are you running it? – Alberto Bonsanto May 31 '16 at 19:33
  • It is a 10 MB file for now, just to build the code. It will later be replaced with a big file. Regards, – Abhishek May 31 '16 at 19:35