
I am new to Scala and I want to convert a DataFrame to an RDD. The label and features columns should become an RDD[LabeledPoint] to use as input for MLlib, but I can't figure out how to deal with the WrappedArray.

scala> test.printSchema
root
 |-- user_id: long (nullable = true)
 |-- brand_store_sn: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- money_score: double (nullable = true)
 |-- normal_score: double (nullable = true)
 |-- action_score: double (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- flag: string (nullable = true)
 |-- dt: string (nullable = true)


scala> test.head
res21: org.apache.spark.sql.Row = [2533,10005072,1,2.0,1.0,1.0,WrappedArray(["d90_pv_1sec:1.4471580313422192", "d3_pv_1sec:0.9030899869919435", "d7_pv_1sec:0.9030899869919435", "d30_pv_1sec:1.414973347970818", "d90_pv_week_decay:1.4235871662780681", "d1_pv_1sec:0.9030899869919435", "d120_pv_1sec:1.4471580313422192"]),user_positive,20161130]
Tzach Zohar
zhimin.feng

1 Answer

First - since LabeledPoint expects a Vector of Doubles, I'm assuming you also want to split each element in every features array by colon (:), and treat the right-hand side of it as the double, e.g.:

 "d90_pv_1sec:1.4471580313422192" --> 1.4471580313422192

If so - here's the transformation:

import scala.collection.mutable // needed for mutable.WrappedArray below
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint

// sample data - DataFrame with label, features and other columns
val df = Seq(
  (1, Array("d90_pv_1sec:1.4471580313422192", "d3_pv_1sec:0.9030899869919435"), 4.0),
  (2, Array("d7_pv_1sec:0.9030899869919435", "d30_pv_1sec:1.414973347970818"), 5.0)
).toDF("label", "features", "ignored")

// extract relevant fields from Row and convert WrappedArray[String] into Vector:
val result = df.rdd.map(r => {
  val label = r.getAs[Int]("label")
  val featuresArray = r.getAs[mutable.WrappedArray[String]]("features")
  val features: Vector = Vectors.dense(
    featuresArray.map(_.split(":")(1).toDouble).toArray
  )
  LabeledPoint(label, features)
})

result.foreach(println)
// (1.0,[1.4471580313422192,0.9030899869919435])
// (2.0,[0.9030899869919435,1.414973347970818])
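
The string-to-double step can also be pulled out into a plain function so it can be checked without a Spark session. This is only a sketch: `DenseFeatures` and `values` are hypothetical names, not part of Spark or of the code above, and the function assumes every item ends in `:<number>`.

```scala
// Hypothetical helper (not part of Spark or the original answer).
object DenseFeatures {
  // Keeps the text after the last colon of each item and parses it as a Double.
  // Throws NumberFormatException if that text is not a valid number.
  def values(features: Seq[String]): Array[Double] =
    features.map { item =>
      item.substring(item.lastIndexOf(':') + 1).trim.toDouble
    }.toArray
}
```

Inside the `map` it could be called as `Vectors.dense(DenseFeatures.values(r.getAs[Seq[String]]("features")))`. Reading the column as `Seq[String]` works because a `WrappedArray` is a `Seq`, and it avoids naming `WrappedArray` directly.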

EDIT: per clarification, now assuming each item in the input array contains the expected index in a resulting sparse vector:

"d90_pv_1sec:1.4471580313422192" --> index = 90; value = 1.4471580313422192

The modified code would be:

val vectorSize: Int = 100 // just a guess - should be the maximum index + 1

val result = df.rdd.map(r => {
  val label = r.getAs[Int]("label")
  val arr = r.getAs[mutable.WrappedArray[String]]("features").toArray
  // parse each item into (index, value) tuple to use in sparse vector
  val elements = arr.map(_.split(":")).map {
    case Array(s, d) => (s.replaceAll("d|_pv_1sec","").toInt, d.toDouble)
  }
  LabeledPoint(label, Vectors.sparse(vectorSize, elements))
})

result.foreach(println)
// (1.0,(100,[3,90],[0.9030899869919435,1.4471580313422192]))
// (2.0,(100,[7,30],[0.9030899869919435,1.414973347970818]))
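
The `vectorSize` of 100 above is a guess. One way to derive it from the data instead is to take the largest parsed index and add one. The sketch below does this on plain Scala collections; `SparseSize` is a hypothetical name, and it assumes the (index, value) pairs have already been parsed for every row.

```scala
// Hypothetical helper (not part of Spark or the original answer).
object SparseSize {
  // rows: the parsed (index, value) pairs of every row.
  // Returns the largest index + 1, or 0 when there are no pairs at all.
  def vectorSize(rows: Seq[Seq[(Int, Double)]]): Int = {
    val indices = rows.flatten.map(_._1)
    if (indices.isEmpty) 0 else indices.max + 1
  }
}
```

For the two sample rows this gives 91 rather than 100. On an RDD of parsed pairs the same idea should be expressible with `flatMap`, `map(_._1)` and `max()`, at the cost of one extra pass over the data.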

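NOTE: Using s.replaceAll("d|_pv_1sec","") might be a bit slow, as it compiles a regular expression for each item separately. If that becomes a problem, it can be replaced by the (uglier) s.replace("d", "").replace("_pv_1sec", ""), which treats its arguments as literal text rather than as regular expressions. Either form removes every d in the name, and toInt will throw a NumberFormatException for names that don't reduce to digits, such as d90_pv_week_decay from the question's data.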
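
Another option is to compile the pattern once and reuse it for every item. The sketch below assumes the names of interest always look like `d<digits>_pv_1sec`; `FeatureParser` is a hypothetical name, not part of the original answer, and items that don't fit the pattern are skipped instead of throwing.

```scala
import scala.util.Try

// Hypothetical helper (not part of Spark or the original answer).
object FeatureParser {
  // Compiled once; matches whole names such as "d90_pv_1sec" and captures the digits.
  private val Indexed = """d(\d+)_pv_1sec""".r

  // Parses "d90_pv_1sec:1.44" into Some((90, 1.44)).
  // Returns None for names that don't match or values that aren't numbers.
  def parse(item: String): Option[(Int, Double)] =
    item.split(":") match {
      case Array(Indexed(idx), value) => Try((idx.toInt, value.toDouble)).toOption
      case _                          => None
    }
}
```

In the transformation above, `arr.flatMap(FeatureParser.parse)` would then yield the (index, value) pairs while silently dropping entries like `d90_pv_week_decay`. Whether dropping them is the right behaviour depends on the data, so treat that as a design choice rather than a given.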

Tzach Zohar
  • Er... I want a label and a sparse feature vector in a format like: LabeledPoint(label, Vectors.sparse(3, Array("d90_pv_1sec", "d3_pv_1sec"), Array(1.4471580313422192, 0.9030899869919435))) – zhimin.feng Dec 30 '16 at 09:20
  • You can't - `LabeledPoint.features` has type `org.apache.spark.mllib.linalg.Vector`, which is necessarily a vector of `Doubles` and not Strings/Arrays - see https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala#L41 – Tzach Zohar Dec 30 '16 at 09:26
  • If I apply feature hashing, how can I produce that format? The features are a sparse array, and I want the result to be: LabeledPoint(label, Vectors.sparse(3, Array(1,1.4471580313422192 ), Array(2, 0.9030899869919435),Array(3, 0.9030899869919435))) – zhimin.feng Dec 30 '16 at 09:42
  • I don't follow - are you saying that for input `d7_pv_1sec:0.9030899869919435` the resulting sparse vector should contain the value `0.9030899869919435` in *index* `7`? – Tzach Zohar Dec 30 '16 at 09:45
  • Yes, you got it. My data source is a sparse vector, so I want to keep the index and the value. I can hash the index from string to integer. – zhimin.feng Dec 30 '16 at 09:52
  • See edit. You should really be **MUCH** clearer in your question - specify the exact expected output for a given example input - I can't guess your requirements. – Tzach Zohar Dec 30 '16 at 10:00
  • I made it work with: LabeledPoint(label, Vectors.sparse(3, featuresArray.map(_.split(":")(0).toInt).toArray, featuresArray.map(_.split(":")(1).toDouble).toArray)) – zhimin.feng Dec 30 '16 at 10:00