I'm working on some elementary matrix operations in Scala for Spark. I keep getting memory and heap errors that I don't expect and don't understand.
I'm trying to implement a method that multiplies a matrix by a vector and then subtracts another vector. The matrices are on the order of 5000 rows by 1,000,000 columns; the matrix is sparse, the vectors are dense, and the element type is Double. I'm using Scala 2.10.4 and Spark 1.4.1, though I'm not tied to those versions if something else would work better.
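To be concrete, each row of A is stored as an MLlib sparse vector; a toy row would look something like this (purely illustrative, not my real loading code):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.IndexedRow

// Illustrative only: one 1,000,000-wide sparse row with three non-zero entries.
val exampleRow = IndexedRow(42L, Vectors.sparse(1000000, Array(3, 70000, 999999), Array(1.5, -2.0, 0.25)))

Here's my first attempt at the multiply-and-subtract: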
import org.apache.spark.mllib.linalg.{DenseVector, Vector => SparkVector}
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
// "Axmy" means A times x minus y
def Axmy(A: IndexedRowMatrix, x: SparkVector, y: SparkVector): SparkVector = {
  require(A.numCols == x.size)
  require(A.numRows == y.size)
  // For each row of A: dot product with x, then subtract the matching entry of y.
  val p: RDD[Double] = A.rows.map { r =>
    r.vector.toArray.zip(x.toArray).map { case (aj, xj) => aj * xj }
      .sum - y(r.index.toInt)
  }
  // Bring the per-row results back to the driver as a dense vector.
  new DenseVector(p.collect)
}
When this didn't work, I thought about it some more and realized that toArray densifies each row, so all 1,000,000 elements of a row have to be in memory at once, which might be causing the problem. I had seen another example that worked on the transpose of the matrix, so I tried implementing my own version:
// "ATxmy" means A-transpose times x minus y
def ATxmy(AT: IndexedRowMatrix, x: SparkVector, y: SparkVector): SparkVector = {
  require(AT.numRows == x.size)
  require(AT.numCols == y.size)
  // Pair each row of AT (i.e., each column of A) with its index.
  val p1: RDD[(Long, SparkVector)] = AT.rows.map(r => (r.index, r.vector))
  p1.persist(StorageLevel.MEMORY_AND_DISK)
  // Attach the matching element of x to each row.
  val p2: RDD[((Long, SparkVector), Double)] = p1.map { case (ri, r) => ((ri, r), x(ri.toInt)) }
  p2.persist(StorageLevel.MEMORY_AND_DISK)
  // Scale each row (column of A) by its element of x.
  val p3: RDD[Array[Double]] = p2.map { case ((ri, r), v) => r.toArray.map(e => e * v) }
  p3.persist(StorageLevel.MEMORY_AND_DISK)
  // Sum the scaled columns element-wise to get A * x ...
  val p4: Array[Double] = p3.reduce((a1, a2) => a1.zip(a2).map { case (ae1, ae2) => ae1 + ae2 })
  // ... then subtract y on the driver.
  val p5: Array[Double] = p4.zip(y.toArray).map { case (axme, ye) => axme - ye }
  new DenseVector(p5)
}
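For what it's worth, my understanding is that AT can be produced along these lines when the non-zero entries of A are available as (i, j, value) triples; this is a sketch of the idea rather than my exact code:

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, IndexedRowMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD

// Sketch only: build a CoordinateMatrix from the non-zero entries of A,
// transpose it, and convert back to an IndexedRowMatrix.
def transposeToIndexedRowMatrix(entries: RDD[MatrixEntry]): IndexedRowMatrix =
  new CoordinateMatrix(entries).transpose().toIndexedRowMatrix()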
Without the calls to persist, I got various memory-exhaustion errors, both on my laptop with spark.master=local[*] and on my company's three-node cluster with driver and executor memory set to 5G. The most recent was an org.apache.spark.shuffle.FetchFailedException: Direct buffer memory. With the calls to persist it runs longer, but eventually dies with an IllegalArgumentException: Size exceeds Integer.MAX_VALUE thrown from sun.nio.ch.FileChannelImpl.map, which seems to be related to the partition size.
(This looks like the same failure described in the existing question "Spark Java Error: Size exceeds Integer.MAX_VALUE".)
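From what I understand, that exception means a single cached or shuffled block has grown past 2 GB, so maybe I just need to spread AT over more, smaller partitions before calling ATxmy. Something like this untested sketch, where the partition count of 200 is a pure guess:

import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix
import org.apache.spark.storage.StorageLevel

// Untested sketch: redistribute AT's rows over more, smaller partitions so that
// no single cached or shuffled block gets anywhere near the 2 GB limit.
// 200 partitions is a guess, not a tuned value.
val ATSmallParts = new IndexedRowMatrix(AT.rows.repartition(200), AT.numRows, AT.numCols.toInt)
ATSmallParts.rows.persist(StorageLevel.MEMORY_AND_DISK)
val result = ATxmy(ATSmallParts, x, y)

I'm not sure whether the shuffle triggered by repartition would itself blow up first, which is partly why I'm asking.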
With 8-byte doubles, a 5000-element array should only take about 40 KB of memory. Evidently Spark is keeping many rows of the transposed matrix in memory at once (each is only 5000 entries wide, but there are 1,000,000 of them), and that is what's exhausting memory. So how do I encourage Spark to spill from memory to disk as needed so the computation completes? For now I'm just looking for something that works; it can be optimized later.
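One direction I've been toying with (untested) is to go back to the original, untransposed A but avoid toArray on the 1,000,000-element rows entirely: if each row stays sparse, the dot product only has to touch the non-zero entries, and the per-row result is a single Double, so only 5000 values ever come back to the driver. Here's a rough sketch of what I mean (axmySparse is just a placeholder name):

import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector => SparkVector}
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix

// Rough sketch: compute A * x - y without ever densifying a full row of A.
def axmySparse(A: IndexedRowMatrix, x: SparkVector, y: SparkVector): SparkVector = {
  require(A.numCols == x.size)
  require(A.numRows == y.size)
  // One (rowIndex, value) pair per row; each task only handles one row at a time.
  val perRow = A.rows.map { r =>
    val dot = r.vector match {
      case sv: SparseVector =>
        // Walk only the stored (non-zero) entries of the sparse row.
        var s = 0.0
        var k = 0
        while (k < sv.indices.length) {
          s += sv.values(k) * x(sv.indices(k))
          k += 1
        }
        s
      case dv =>
        // Fallback for dense rows (shouldn't happen for my data).
        dv.toArray.zip(x.toArray).map { case (a, b) => a * b }.sum
    }
    (r.index, dot - y(r.index.toInt))
  }
  // Only numRows (5000) doubles come back to the driver; place them by row index.
  val result = new Array[Double](y.size)
  perRow.collect().foreach { case (i, v) => result(i.toInt) = v }
  new DenseVector(result)
}

I don't know whether this sidesteps the partition-size issue or just moves it around, so any guidance is welcome.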
Thanks in advance!