16

I have a DataFrame that looks like the following:

userID, category, frequency
1,cat1,1
1,cat2,3
1,cat9,5
2,cat4,6
2,cat9,2
2,cat10,1
3,cat1,5
3,cat7,16
3,cat8,2

The number of distinct categories is 10, and I would like to create a feature vector for each userID and fill the missing categories with zeros.

So the output would be something like:

userID,feature
1,[1,3,0,0,0,0,0,0,5,0]
2,[0,0,0,6,0,0,0,0,2,1]
3,[5,0,0,0,0,0,16,2,0,0]

This is just an illustrative example; in reality I have about 200,000 unique userIDs and 300 unique categories.

What is the most efficient way to create the features DataFrame?

asked by Rami
  • Try to do that through the pivot operation; see the example here: https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html – Roman Zykov Feb 06 '17 at 12:37

3 Answers

14

A slightly more DataFrame-centric solution:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.functions.{lit, sum, when}
// Assumes spark-shell; in a standalone app also import sqlContext.implicits._
// for the $ column syntax and toDF.

val df = sc.parallelize(Seq(
  (1, "cat1", 1), (1, "cat2", 3), (1, "cat9", 5), (2, "cat4", 6),
  (2, "cat9", 2), (2, "cat10", 1), (3, "cat1", 5), (3, "cat7", 16),
  (3, "cat8", 2))).toDF("userID", "category", "frequency")

// Create a sorted array of categories
val categories = df
  .select($"category")
  .distinct.map(_.getString(0))
  .collect
  .sorted

// Prepare the vector assembler
val assembler = new VectorAssembler()
  .setInputCols(categories)
  .setOutputCol("features")

// One aggregation expression per category: sum of frequency where the
// category matches, 0 otherwise
val exprs = categories.map(
   c => sum(when($"category" === c, $"frequency").otherwise(lit(0))).alias(c))

val transformed = assembler.transform(
    df.groupBy($"userID").agg(exprs.head, exprs.tail: _*))
  .select($"userID", $"features")

and a UDAF alternative:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{
  MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.mllib.linalg.{Vectors, VectorUDT}
import org.apache.spark.sql.types.{
  StructType, ArrayType, DoubleType, IntegerType}
import scala.collection.mutable.WrappedArray

class VectorAggregate(n: Int) extends UserDefinedAggregateFunction {
    // Input: a category index and a frequency value
    def inputSchema = new StructType()
      .add("i", IntegerType)
      .add("v", DoubleType)
    // Intermediate state: an array of n partial sums
    def bufferSchema = new StructType().add("buff", ArrayType(DoubleType))
    def dataType = new VectorUDT()
    def deterministic = true

    // Start from an all-zeros buffer of length n
    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, Array.fill(n)(0.0))
    }

    // Add the input frequency at its category index
    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) {
        val i = input.getInt(0)
        val v = input.getDouble(1)
        val buff = buffer.getAs[WrappedArray[Double]](0)
        buff(i) += v
        buffer.update(0, buff)
      }
    }

    // Merge two partial buffers element-wise
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      val buff1 = buffer1.getAs[WrappedArray[Double]](0)
      val buff2 = buffer2.getAs[WrappedArray[Double]](0)
      for ((x, i) <- buff2.zipWithIndex) {
        buff1(i) += x
      }
      buffer1.update(0, buff1)
    }

    // Expose the final buffer as a dense ML vector
    def evaluate(buffer: Row) = Vectors.dense(
      buffer.getAs[Seq[Double]](0).toArray)
}

with example usage:

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("category_idx")
  .fit(df)

val indexed = indexer.transform(df)
  .withColumn("category_idx", $"category_idx".cast("integer"))
  .withColumn("frequency", $"frequency".cast("double"))

val n = indexer.labels.size + 1

val transformed = indexed
  .groupBy($"userID")
  .agg(new VectorAggregate(n)($"category_idx", $"frequency").as("vec"))

transformed.show

// +------+--------------------+
// |userID|                 vec|
// +------+--------------------+
// |     1|[1.0,5.0,0.0,3.0,...|
// |     2|[0.0,2.0,0.0,0.0,...|
// |     3|[5.0,0.0,16.0,0.0...|
// +------+--------------------+

In this case the order of values is defined by indexer.labels:

indexer.labels
// Array[String] = Array(cat1, cat9, cat7, cat2, cat8, cat4, cat10)
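
So, to map a vector position back to its category (a small sketch):

indexer.labels.zipWithIndex.foreach { case (label, i) =>
  println(s"$i -> $label")
}
// 0 -> cat1, 1 -> cat9, 2 -> cat7, ...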

In practice I would prefer the solution by Odomontois, so these are provided mostly for reference.

answered by zero323
  • Hi Zero323, I am trying your solution now; it seems to be slower and much more resource-consuming than the solution of Odomontois. It ended by giving me a `java.lang.OutOfMemoryError: GC overhead limit exceeded` exception. – Rami Nov 24 '15 at 10:03
  • It can be expensive. It converts the data frame to a wide format before aggregation, so with a large number of categories that means significant overhead. – zero323 Nov 24 '15 at 14:16
  • Cool. Reminds me of dplyr & tidyr a little bit. But this API design looks strange to me. `.agg(exprs.head, exprs.tail: _*)` is kinda 5 times uglier than `.agg(exprs:_*)`, and `c => sum(when($"category" === c, $"frequency").otherwise(lit(0))` is freaking all my LISPs out – Odomontois Nov 24 '15 at 14:33
  • @Odomontois Yeah, it is ugly and confusing (`select`, for example, has a pure varargs version). I've been thinking about opening a PR to add an `agg(exprs: Column*)` version. – zero323 Nov 24 '15 at 14:47
  • @zero323 Related to your first answer, I have asked this question: http://stackoverflow.com/q/43759896/1374804 – Rami May 03 '17 at 12:22
  • @Rami This is a tricky question. You can ignore the warning, but you cannot ignore the growing plan. With 20 columns I wouldn't worry. With 100s or 1000s things can get ugly. – zero323 May 03 '17 at 12:32
  • I guess we would lose the metadata for the encoding in this method, or is it possible to attach it to the vector schema? – Roshini Sep 22 '17 at 14:53
13

Suppose:

val cs: SparkContext // note the unconventional names: cs is the SparkContext
val sc: SQLContext   // and sc is the SQLContext
val cats: DataFrame

where userId and frequency are bigint columns, which correspond to scala.Long.
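
For a self-contained run, cats could be built from the question's sample data, e.g. (a hypothetical setup; the Long literals make the columns bigint, matching the assumption above):

val cats = sc.createDataFrame(Seq(
  (1L, "cat1", 1L), (1L, "cat2", 3L), (1L, "cat9", 5L),
  (2L, "cat4", 6L), (2L, "cat9", 2L), (2L, "cat10", 1L),
  (3L, "cat1", 5L), (3L, "cat7", 16L), (3L, "cat8", 2L)
)).toDF("userId", "category", "frequency")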

We create an intermediate mapping RDD:

val catMaps = cats.rdd
  .groupBy(_.getAs[Long]("userId"))
  .map { case (id, rows) => id -> rows
    .map { row => row.getAs[String]("category") -> row.getAs[Long]("frequency") }
    .toMap
  }

Then we collect all the categories present, in lexicographic order:

val catNames = cs.broadcast(catMaps.map(_._2.keySet).reduce(_ union _).toArray.sorted)

Or create it manually:

val catNames = cs.broadcast((1 to 10).map(n => s"cat$n").toArray)

Finally we transform the maps into arrays, with zeros for the missing categories:

import sc.implicits._
val catArrays = catMaps
      .map { case (id, catMap) => id -> catNames.value.map(catMap.getOrElse(_, 0L)) }
      .toDF("userId", "feature")

Now catArrays.show() prints something like:

+------+--------------------+
|userId|             feature|
+------+--------------------+
|     2|[0, 1, 0, 6, 0, 0...|
|     1|[1, 0, 3, 0, 0, 0...|
|     3|[5, 0, 0, 0, 16, ...|
+------+--------------------+

This may not be the most elegant solution for DataFrames, as I am barely familiar with this area of Spark.

Note that you could create your catNames manually to add zeros for the missing cat3, cat5, ...

Also note that otherwise the catMaps RDD is computed twice, so you might want to .persist() it, as sketched below.
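
A minimal sketch of that caching (same catMaps as above):

// Cache catMaps so that deriving catNames and catArrays does not
// run the groupBy twice.
catMaps.persist()

// ... build catNames and catArrays as above ...

catMaps.unpersist()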

answered by Odomontois
7

Given your input:

val df = Seq((1, "cat1", 1), (1, "cat2", 3), (1, "cat9", 5), 
             (2, "cat4", 6), (2, "cat9", 2), (2, "cat10", 1), 
             (3, "cat1", 5), (3, "cat7", 16), (3, "cat8", 2))
           .toDF("userID", "category", "frequency")
df.show
+------+--------+---------+
|userID|category|frequency|
+------+--------+---------+
|     1|    cat1|        1|
|     1|    cat2|        3|
|     1|    cat9|        5|
|     2|    cat4|        6|
|     2|    cat9|        2|
|     2|   cat10|        1|
|     3|    cat1|        5|
|     3|    cat7|       16|
|     3|    cat8|        2|
+------+--------+---------+

Just run:

val pivoted = df.groupBy("userID").pivot("category").avg("frequency")
val dfZeros = pivoted.na.fill(0)
dfZeros.show
+------+----+-----+----+----+----+----+----+                                    
|userID|cat1|cat10|cat2|cat4|cat7|cat8|cat9|
+------+----+-----+----+----+----+----+----+
|     1| 1.0|  0.0| 3.0| 0.0| 0.0| 0.0| 5.0|
|     3| 5.0|  0.0| 0.0| 0.0|16.0| 2.0| 0.0|
|     2| 0.0|  1.0| 0.0| 6.0| 0.0| 0.0| 2.0|
+------+----+-----+----+----+----+----+----+

Finally, use VectorAssembler to create an org.apache.spark.ml.linalg.Vector.
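
For example (a minimal sketch assembling every pivoted category column; featureCols is an illustrative name, not from the original answer):

import org.apache.spark.ml.feature.VectorAssembler

// Every column except userID holds a category count after the pivot.
val featureCols = dfZeros.columns.filter(_ != "userID")

val features = new VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features")
  .transform(dfZeros)
  .select("userID", "features")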

NOTE: I have not checked the performance of this yet...

EDIT: Possibly more complex, but likely more efficient!

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{collect_list, sort_array, struct, udf}

def toSparseVectorUdf(size: Int) = udf[Vector, Seq[Row]] {
  (data: Seq[Row]) => {
    // The rows arrive sorted by idx (see sort_array below), as
    // Vectors.sparse requires strictly increasing indices.
    val indices = data.map(_.getDouble(0).toInt).toArray
    val values = data.map(_.getInt(1).toDouble).toArray
    Vectors.sparse(size, indices, values)
  }
}

val indexer = new StringIndexer().setInputCol("category").setOutputCol("idx")
val indexerModel = indexer.fit(df)
val totalCategories = indexerModel.labels.size
val dataWithIndices = indexerModel.transform(df)
val data = dataWithIndices
  .groupBy("userId")
  .agg(sort_array(collect_list(struct($"idx", $"frequency".as("val")))).as("data"))
val dataWithFeatures = data.withColumn("features", toSparseVectorUdf(totalCategories)($"data")).drop("data")
dataWithFeatures.show(false)
+------+--------------------------+
|userId|features                  |
+------+--------------------------+
|1     |(7,[0,1,3],[1.0,5.0,3.0]) |
|3     |(7,[0,2,4],[5.0,16.0,2.0])|
|2     |(7,[1,5,6],[2.0,6.0,1.0]) |
+------+--------------------------+

NOTE: StringIndexer will sort categories by frequency, so the most frequent category will be at index 0 in indexerModel.labels. Feel free to use your own mapping if you'd like and pass that directly to toSparseVectorUdf; one possibility is sketched below.
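
For instance, a fixed lexicographic mapping could replace the StringIndexer (a hypothetical sketch; catIndex and indexUdf are illustrative names, not from the original answer):

import org.apache.spark.sql.functions.udf

// category -> index in lexicographic order instead of frequency order
val catIndex = df.select("category").distinct
  .collect.map(_.getString(0)).sorted.zipWithIndex.toMap

val indexUdf = udf((c: String) => catIndex(c).toDouble)

val byName = df.withColumn("idx", indexUdf($"category"))
// then group, sort_array/collect_list, and call toSparseVectorUdf(catIndex.size) as above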

answered by Marsellus Wallace