
I need to convert an RDD to a single-column DataFrame of o.a.s.ml.linalg.Vector in order to use the ML algorithms, specifically K-Means in this case. This is my RDD:

val parsedData = sc.textFile("/digits480x.csv").map(s => Row(org.apache.spark.mllib.linalg.Vectors.dense(s.split(',').slice(0,64).map(_.toDouble))))

I tried doing what this answer suggests, with no luck; I suppose it's because you end up with an MLlib Vector, which causes a type-mismatch error when running the algorithm. Now if I change this:

import org.apache.spark.mllib.linalg.{Vectors, VectorUDT}

val schema = new StructType()
  .add("features", new VectorUDT())

to this:

import org.apache.spark.ml.linalg.{Vectors, VectorUDT}

val parsedData = sc.textFile("/digits480x.csv").map(s => Row(org.apache.spark.ml.linalg.Vectors.dense(s.split(',').slice(0,64).map(_.toDouble))))

val schema = new StructType()
  .add("features", new VectorUDT())

I would get an error because ML VectorUDT is private.
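
To make the mismatch concrete, here is a minimal sketch of how the two vector hierarchies relate (assuming Spark 2.x, where as far as I can tell the old mllib vectors expose an asML helper and MLUtils has a column converter; mllibDf is just a made-up name, not something I actually have):

import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.ml.linalg.{Vector => NewVector}
import org.apache.spark.mllib.util.MLUtils

// the old mllib type, which is what my current RDD produces
val oldVec = OldVectors.dense(Array(1.0, 2.0, 3.0))

// the new ml type expected by the DataFrame-based K-Means;
// asML converts a single vector between the two hierarchies
val newVec: NewVector = oldVec.asML

// for a whole DataFrame column (mllibDf is hypothetical and would need
// to already contain an mllib Vector column called "features")
val converted = MLUtils.convertVectorColumnsToML(mllibDf, "features")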

I also tried converting the RDD to a DataFrame of arrays of doubles, and then getting the ML dense Vector, like this:

var parsedData = sc.textFile("/home/pililo/Documents/Mi_Memoria/Codigo/Datasets/Digits/digits480x.csv").map(s => Row(s.split(',').slice(0,64).map(_.toDouble)))

parsedData: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]

val schema2 = new StructType().add("features", ArrayType(DoubleType))

schema2: org.apache.spark.sql.types.StructType = StructType(StructField(features,ArrayType(DoubleType,true),true))

val df = spark.createDataFrame(parsedData, schema2)

df: org.apache.spark.sql.DataFrame = [features: array<double>]

val df2 = df.map{ case Row(features: Array[Double]) => Row(org.apache.spark.ml.linalg.Vectors.dense(features)) }

Which throws the following error, even though spark.implicits._ is imported:

error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
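
From what I understand, the error is about Row itself: spark.implicits._ only provides encoders for primitives and case classes, not for a generic Row. Something like the following might work around it, but this is only an untested sketch on my part (it assumes the Spark 2.x RowEncoder and SQLDataTypes.VectorType APIs, and vecSchema is a name I made up):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType

// target schema: a single ML Vector column (VectorType is the public handle
// for the otherwise private VectorUDT)
val vecSchema = StructType(Seq(StructField("features", VectorType)))

// array columns come back as Seq, not Array, hence getSeq;
// passing the encoder explicitly avoids the "Unable to find encoder" error
val df2 = df.map { row =>
  Row(org.apache.spark.ml.linalg.Vectors.dense(row.getSeq[Double](0).toArray))
}(RowEncoder(vecSchema))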

Any help is greatly appreciated, thanks!

Pilailou

1 Answer


Off the top of my head:

  1. Use csv source and VectorAssembler:

    import scala.util.Try
    import org.apache.spark.ml.linalg._
    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.sql.functions.col
    import spark.implicits._  // for the $"features" syntax
    
    val path: String = ???
    
    // n and m delimit the feature columns (0 and 64 in your case)
    val n: Int = ???
    val m: Int = ???
    
    val raw = spark.read.csv(path)
    val featureCols = raw.columns.slice(n, m)
    
    // csv columns are read as strings, so cast them to double first
    val exprs = featureCols.map(c => col(c).cast("double"))
    val assembler = new VectorAssembler()
      .setInputCols(featureCols)
      .setOutputCol("features")
    
    assembler.transform(raw.select(exprs: _*)).select($"features")
    
  2. Use text source and UDF:

    import org.apache.spark.sql.functions.udf
    
    // parse one line into an optional ML Vector; Try guards against malformed rows
    def parse_(n: Int, m: Int)(s: String) = Try(
      Vectors.dense(s.split(',').slice(n, m).map(_.toDouble))
    ).toOption
    
    def parse(n: Int, m: Int) = udf(parse_(n, m) _)
    
    val raw = spark.read.text(path)
    
    raw.select(parse(n, m)(col(raw.columns.head)).alias("features"))
    
  3. Use text source and drop the wrapping Row:

    // read as Dataset[String] so there is no Row wrapper to pattern match
    spark.read.text(path).as[String].map(parse_(n, m)).toDF
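
Whichever route you take, the resulting single Vector column can go straight into the new ML K-Means. A rough sketch of that last step (assuming the column is named "features", as in options 1 and 2, and picking k = 10 on the assumption that the digits data has ten classes):

    import org.apache.spark.ml.clustering.KMeans
    
    // featuresDf: a DataFrame with a single ML Vector column named "features",
    // e.g. the output of option 1 above
    val featuresDf = assembler.transform(raw.select(exprs: _*)).select($"features")
    
    val kmeans = new KMeans()
      .setK(10)                    // assumption: 10 clusters for the digits data
      .setFeaturesCol("features")
    
    val model = kmeans.fit(featuresDf)
    model.transform(featuresDf).select($"features", $"prediction").show()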
    
zero323
  • Wow thanks for the answer, I'll give them a try. Any idea if 1) would be most efficient? I actually wanted to do something like that but was missing the way to slice the columns for input cols, since they're 64. Also I'd really appreciate it if you could explain this part in 1) if you can: `exprs: _*`. Is it like select all cols? Thanks a lot! – Pilailou Sep 02 '16 at 19:50
  • 2 and 3 can be slightly faster because there is no csv parsing involved but I wouldn't focus on that. 1. can be improved by providing schema to reader. Finally `_*` is all about [varargs](http://stackoverflow.com/q/1008783/1560062). It takes a sequence and "unpacks" it as arguments for select. – zero323 Sep 02 '16 at 20:04
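
To make the varargs point concrete, a tiny sketch (`df` and the column names here are placeholders, not anything from the question):

    import org.apache.spark.sql.functions.col
    
    val exprs = Seq(col("a"), col("b"), col("c"))
    
    // `exprs: _*` expands the sequence into individual arguments,
    // i.e. this is the same as df.select(col("a"), col("b"), col("c"))
    df.select(exprs: _*)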