
I am trying to develop my own feedforward neural network using Spark. However, I can't find operations such as multiplication, addition, or division on Spark's sparse vector. The documentation says it is implemented using Breeze vectors, and I can find the add operation in Breeze but not on the Spark vector. How can I solve this problem?

Till Rohrmann
hidemyname

1 Answer


Spark's Vector implementations don't support algebraic operations. Unfortunately, the Spark API no longer lets you convert Spark vectors into Breeze vectors via the methods `asBreeze` and `fromBreeze`, because these methods have been made package private with respect to the spark package.

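However, you can write your own Spark-to-Breeze converter. The following code defines such a converter using type classes, which lets you always obtain the most specific result type (for example, a dense Spark vector converts to a dense Breeze vector rather than to a generic Breeze vector).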

import breeze.linalg.{Vector => BreezeVector, DenseVector => DenseBreezeVector, SparseVector => SparseBreezeVector}
import org.apache.spark.mllib.linalg.{Vector => SparkVector, DenseVector => DenseSparkVector, SparseVector => SparseSparkVector}

package object myPackage {

  implicit class RichSparkVector[I <: SparkVector](vector: I) {
    def asBreeze[O <: BreezeVector[Double]](implicit converter: Spark2BreezeConverter[I, O]): O = {
      converter.convert(vector)
    }
  }

  implicit class RichBreezeVector[I <: BreezeVector[Double]](breezeVector: I) {
    def fromBreeze[O <: SparkVector](implicit converter: Breeze2SparkConverter[I, O]): O = {
      converter.convert(breezeVector)
    }
  }
}

trait Spark2BreezeConverter[I <: SparkVector, O <: BreezeVector[Double]] {
  def convert(sparkVector: I): O
}

object Spark2BreezeConverter {
  implicit val denseSpark2DenseBreezeConverter = new Spark2BreezeConverter[DenseSparkVector, DenseBreezeVector[Double]] {
    override def convert(sparkVector: DenseSparkVector): DenseBreezeVector[Double] = {
      new DenseBreezeVector[Double](sparkVector.values)
    }
  }

  implicit val sparseSpark2SparseBreezeConverter = new Spark2BreezeConverter[SparseSparkVector, SparseBreezeVector[Double]] {
    override def convert(sparkVector: SparseSparkVector): SparseBreezeVector[Double] = {
      new SparseBreezeVector[Double](sparkVector.indices, sparkVector.values, sparkVector.size)
    }
  }

  // Fallback for values whose static type is only the general Spark Vector
  implicit val defaultSpark2BreezeConverter = new Spark2BreezeConverter[SparkVector, BreezeVector[Double]] {
    override def convert(sparkVector: SparkVector): BreezeVector[Double] = {
      sparkVector match {
        case dv: DenseSparkVector => denseSpark2DenseBreezeConverter.convert(dv)
        case sv: SparseSparkVector => sparseSpark2SparseBreezeConverter.convert(sv)
      }
    }
  }
}

trait Breeze2SparkConverter[I <: BreezeVector[Double], O <: SparkVector] {
  def convert(breezeVector: I): O
}

object Breeze2SparkConverter {
  implicit val denseBreeze2DenseSparkVector = new Breeze2SparkConverter[DenseBreezeVector[Double], DenseSparkVector] {
    override def convert(breezeVector: DenseBreezeVector[Double]): DenseSparkVector = {
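      // data is the raw backing array; for a sliced or strided vector, copy out the logical elements instead
      if (breezeVector.offset == 0 && breezeVector.stride == 1 && breezeVector.length == breezeVector.data.length) {
        new DenseSparkVector(breezeVector.data)
      } else {
        new DenseSparkVector(breezeVector.toArray)
      }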
    }
  }

  implicit val sparseBreeze2SparseSparkVector = new Breeze2SparkConverter[SparseBreezeVector[Double], SparseSparkVector] {
    override def convert(breezeVector: SparseBreezeVector[Double]): SparseSparkVector = {
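      // activeSize is the number of stored entries, not the vector's dimension
      val activeSize = breezeVector.activeSize
      val indices = breezeVector.array.index.take(activeSize)
      val data = breezeVector.data.take(activeSize)
      // Spark's SparseVector expects the full vector length as its first argument
      new SparseSparkVector(breezeVector.length, indices, data)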
    }
  }

  implicit val defaultBreeze2SparkVector = new Breeze2SparkConverter[BreezeVector[Double], SparkVector] {
    override def convert(breezeVector: BreezeVector[Double]): SparkVector = {
      breezeVector match {
        case dv: DenseBreezeVector[Double] => denseBreeze2DenseSparkVector.convert(dv)
        case sv: SparseBreezeVector[Double] => sparseBreeze2SparseSparkVector.convert(sv)
      }
    }
  }
}
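
As a rough usage sketch, the converters above can be used to do arithmetic in Breeze and convert the result back to a Spark vector. This assumes the code above is compiled in the same project so that `import myPackage._` brings `asBreeze` and `fromBreeze` into scope; `ConverterUsage`, `add`, and `scale` are hypothetical names introduced only for illustration.

```scala
import breeze.linalg.{DenseVector => DenseBreezeVector}
import org.apache.spark.mllib.linalg.{DenseVector => DenseSparkVector}
import myPackage._

// Hypothetical helper object, not part of Spark or Breeze.
object ConverterUsage {

  // Element-wise sum of two dense Spark vectors, computed in Breeze.
  def add(a: DenseSparkVector, b: DenseSparkVector): DenseSparkVector = {
    // The dense converter is selected from the static type, so the result is a dense Breeze vector.
    val sum: DenseBreezeVector[Double] = a.asBreeze + b.asBreeze
    // The expected return type selects the dense Breeze-to-Spark converter.
    sum.fromBreeze
  }

  // Multiplies every element of a dense Spark vector by a scalar.
  def scale(a: DenseSparkVector, factor: Double): DenseSparkVector = {
    val scaled: DenseBreezeVector[Double] = a.asBreeze * factor
    scaled.fromBreeze
  }
}
```

Because the converters are resolved from the static type of the input, a value typed only as the general Spark `Vector` would go through the default converter and yield a general Breeze `Vector[Double]` instead.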
Till Rohrmann
  • But I found that PySpark supports this operation, so I am thinking of using Python instead. I think this setup is pretty weird: when we want to customize our own ML library, we cannot take advantage of Spark. – hidemyname Sep 08 '15 at 15:05
  • That's a shame. Spark does not provide basic conversions, which renders its Sparse/DenseVectors useless. – Michel Lemay Nov 01 '16 at 13:22
  • There is a bug in your proposal. In `sparseBreeze2SparseSparkVector`, you pass `size` = `breezeVector.activeSize` to the Spark `SparseVector` constructor. For a sparse vector with length 100 but 10 stored elements, this is 10, but you want to pass 100. Instead, the constructor call should read `new SparseSparkVector(breezeVector.length, indices, data)`. Also, one of your values is named `sparkSpark2SparseBreezeConverter` instead of `sparseSpark2SparseBreezeConverter`. – kingledion Sep 26 '18 at 00:19
  • That being said, with those bug fixes, the sparse converters in this post just passed QA and went into production, so thank you for the excellent answer. – kingledion Sep 26 '18 at 00:20
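
The distinction raised in the comments between a sparse vector's length and its number of stored entries can be checked directly in Breeze. The following is a minimal sketch; `SparseSizeCheck` and `sizes` are hypothetical names used only for illustration.

```scala
import breeze.linalg.{SparseVector => SparseBreezeVector}

// Hypothetical helper object, not part of Spark or Breeze.
object SparseSizeCheck {

  // Returns (length, activeSize) for a 100-dimensional vector with two stored entries.
  def sizes(): (Int, Int) = {
    val v = SparseBreezeVector.zeros[Double](100)
    v(3) = 1.0
    v(42) = 2.0
    // length is the dimension of the vector; activeSize counts only the stored entries.
    (v.length, v.activeSize)
  }
}
```

Spark's `SparseVector` constructor takes the dimension as its first argument, so it is `length` rather than `activeSize` that belongs there.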