
After running a logistic regression on a dataset (n = 100,000), I would like to get the correlation matrix of the features.

Here is a preview of my data:

results.columns
res16: Array[String] = Array(label, Pclass, Sex, Age, SibSp, Parch, Fare, Embarked, SexIndex, EmbarkIndex, SexVec, EmbarkVec, features, rawPrediction, probability, prediction)
scala> val fts = results.select("features")
fts: org.apache.spark.sql.DataFrame = [features: vector]

scala> results.select("features").show(10)
+--------------------+
|            features|
+--------------------+
|[1.0,1.0,19.0,1.0...|
|[1.0,1.0,19.0,3.0...|
|[1.0,1.0,22.0,0.0...|
|[1.0,1.0,24.0,0.0...|
|[1.0,1.0,30.0,0.0...|
|[1.0,1.0,31.0,0.0...|
|[1.0,1.0,31.0,1.0...|
|[1.0,1.0,36.0,1.0...|
|(8,[0,1,2,6],[1.0...|
|[1.0,1.0,46.0,1.0...|
+--------------------+

I know that in R I could use this code in order to get the correlation matrix:

res <- rcorr(as.matrix(my_data)) 

so I tried something similar with Scala:

val corrMatrix = corr(fts)

and got the following error:

<console>:64: error: overloaded method value corr with alternatives:
  (columnName1: String,columnName2: String)org.apache.spark.sql.Column <and>
  (column1: org.apache.spark.sql.Column,column2: org.apache.spark.sql.Column)org.apache.spark.sql.Column
 cannot be applied to (org.apache.spark.sql.DataFrame)
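
Judging from those signatures, corr expects two column arguments rather than a whole DataFrame, so a call like the following (using two example columns from my schema) would at least compile:

results.select(corr($"Age", $"Fare")).show()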

After looking into this error and reading this and this, I think I need to put these arrays into a DataFrame and then iterate over it to compute the correlation for each pair of columns, i.e. something like the following pseudocode, where a(i)(j) is the entry in the i-th row and j-th column:

for (i <- 1 to n) {
  for (j <- i to n) {
    if (i == j) a(i)(j) = 1.0 // every feature correlates perfectly with itself
    else {
      a(i)(j) = corr(i, j) // symmetric matrix: mirror the value
      a(j)(i) = a(i)(j)
    }
  }
}

I am a complete beginner in Scala and Spark, so I would really appreciate it if someone could help me out.


2 Answers


In Spark 2.0 or later you can:

import org.apache.spark.ml.linalg._
import org.apache.spark.sql.functions._
import spark.implicits._

val n: Int = ??? // number of features (fill in for your data)

// UDF that unpacks the ML Vector column into a plain array column
val as_array = udf((v: Vector) => v.toArray)

// one corr expression per unordered pair of feature indices 0, ..., n - 1
val corrs = (0 until n).combinations(2).map {
  case Seq(i, j) => corr($"vs".getItem(i), $"vs".getItem(j))
}.toSeq

df.select(as_array($"features").alias("vs")).select(corrs: _*)
  • I think I got it, i.e. I run it and I got the correlation but I have three more questions: how to put the coefficients into a symmetric matrix and have a nice output? my n = 7, shouldn't I run (1 to 7) or (0 to 6)? 'corr($"vs".getItem(i), $"vs".getItem(j))' - where does "vs" come from? – Lior Dec 28 '16 at 11:04
  • From the `select`. You can initialize empty matrix, collect result and fill the cells. – user7337271 Dec 28 '16 at 16:57
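
A minimal sketch of that suggestion, assuming the n, df, as_array, and corrs defined in the answer above (combinations(2) enumerates the pairs in a deterministic order, so the k-th value in the collected row belongs to the k-th pair):

// collect the single row of pairwise correlations
val row = df.select(as_array($"features").alias("vs")).select(corrs: _*).first()

// fill a symmetric n x n matrix; diagonal entries are 1.0 by definition
val mat = Array.fill(n, n)(1.0)
(0 until n).combinations(2).zipWithIndex.foreach { case (Seq(i, j), k) =>
  mat(i)(j) = row.getDouble(k)
  mat(j)(i) = row.getDouble(k)
}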

You will want to use the MLlib corr function on an RDD[org.apache.spark.mllib.linalg.Vector]; here is how to get there:

Generating some data:

scala> import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import scala.util.Random
import scala.util.Random

scala> import scala.collection.immutable.{Vector => Std_Vec}
import scala.collection.immutable.{Vector=>Std_Vec}

scala> def randVec(n: Int): Std_Vec[Double] = Seq.fill(n)(Random.nextDouble).toVector
randVec: (n: Int)scala.collection.immutable.Vector[Double]

scala> val myDF = sc.parallelize((0 until 10).map(x => (x.toString, randVec(10)))).toDF("lab", "features")
myDF: org.apache.spark.sql.DataFrame = [lab: string, features: array<double>]

scala> myDF.show
+---+--------------------+
|lab|            features|
+---+--------------------+
|  0|[0.81916384524734...|
|  1|[0.22711488489859...|
|  2|[0.52918465208259...|
|  3|[0.29253172322411...|
|  4|[0.22417302941674...|
|  5|[0.21693234260391...|
|  6|[0.39854095726097...|
|  7|[0.58807946374202...|
|  8|[0.96849542301746...|
|  9|[0.93194455754124...|
+---+--------------------+

Transforming and running Statistics.corr:

scala> import spark.implicits._
import spark.implicits._

scala> val featureRDD = myDF.rdd.map{case Row(_, feat: Seq[Double]) => Vectors.dense(feat.toArray)}
featureRDD: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[31] at map at <console>:46

scala> import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.stat.Statistics

scala> Statistics.corr(featureRDD)
res10: org.apache.spark.mllib.linalg.Matrix =
1.0                   0.40745998071406825   ... (10 total)
0.40745998071406825   1.0                   ...
-0.08774724980258353  -0.40530111151726017  ...
0.01094426191127371   -0.2586807037180266   ...
0.39307374354852526   0.8309967336954839    ...
0.29758193455372256   0.5102634076586834    ...
0.15412639422865976   -0.07047908269724495  ...
-0.34671405612623457  0.13551628442995656   ...
0.296600595616234     -0.16362444756013478  ...
-0.13393787396551504  -0.42967054975951785  ...
  • Could you please explain this line: `val featureRDD = myDF.rdd.map{case Row(_, feat: Seq[Double]) => Vectors.dense(feat.toArray)}` In my understanding, my code should look like this `val featureRDD = results.rdd.map{case Row(_, feat: Seq[Double]) => Vectors.dense(features.toArray)}` – Lior Dec 28 '16 at 10:45
  • I am pattern matching using the `Row` structure. Here `feat` is just a variable name standing in for the element in the second position of the `Row`. In your case I would do `results.select("features").rdd.map{case Row(feat: Seq[Double]) =>...` – evan.oman Dec 28 '16 at 15:37
  • `val myRDD = results.select("features").rdd.map{case Row(feat: Seq[Double]) => Vectors.dense(feat.toArray)} myRDD: org.apache.spark.rdd.RDD[org.apache.spark.ml.linalg.Vector] = MapPartitionsRDD[520] at map at <console>:61 <console>:64: error: type mismatch; found: org.apache.spark.rdd.RDD[org.apache.spark.ml.linalg.Vector] required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] Statistics.corr(myRDD)` – Lior Dec 29 '16 at 12:03
  • You must have imported `ml` vectors instead of `mllib` vectors (see the conversion sketch after these comments) – evan.oman Dec 29 '16 at 12:58
  • I get the following error: `16/12/29 16:24:06 ERROR Executor: Exception in task 0.0 in stage 159.0 (TID 467) scala.MatchError: [0,Mobile_App,DE,TOP_10_pctile,121,FT,3P_only_instock,2,18,12.99,0.0,N,1.0,0.0,0.0,17.0,0.0,1.0,1.0,20.0,0.0,(3,[1],[1.0]),(5,[0],[1.0]),(15,[0],[1.0]),(52,[17],[1.0]),(25,[0],[1.0]),(6,[1],[1.0]),(71,[1],[1.0]),(4882,[20],[1.0]),(29,[0],[1.0]),(5090,[1,3,8,40,75,101,107,197,5059,5061],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,12.99,1.0]),[2.458951875703864,-2.458951875703864],[0.9212136245063555,0.07878637549364452],0.0]` – Lior Dec 29 '16 at 15:27
  • `at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$$anon$10.next(Iterator.scala:393) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)` and the error messages goes on – Lior Dec 29 '16 at 15:30
  • 'at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) at scala.collection.AbstractIterator.to(Iterator.scala:1336) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)' – Lior Dec 29 '16 at 15:34
  • Wait so your feature vector isn't a sequence of Doubles? Not sure how to compute the correlation matrix then – evan.oman Dec 29 '16 at 15:35
  • How would you expect the correlation to be calculated on strings or tuples? – evan.oman Dec 29 '16 at 15:43
  • Please take a look at the "features" column from my first post. I thought the `corr` function would take care of converting the tuples into one single numerical value. How else could I calculate the correlation between categorical data? – Lior Dec 30 '16 at 08:59
  • Uh, no, it can only use doubles (see [this example](http://pastebin.com/iyAJ0UFF)). Embedding objects as doubles is far from trivial and it is often impossible. If R can handle categorical data it is only because you converted them to factors which are actually integers. – evan.oman Dec 30 '16 at 16:39
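
As a sketch of the fix evan.oman points to above: Spark 2.0+ ships mllib's Vectors.fromML for exactly this ml-to-mllib conversion, so (assuming results is the DataFrame from the question, with a "features" column produced by an ML pipeline) something along these lines should feed Statistics.corr correctly:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql.Row

// match the ml vector that the ML pipeline produced and convert it to
// the mllib vector type that Statistics.corr requires
val featureRDD = results.select("features").rdd.map {
  case Row(v: org.apache.spark.ml.linalg.Vector) => Vectors.fromML(v)
}

Statistics.corr(featureRDD)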