I have some data in a tab-delimited file on HDFS that looks like this:
label | user_id | feature
------------------------------
pos | 111 | www.abc.com
pos | 111 | www.xyz.com
pos | 111 | Firefox
pos | 222 | www.example.com
pos | 222 | www.xyz.com
pos | 222 | IE
neg | 333 | www.jkl.com
neg | 333 | www.xyz.com
neg | 333 | Chrome
I need to transform it into one feature vector per user_id so that I can train an org.apache.spark.ml.classification.NaiveBayes model.
My current approach is essentially the following:
- Load the raw data into a DataFrame
- Index the features with StringIndexer
- Drop down to the RDD, group by user_id, and map the feature indices into a sparse Vector.
The kicker is this: the data is already sorted by user_id. What's the best way to take advantage of that? It pains me to think about how much needless work (such as the shuffle behind groupBy) may be occurring.
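To make "taking advantage of the sort order" concrete, here is a minimal sketch in plain Scala (no Spark) of a single-pass grouping over rows that are already clustered by user_id. The names `Record`, `UserFeatures`, `SortedGrouping` and `groupSorted` are hypothetical and exist only for this illustration. The sketch assumes the feature has already been turned into an integer index and that all rows for a user are contiguous.

```scala
// Hypothetical stand-in for one input row after feature indexing.
final case class Record(label: String, userId: String, featureIndex: Int)

// Hypothetical output: numeric label, user id, and the sorted, distinct
// feature indices (the non-zero positions of a sparse vector of 1.0 values).
final case class UserFeatures(label: Double, userId: String, indices: Vector[Int])

object SortedGrouping {
  // Groups consecutive records that share a userId in one pass.
  // Assumption: the input is sorted (or at least clustered) by userId,
  // so no hashing or shuffling is needed to bring a user's rows together.
  def groupSorted(rows: Iterator[Record]): Iterator[UserFeatures] = {
    val buffered = rows.buffered
    new Iterator[UserFeatures] {
      def hasNext: Boolean = buffered.hasNext

      def next(): UserFeatures = {
        val first = buffered.next()
        val indices = scala.collection.mutable.ArrayBuffer(first.featureIndex)
        // Keep consuming rows while the user id stays the same.
        while (buffered.hasNext && buffered.head.userId == first.userId) {
          indices += buffered.next().featureIndex
        }
        // Same label convention as in my current code: "pos" maps to 1.0.
        val label = if (first.label == "pos") 1.0 else 0.0
        UserFeatures(label, first.userId, indices.distinct.sorted.toVector)
      }
    }
  }
}
```

In Spark, something like this could presumably be applied per partition with `mapPartitions`, but only if a user's rows never straddle a partition boundary, which I don't believe is guaranteed when the file is read from HDFS splits.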
In case a little code is helpful to understand my current approach, here is the essence of the map:
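    import org.apache.spark.sql.Row
    // Vectors: org.apache.spark.ml.linalg (Spark 2.x+) or org.apache.spark.mllib.linalg (Spark 1.x)
    // toDF requires the session's implicits to be in scope

    val featurization = (vals: (String, Iterable[Row])) => {
      // create a Seq of (featureIndex, 1.0) pairs
      // Note: the indexing was done in a previous step not shown
      val seq = vals._2.map(x => (x.getDouble(1).toInt, 1.0D)).toSeq
      // create the sparse vector; the first argument is the vector size,
      // so maxIndex must be larger than every feature index
      val featureVector = Vectors.sparse(maxIndex, seq)
      // convert the string label into a Double
      val label = if (vals._2.head.getString(2) == "pos") 1.0 else 0.0
      (label, vals._1, featureVector)
    }

    d.rdd
      .groupBy(_.getString(1))
      .map(featurization)
      .toDF("label", "user_id", "features")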