Given two DataFrame columns of SparseVector objects, how can you add (i.e. vector addition) the two columns together to create a new column?

Something like

df.columns
df: org.apache.spark.sql.DataFrame = [v1: SparseVector, v2: SparseVector]

df.withColumn("v3", ADD_COL_FUNCTION(col("v1"), col("v2")))
kingledion

2 Answers

There is no built-in addition function for SparseVectors in Spark. DenseVector objects can be handled by converting them to arrays, but for a SparseVector that could be a memory killer. Instead, you can interpret each SparseVector as a map from index to value, then 'add' the maps together.
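The map-merge trick can be seen on plain Scala Maps, independent of Spark (the values here are made up for illustration): add each entry of the second map to the matching entry of the first, then let `++` overwrite the first map's entries with the summed ones.

```scala
// Two hypothetical sparse vectors represented as index -> value maps.
val map1 = Map(0 -> 1.0, 3 -> 2.0)
val map2 = Map(3 -> 4.0, 5 -> 6.0)

// Add map1's value (or 0.0) to each of map2's entries, then merge.
// Keys only in map1 keep their value; shared keys are summed.
val sum = map1 ++ map2.map { case (k, v) => k -> (v + map1.getOrElse(k, 0.0)) }
// sum == Map(0 -> 1.0, 3 -> 6.0, 5 -> 6.0)
```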

import org.apache.spark.mllib.linalg.{SparseVector, Vectors, Vector}

def addVecCols(v1: SparseVector, v2: SparseVector): Vector = {
  // Map each vector's active indices to their values.
  val map1: Map[Int, Double] = (v1.indices zip v1.values).toMap
  val map2: Map[Int, Double] = (v2.indices zip v2.values).toMap

  // Add map1's value (or 0.0) to each of map2's entries, then merge:
  // shared keys are summed, keys present in only one map keep their value.
  Vectors.sparse(v1.size,
    (map1 ++ map2.map { case (k, v) => k -> (v + map1.getOrElse(k, 0d)) }).toList
  )
}

val addVecUDF = udf((v1: SparseVector, v2: SparseVector) => addVecCols(v1, v2))

Note that in Spark 1.6 the return type of Vectors.sparse is Vector, while in Spark 2.x it is SparseVector, so adjust the return type of addVecCols accordingly. Also, in 2.x you can use the ml library (org.apache.spark.ml.linalg) instead of the mllib library.

Using this on a DataFrame looks like:

val outDF = inDF.withColumn("added", addVecUDF(col("one_vector"), col("other_vector")))
kingledion

Here is our final solution to this problem.

First, we implemented the implicit conversions between Spark and Breeze vectors provided in this post (note the bug fixes in the comments). These provide the asBreeze and fromBreeze conversions used in the code below.
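The linked post's conversions are not reproduced here; as a rough sketch, such implicits might look like the following, assuming the mllib vector types and Breeze on the classpath (the class names RichSparkVector and RichBreezeVector are illustrative, not the post's actual code):

```scala
import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
import org.apache.spark.mllib.linalg.{SparseVector, Vector => SparkVector, Vectors}

// Hypothetical sketch of asBreeze: wrap a Spark vector as a Breeze vector,
// preserving sparsity where possible.
implicit class RichSparkVector(v: SparkVector) {
  def asBreeze: BV[Double] = v match {
    case sv: SparseVector => new BSV[Double](sv.indices, sv.values, sv.size)
    case dv               => BDV(dv.toArray)
  }
}

// Hypothetical sketch of fromBreeze: convert a Breeze vector back to Spark.
implicit class RichBreezeVector(v: BV[Double]) {
  def fromBreeze: SparkVector = v match {
    case sv: BSV[Double] => Vectors.sparse(sv.length, sv.activeIterator.toSeq)
    case bv              => Vectors.dense(bv.toArray)
  }
}
```

With these in scope, `(v1.asBreeze + v2.asBreeze).fromBreeze` delegates the element-wise addition to Breeze, which handles sparse/sparse addition without densifying.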

Then we defined a function that allows addition of columns of sparse vectors:

// SparkVector is an alias for the Spark vector type, e.g.:
// import org.apache.spark.mllib.linalg.{Vector => SparkVector}

def addVectors(v1Col: String, v2Col: String, outputCol: String)
              : DataFrame => DataFrame = {
  df: DataFrame => {
    def add(v1: SparkVector, v2: SparkVector): SparkVector =
      (v1.asBreeze + v2.asBreeze).fromBreeze
    val func = udf((v1: SparkVector, v2: SparkVector) => add(v1, v2))
    df.withColumn(outputCol, func(col(v1Col), col(v2Col)))
  }
}

This function is called using:

df.transform(addVectors(col1Name, col2Name, colOutName))

You may, of course, want to include some checks for the presence of the column names, and to make sure the output column doesn't overwrite anything you don't want it to.

kingledion