
I have some data in a tab-delimited file on HDFS that looks like this:

label | user_id | feature
------------------------------
  pos | 111     | www.abc.com
  pos | 111     | www.xyz.com
  pos | 111     | Firefox
  pos | 222     | www.example.com
  pos | 222     | www.xyz.com
  pos | 222     | IE
  neg | 333     | www.jkl.com
  neg | 333     | www.xyz.com
  neg | 333     | Chrome

I need to transform it to create a feature vector for each user_id to train an org.apache.spark.ml.classification.NaiveBayes model.

My current approach is essentially the following:

  1. Load the raw data into a DataFrame
  2. Index the features with StringIndexer (a rough sketch of this step is shown right after this list)
  3. Drop down to the RDD, group by user_id, and map the feature indices into a sparse Vector.
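
A rough sketch of what step 2 could look like (variable and column names here are illustrative; assume step 1 gives a DataFrame raw with the columns label, user_id, feature):

import org.apache.spark.ml.feature.StringIndexer

// index the feature strings; "feature_index" is just the name used here
val indexerModel = new StringIndexer()
  .setInputCol("feature")
  .setOutputCol("feature_index")
  .fit(raw)

val d = indexerModel.transform(raw)

// one slot per distinct feature; used as the sparse vector size below
val maxIndex = indexerModel.labels.length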

The kicker is this... the data is already pre-sorted by user_id. What's the best way to take advantage of that? It pains me to think about how much needless work may be occurring.

In case a little code is helpful to understand my current approach, here is the essence of the map:

// Spark 2.x imports; on 1.x the vector class lives in org.apache.spark.mllib.linalg
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.Row

val featurization = (vals: (String, Iterable[Row])) => {
  // create a Seq of (featureIndex, value) pairs for this user
  // Note: the indexing was done in a previous step not shown
  val seq = vals._2.map(x => (x.getDouble(1).toInt, 1.0)).toSeq

  // create the sparse vector (maxIndex comes from the indexing step)
  val featureVector = Vectors.sparse(maxIndex, seq)

  // convert the string label into a Double
  val label = if (vals._2.head.getString(2) == "pos") 1.0 else 0.0

  (label, vals._1, featureVector)
}

// toDF on an RDD of tuples needs import spark.implicits._ in scope
d.rdd
  .groupBy(_.getString(1))
  .map(featurization)
  .toDF("label", "user_id", "features")

1 Answer

Let's start with your other question:

If my data on disk is guaranteed to be pre-sorted by the key which will be used for a group aggregation or reduce, is there any way for Spark to take advantage of that?

It depends. If the operation you apply can benefit from map-side aggregation then you can gain quite a lot by having presorted data without any further intervention in your code. Data sharing the same key should be located on the same partitions and can be aggregated locally before the shuffle.

Unfortunately it won't help much in this particular scenario. Even if you enable map-side aggregation (groupByKey doesn't use it, so you'd need a custom implementation) or aggregate over feature vectors (you'll find some examples in my answer to How to define a custom aggregation function to sum a column of Vectors?), there is not much to gain. You can save some work here and there, but you still have to transfer all the indices between nodes.
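
Just to illustrate the point (this is not your code, and the column position of the indexed feature is a placeholder), a map-side combining version would look roughly like this, and it still has to ship every index list across the network:

import scala.collection.mutable.ArrayBuffer

// featureIdxCol is a placeholder for wherever the StringIndexer output sits in your schema
val featureIdxCol = 3

val partialLists = d.rdd
  .map(r => (r.getString(1), r.getDouble(featureIdxCol).toInt))  // (user_id, feature index)
  .aggregateByKey(ArrayBuffer.empty[Int])(
    (buf, i) => buf += i,   // combine values within a partition (map side)
    (b1, b2) => b1 ++= b2)  // merge buffers after the shuffle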

If you want to gain more you'll have to do a bit more work. I can see two basic ways to leverage the existing order:

  1. Use a custom Hadoop input format to yield only complete records (label, id, all features) instead of reading the data line by line. If your data has a fixed number of lines per id you could even try NLineInputFormat and apply mapPartitions to aggregate records afterwards (see the first sketch after this list).

    This is definitely a more verbose solution, but it requires no additional shuffling in Spark.

  2. Read the data as usual but use a custom partitioner for the groupBy. As far as I can tell a RangePartitioner should work just fine, but to be sure you can try the following procedure:

    • use mapPartitionsWithIndex to find the minimum / maximum id for each partition
    • create a partitioner which keeps minimum <= ids < maximum on the current (i-th) partition and pushes the maximum to partition i + 1
    • use this partitioner for the groupBy

    It is probably a friendlier solution, but it requires at least some shuffling. If the expected number of records to move is low (<< #records-per-partition) you can even handle this without a shuffle using mapPartitions and broadcast*, although having partitioned data can be more useful and cheaper to get in practice (see the second sketch below).
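
A rough, untested sketch of option 1, assuming exactly three lines per user_id, a tab delimiter, and a placeholder path:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat

// each input split (and therefore each Spark partition) gets exactly the
// three lines belonging to one user_id
sc.hadoopConfiguration.setInt("mapreduce.input.lineinputformat.linespermap", 3)

val perUser = sc.newAPIHadoopFile(
    "hdfs:///path/to/data",   // placeholder path
    classOf[NLineInputFormat],
    classOf[LongWritable],
    classOf[Text])
  .map { case (_, line) => line.toString.split("\t") }
  .mapPartitions { rows =>
    val fields = rows.toArray   // all lines for a single user
    if (fields.isEmpty) Iterator.empty
    else Iterator((fields.head(0), fields.head(1), fields.map(_(2)).toSeq))
  }   // (label, user_id, features)

In practice you'd set linespermap to a larger multiple of three and group consecutive ids inside mapPartitions, otherwise you end up with one tiny partition per user.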


* You can use an approach similar to this: https://stackoverflow.com/a/33072089/1560062
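
And a rough, untested sketch of option 2, assuming the DataFrame d from your question with user_id stored as a string in column 1:

import org.apache.spark.Partitioner
import org.apache.spark.sql.Row

// 1. find the maximum id on each partition
val maxIds = d.rdd
  .mapPartitionsWithIndex { case (i, iter) =>
    if (iter.isEmpty) Iterator.empty
    else Iterator((i, iter.map(_.getString(1)).max))
  }
  .collect()
  .sortBy(_._1)
  .map(_._2)

// 2. keep ids strictly below a partition's maximum where they already are;
//    the maximum itself is pushed to the next partition
class PresortedPartitioner(maxIds: Array[String]) extends Partitioner {
  def numPartitions: Int = maxIds.length
  def getPartition(key: Any): Int = {
    val id = key.asInstanceOf[String]
    val i = maxIds.indexWhere(id < _)
    if (i >= 0) i else maxIds.length - 1
  }
}

// 3. use it for the groupBy so the grouping follows the existing layout
val grouped = d.rdd.groupBy((r: Row) => r.getString(1), new PresortedPartitioner(maxIds))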
