
I'm pretty new to Scala and Spark, and I'm not able to create a correlation matrix from a file of ratings. It's similar to this question, but I have sparse data in matrix form. My data looks like this:

<user-id>, <rating-for-movie-1-or-null>, ... <rating-for-movie-n-or-null>

123, , , 3, , 4.5
456, 1, 2, 3, , 4
...

The code that is most promising so far looks like this:

val corTest = sc.textFile("data/collab_filter_data.txt").map(_.split(","))
Statistics.corr(corTest, "pearson")

(I know the user_ids in there are a defect, but I'm willing to live with that for the moment)

I'm expecting output like:

1,   .123, .345
.123, 1,   .454
.345, .454, 1

It's a matrix showing how each user is correlated to every other user. Graphically, it would be a correlogram.

It's a total noob problem but I've been fighting with it for a few hours and can't seem to Google my way out of it.

  • You can easily remove the first element, the one containing the userid, with `_.split(",").drop(1)` – Paul Oct 13 '15 at 21:29

1 Answer


I believe this code should accomplish what you want:

import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.linalg._
...
val corTest = input.map { line =>
  // Drop the leading user id, then parse each remaining field,
  // treating blank (whitespace-only) entries as 0.0
  val split = line.split(",").drop(1)
  split.map(elem => if (elem.trim.isEmpty) 0.0 else elem.toDouble)
}.map(arr => Vectors.dense(arr))

val corrMatrix = Statistics.corr(corTest)

Here, we map each input line to a String array, drop the user-id element, replace blank fields with 0.0, and finally create a dense vector from the resulting array. Also note that Pearson's method is used by default when no method is supplied.
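If you want a rank-based correlation instead, `Statistics.corr` also accepts a method name. A minimal sketch, assuming the MLlib API from the Spark 1.x line and the `corTest` RDD[Vector] built above (`spearmanMatrix` is just a name I picked):

    // Spearman (rank) correlation instead of the default Pearson
    val spearmanMatrix = Statistics.corr(corTest, "spearman")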

When run in the shell with some examples, I see the following:

scala> val input = sc.parallelize(Array("123, , , 3, , 4.5", "456, 1, 2, 3, , 4", "789, 4, 2.5, , 0.5, 4", "000, 5, 3.5, , 4.5, "))
input: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:16

scala> val corTest = ...
corTest: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[20] at map at <console>:18

scala> val corrMatrix = Statistics.corr(corTest)
...
corrMatrix: org.apache.spark.mllib.linalg.Matrix =
1.0                  0.9037378388935388   -0.9701425001453317  ... (5 total)
0.9037378388935388   1.0                  -0.7844645405527361  ...
-0.9701425001453317  -0.7844645405527361  1.0                  ...
0.7709910794438823   0.7273340668525836   -0.6622661785325219  ...
-0.7513578452729373  -0.7560667258329613  0.6195855517393626   ...
  • really helpful. Thanks a lot. I was getting an index out of bounds error when parsing my real life CSV. I added `.padTo(100, 0.0)` to the end of `split.map(elem => if (elem.isEmpty) 0.0 else elem.toDouble)` and now I'm getting the result I want. – brycemcd Oct 14 '15 at 16:30
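
For reference, a minimal sketch of that padded parse. The `100` stands in for however many movie columns your widest row actually has (an assumption here, not something in the original data); alternatively, `line.split(",", -1)` keeps trailing empty fields so the rows come out equal-length without padding:

    val corTest = input.map { line =>
      val split = line.split(",").drop(1)
      // Pad ragged rows so every vector has the same length;
      // split(",") drops trailing empty fields, which causes
      // the index-out-of-bounds error on real-world CSVs
      split.map(elem => if (elem.trim.isEmpty) 0.0 else elem.toDouble)
        .padTo(100, 0.0)
    }.map(arr => Vectors.dense(arr))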