There is no built-in addition function for SparseVectors in Spark. DenseVector objects can be handled by turning them into arrays, but for a SparseVector that could be a memory killer. Instead, you can interpret each SparseVector as a map from index to value, then 'add' the maps together.
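To see the merge idiom in isolation, here is a quick sketch with made-up maps:

val m1 = Map(0 -> 1.0, 2 -> 3.0)
val m2 = Map(2 -> 4.0, 5 -> 1.0)
// Add m1's value into each of m2's entries, then merge the result over m1,
// so keys present only in m1 keep their original value.
val summed = m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0.0)) }
// summed == Map(0 -> 1.0, 2 -> 7.0, 5 -> 1.0)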
import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
import org.apache.spark.sql.functions.{col, udf}

def addVecCols(v1: SparseVector, v2: SparseVector): Vector = {
  // Treat each vector as an index -> value map. Add v1's values into v2's
  // entries, then merge the result over map1 so that indices present only
  // in v1 keep their original values.
  val map1: Map[Int, Double] = (v1.indices zip v1.values).toMap
  val map2: Map[Int, Double] = (v2.indices zip v2.values).toMap
  Vectors.sparse(
    v1.size,
    (map1 ++ map2.map { case (k, v) => k -> (v + map1.getOrElse(k, 0d)) }).toList
  )
}

val addVecUDF = udf((v1: SparseVector, v2: SparseVector) => addVecCols(v1, v2))
Note that in Spark 1.6 the return type of Vectors.sparse is Vector, while in Spark 2.X it is SparseVector, so adjust the return type of addVecCols appropriately. Also, in 2.X you can use the ml library instead of the mllib library.
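For reference, a minimal sketch of the same function against the ml package (assuming Spark 2.X, where ml.linalg mirrors the mllib.linalg API used above; the return type is left to inference here, per the note above):

import org.apache.spark.ml.linalg.{SparseVector, Vectors}

def addVecCols(v1: SparseVector, v2: SparseVector) = {
  val map1: Map[Int, Double] = (v1.indices zip v1.values).toMap
  val map2: Map[Int, Double] = (v2.indices zip v2.values).toMap
  Vectors.sparse(
    v1.size,
    (map1 ++ map2.map { case (k, v) => k -> (v + map1.getOrElse(k, 0d)) }).toList
  )
}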
Using this on a DataFrame would be:
val outDF = inDF.withColumn("added", addVecUDF(col("one_vector"), col("other_vector")))
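For example, with made-up input vectors (assuming spark.implicits._ is in scope for toDF):

val inDF = Seq(
  (Vectors.sparse(4, Seq(0 -> 1.0, 2 -> 3.0)), Vectors.sparse(4, Seq(2 -> 4.0, 3 -> 5.0)))
).toDF("one_vector", "other_vector")

// The "added" column of outDF then contains (4,[0,2,3],[1.0,7.0,5.0])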