
My use case:

Read data from a MongoDB collection of the form:

{
    "_id" : ObjectId("582cab1b21650fc72055246d"),
    "label" : 167.517838916715,
    "features" : [ 
        10.0964787450654, 
        218.621137772497, 
        18.8833848806122, 
        11.8010251302327, 
        1.67037687829152, 
        22.0766170950477, 
        11.7122322171201, 
        12.8014773524475, 
        8.30441804118235, 
        29.4821268054137
    ]
}

And pass it to the org.apache.spark.ml.regression.LinearRegression class to create a model for predictions.

My problem:

The Spark connector reads in "features" as Array[Double].
LinearRegression.fit(...) expects a Dataset with a label column and a features column.
The features column must be of type VectorUDT (so a DenseVector or SparseVector will work).
I cannot .map features from Array[Double] to DenseVector because there is no relevant Encoder:

Error:(23, 11) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
  .map{case Row(label: Double, features: Array[Double]) => Row(label, Vectors.dense(features))}

Custom Encoders cannot be defined.

My question:

  • Is there a way I can set the configuration of the Spark connector to read in the "features" array as a Dense/SparseVector?
  • Is there any other way I can achieve this (without, for example, using an intermediary .csv file and loading that using libsvm)?

My code:

import com.mongodb.spark.MongoSpark
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.{Row, SparkSession}

case class DataPoint(label: Double, features: Array[Double])

object LinearRegressionWithMongo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("LinearRegressionWithMongo")
      .master("local[4]")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/LinearRegressionTest.DataPoints")
      .getOrCreate()

    import spark.implicits._

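    // This is the .map that triggers the "Unable to find encoder" error shown above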
    val dataPoints = MongoSpark.load(spark)
      .map{case Row(label: Double, features: Array[Double]) => Row(label, Vectors.dense(features))}

    val splitData = dataPoints.randomSplit(Array(0.7, 0.3), 42)
    val training = splitData(0)
    val test = splitData(1)

    val linearRegression = new LinearRegression()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setRegParam(0.0)
      .setElasticNetParam(0.0)
      .setMaxIter(100)
      .setTol(1e-6)

    // Train the model
    val startTime = System.nanoTime()
    val linearRegressionModel = linearRegression.fit(training)
    val elapsedTime = (System.nanoTime() - startTime) / 1e9
    println(s"Training time: $elapsedTime seconds")

    // Print the weights and intercept for linear regression.
    println(s"Weights: ${linearRegressionModel.coefficients} Intercept: ${linearRegressionModel.intercept}")

    val modelEvaluator = new ModelEvaluator()
    println("Training data results:")
    modelEvaluator.evaluateRegressionModel(linearRegressionModel, training, "label")
    println("Test data results:")
    modelEvaluator.evaluateRegressionModel(linearRegressionModel, test, "label")

    spark.stop()
  }
}

Any help would be ridiculously appreciated!

Yax

1 Answer


There is a quick fix for this. If the data has been loaded into a DataFrame called df which has:

  • id - SQL double.
  • features - SQL array<double>.

like this one

val df = Seq((1.0, Array(2.3, 3.4, 4.5))).toDF("id", "features")

you can select the columns you need for downstream processing:

val idAndFeatures = df.select("id", "features")

convert it to a statically typed Dataset:

val tuples = idAndFeatures.as[(Double, Seq[Double])]

map and convert back to a Dataset[Row]:

val spark: SparkSession = ???

import spark.implicits._
import org.apache.spark.ml.linalg.Vectors

tuples.map { case (id, features) => 
  (id, Vectors.dense(features.toArray))
}.toDF("id", "features")

You can find a detailed explanation of the difference compared to your current approach here.

zero323