3

I am writing an Apache Spark application using Scala. To handle and store data I use DataFrames. I have a nice pipeline with feature extraction and a MultiLayerPerceptron classifier, using the ML API.

I also want to use SVM (for comparison purposes). The thing is (and correct me if I am mistaken) only the MLLib provides SVM. And MLLib is not ready to handle DataFrames, only RDDs.

So I figured I can maintain the core of my application using DataFrames and to use SVM 1) I just convert the DataFrame's columns I need to an RDD[LabeledPoint] and 2) after the classification add the SVMs prediction to the DataFrame as a new column.

The first part I handled with a small function:

private def dataFrameToRDD(dataFrame : DataFrame) : RDD[LabeledPoint] = {
    val rddMl = dataFrame.select("label", "features").rdd.map(r => (r.getInt(0).toDouble, r.getAs[org.apache.spark.ml.linalg.SparseVector](1)))
    rddMl.map(r => new LabeledPoint(r._1, Vectors.dense(r._2.toArray)))
}

I have to specify and convert the type of vector since the feature extraction method uses ML API and not MLLib.

Then, this RDD[LabeledPoint] is fed to the SVM and classification goes smoothly, no issues. At the end and following spark's example I get an RDD[Double]:

val predictions = rdd.map(point => model.predict(point.features))

Now, I want to add the prediction score as column to the original DataFrame and return it. This is where I got stuck. I can convert the RDD[Double] to a DataFrame using

(sql context ommited)
import sqlContext.implicits._
val plDF = predictions.toDF("prediction")

But how do I join the two DataFrames where the second DataFrame becomes a column of the original one? I tried to use methods join and union but got SQL exceptions as the DataFrames have no equal columns to join or unite on.

EDIT I tried

data.withColumn("prediction", plDF.col("prediction"))

But I get an Analysis Exception :(

Camandros
  • 449
  • 6
  • 22
  • 1
    I've joined my predictions to my original frame using the solution from http://stackoverflow.com/questions/32882529/how-to-zip-twoor-more-dataframe-in-spark , is that similar to what you're looking for? – James Tobin Mar 29 '17 at 19:37
  • I see how it works, but transforming to RDD to join then create a dataframe? Seems more like a workaround. – Camandros Mar 29 '17 at 19:50

1 Answers1

0

I haven't figured out how to do it without recurring to RDDs, but anyway here's how I solved it with RDD. Added the rest of the code so that anyone can understand the complete logic. Any suggestions are appreciated.

package stuff

import java.util.logging.{Level, Logger}

import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

/**
  * Created by camandros on 10-03-2017.
  */
class LinearSVMClassifier extends Classifier with Serializable{

  @transient lazy val log: Logger = Logger.getLogger(getClass.getName)

  private var model : SVMModel = _

  override def train(data : DataFrame): Unit = {
    val rdd = dataFrameToRDD(data)
    // Run training algorithm to build the model
    val numIter : Int = 100
    val step = Osint.properties(Osint.SVM_STEPSIZE).toDouble
    val c = Osint.properties(Osint.SVM_C).toDouble
    log.log(Level.INFO, "Initiating SVM training with parameters: C="+c+", step="+step)
    model = SVMWithSGD.train(rdd, numIterations = numIter, stepSize = step, regParam = c)
    log.log(Level.INFO, "Model training finished")

    // Clear the default threshold.
    model.clearThreshold()
  }

  override def classify(data : DataFrame): DataFrame = {
    log.log(Level.INFO, "Converting DataFrame to RDD")
    val rdd = dataFrameToRDD(data)
    log.log(Level.INFO, "Conversion finished; beginning classification")
    // Compute raw scores on the test set.
    val predictions = rdd.map(point => model.predict(point.features))
    log.log(Level.INFO, "Classification finished; Transforming RDD to DataFrame")

    val sqlContext : SQLContext = Osint.spark.sqlContext
    val tupleRDD = data.rdd.zip(predictions).map(t => Row.fromSeq(t._1.toSeq ++ Seq(t._2)))
    sqlContext.createDataFrame(tupleRDD, data.schema.add("predictions", "Double"))

    //TODO this should work it doesn't since this "withColumn" method seems to be applicable only to add
    // new columns using information from the same dataframe; therefore I am using the horrible rdd conversion
    //val sqlContext : SQLContext = Osint.spark.sqlContext
    //import sqlContext.implicits._
    //val plDF = predictions.toDF("predictions")
    //data.withColumn("prediction", plDF.col("predictions"))
  }

  private def dataFrameToRDD(dataFrame : DataFrame) : RDD[LabeledPoint] = {
    val rddMl = dataFrame.select("label", "features").rdd.map(r => (r.getInt(0).toDouble, r.getAs[org.apache.spark.ml.linalg.SparseVector](1)))
    rddMl.map(r => new LabeledPoint(r._1, Vectors.dense(r._2.toArray)))
  }
}
Camandros
  • 449
  • 6
  • 22