I've been breaking my head about this one for a couple of days now. It feels like it should be intuitively easy... Really hope someone can help!

I've built an org.nd4j.linalg.api.ndarray.INDArray of word occurrences from some semi-structured data like this:

import org.nd4j.linalg.factory.Nd4j
import org.nd4s.Implicits._
import scala.collection.mutable.ArrayBuffer

val docMap = collection.mutable.Map[Int, Map[Int, Int]]() // of the form Map(phrase -> Map(phrasePosition -> word))
val words = ArrayBuffer("word_1","word_2","word_3",..."word_n")
val windows = ArrayBuffer("$phrase,$phrasePosition_1","$phrase,$phrasePosition_2",..."$phrase,$phrasePosition_n") 

var matrix = Nd4j.create(windows.length*words.length).reshape(windows.length,words.length)
for (row <- 0 until matrix.rows()) {
    for (column <- 0 until matrix.columns()) {
        //+1 to (row,column) if word occurs at phrase, phrasePosition indicated by window_n.
    }
}
val finalmatrix = matrix.T.dot(matrix) // to get co-occurrence matrix
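
For reference, the fill step and the product above can be sketched in plain Scala, without ND4J. This is only an illustration: the object name and the `occurs` predicate are hypothetical, and `occurs(row, column)` stands in for the real lookup against docMap, which is not shown here.

```scala
object CooccurrenceSketch {
  /** Window-by-word matrix with 1.0 wherever `occurs(row, column)` holds.
    * `occurs` is a hypothetical stand-in for the lookup against docMap. */
  def occurrence(nWindows: Int, nWords: Int)(occurs: (Int, Int) => Boolean): Array[Array[Double]] =
    Array.tabulate(nWindows, nWords)((r, c) => if (occurs(r, c)) 1d else 0d)

  /** Co-occurrence matrix A^T * A, of size nWords x nWords. */
  def cooccurrence(a: Array[Array[Double]]): Array[Array[Double]] = {
    val nWords = if (a.isEmpty) 0 else a.head.length
    // Entry (i, j) sums, over all windows, the product of the counts of word i and word j.
    Array.tabulate(nWords, nWords)((i, j) => a.map(row => row(i) * row(j)).sum)
  }
}
```

Entry (i, j) of the result counts the windows in which word i and word j both occur, which is what matrix.T.dot(matrix) computes on the INDArray.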

So far so good...

Downstream of this point I need to integrate the data into an existing pipeline in Spark and use Spark's implementations of PCA etc., so I need to create a DataFrame, or at least an RDD. If I knew the number of words and/or windows in advance, I could do something like:

case class Row(window : String, word_1 : Double, word_2 : Double, ...etc)

val dfSeq = ArrayBuffer[Row]()
for (row <- 0 until matrix.rows()) {
    dfSeq += Row(windows(row),matrix.get(NDArrayIndex.point(row), NDArrayIndex.all()))
}
sc.parallelize(dfSeq).toDF("window","word_1","word_2",...etc)

But the number of windows and words is only determined at runtime. The input is a windows x words org.nd4j.linalg.api.ndarray.INDArray, and I'm looking for a windows x words org.apache.spark.sql.DataFrame as output.

Thanks in advance for any help you can offer.

Nahko

1 Answer

OK, so after several days' work it looks like the simple answer is: there isn't one. In fact, it looks like trying to use Nd4j in this context at all is a bad idea, for several reasons:

  1. It's (really) hard to get data out of the native INDArray format once you've put it in.
  2. Even using something like Guava, the .data() method brings everything onto the heap, which will quickly become expensive.
  3. You've got the added hassle of having to compile an assembly jar or use HDFS etc. to handle the library itself.

I did also consider using Breeze, which may actually provide a viable solution, but it carries some of the same problems and can't be used on distributed data structures.

Unfortunately, using native Spark / Scala datatypes, although easier once you know how, is - for someone like me coming from Python + numpy + pandas heaven at least - painfully convoluted and ugly.

Nevertheless, I did implement this solution successfully:

import org.apache.spark.mllib.linalg.{Vectors,Vector,Matrix,DenseMatrix,DenseVector}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

//first make a pseudo-matrix from Scala Array[Double]:
var rowSeq = Seq.fill(windows.length)(Array.fill(words.length)(0d))

//iterate through 'rows' and 'columns' to fill it:
for (row <- 0 until windows.length) {
    for (column <- 0 until words.length) {
        // rowSeq(row)(column) += 1 if word occurs at phrase, phrasePosition indicated by window_n.
    }
}

//create Spark DenseMatrix
val rows : Array[Double] = rowSeq.transpose.flatten.toArray
val matrix = new DenseMatrix(windows.length,words.length,rows)
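
The transpose before flattening is there because DenseMatrix stores its values in column-major order. A minimal plain-Scala sketch of that layout follows; the object and function names are made up for illustration, and `toColumnMajor` is meant to produce the same ordering as rowSeq.transpose.flatten.toArray.

```scala
object ColumnMajorSketch {
  /** Flattens row-oriented data column by column (column-major order). */
  def toColumnMajor(rows: Seq[Array[Double]]): Array[Double] = {
    val nCols = rows.headOption.map(_.length).getOrElse(0)
    // Outer loop over columns, inner loop over rows: column 0 first, then column 1, ...
    (for (c <- 0 until nCols; r <- rows.indices) yield rows(r)(c)).toArray
  }

  /** Reads entry (r, c) back out of a column-major array with nRows rows. */
  def at(values: Array[Double], nRows: Int, r: Int, c: Int): Double =
    values(c * nRows + r)
}
```

With two rows (1, 2, 3) and (4, 5, 6), the flattened order is 1, 4, 2, 5, 3, 6, so passing the rows flattened one after another without the transpose would scramble the matrix.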

One of the main operations that I needed Nd4j for was matrix.T.dot(matrix), but as far as I could tell you can't multiply two matrices of type org.apache.spark.mllib.linalg.DenseMatrix together: one of them (A) has to be an org.apache.spark.mllib.linalg.distributed.RowMatrix and, you guessed it, you can't call matrix.transpose() on a RowMatrix, only on a DenseMatrix! Since it's not really relevant to the question, I'll leave that part out, except to explain that what comes out of that step is a RowMatrix. Credit is also due here and here for the final part of the solution:

// transposeAndDotDenseMatrix is the helper left out above; it returns a RowMatrix
val rowMatrix: RowMatrix = transposeAndDotDenseMatrix(matrix)

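// get DataFrame from RowMatrix via DenseMatrix
import spark.implicits._ // needed for toDF and the $"..." column syntax
// the collected rows are in row-major order, hence isTransposed = true
// (this makes no difference for a symmetric co-occurrence matrix)
val newdense = new DenseMatrix(rowMatrix.numRows().toInt, rowMatrix.numCols().toInt, rowMatrix.rows.collect.flatMap(x => x.toArray), true) // the call to collect() here is undesirable...
val matrixRows = newdense.rowIter.toSeq.map(_.toArray)
val df = spark.sparkContext.parallelize(matrixRows).toDF("Rows")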

// then separate columns:
val df2 = (0 until words.length)
  .foldLeft(df)((df, num) => df.withColumn(words(num), $"Rows".getItem(num)))
  .drop("Rows")
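
One possible simplification, offered as a sketch rather than a tested replacement: RowMatrix has a computeGramianMatrix() method that returns A^T * A as a local matrix, and createDataFrame accepts a schema built at runtime, so the column names can come straight from words. The object name, the function name and the `occurrence` parameter (the window-by-word rows as plain arrays) are assumptions made for illustration.

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

object RuntimeSchemaSketch {
  /** Builds a words x words co-occurrence DataFrame whose columns are named at runtime.
    * `occurrence` is assumed to hold one Array[Double] of length words.length per window. */
  def cooccurrenceDF(spark: SparkSession,
                     words: Seq[String],
                     occurrence: Seq[Array[Double]]): DataFrame = {
    val sc = spark.sparkContext

    // Distribute the window-by-word rows and let Spark compute A^T * A.
    val distributed = new RowMatrix(sc.parallelize(occurrence.map(a => Vectors.dense(a))))
    val gram = distributed.computeGramianMatrix() // local words x words matrix on the driver

    // Turn each row of the local Gramian into a sql Row of Doubles.
    val rows = gram.rowIter.map(v => Row.fromSeq(v.toArray.toSeq)).toList

    // One non-nullable Double column per word, decided at runtime.
    val schema = StructType(words.map(w => StructField(w, DoubleType, nullable = false)))
    spark.createDataFrame(sc.parallelize(rows), schema)
  }
}
```

The Gramian still ends up on the driver, so this only suits vocabularies small enough for a words x words matrix to fit in local memory; it does avoid both the collect() on the RowMatrix and the intermediate "Rows" array column.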

Would love to hear improvements and suggestions on this, thanks.

Nahko