16

I want to use the pyspark.mllib.stat.Statistics.corr function to compute the correlation between two columns of a pyspark.sql.dataframe.DataFrame object. The corr function expects an RDD of Vectors objects. How do I convert a column such as df['some_name'] into an RDD of Vectors.dense objects?

zero323
VJune

4 Answers

30

There should be no need for that. For numerical columns you can compute the correlation directly using DataFrameStatFunctions.corr:

df1 = sc.parallelize([(0.0, 1.0), (1.0, 0.0)]).toDF(["x", "y"])
df1.stat.corr("x", "y")
# -1.0

Otherwise you can use VectorAssembler:

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=df.columns, outputCol="features")
# .rdd is needed on Spark 2.0+, where DataFrame no longer has flatMap
assembler.transform(df).select("features").rdd.flatMap(lambda x: x)
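
For reference, the -1.0 returned by df1.stat.corr("x", "y") above can be reproduced by hand. This is a minimal plain-Python sketch of the Pearson formula, not Spark's implementation; the pearson helper is a hypothetical name introduced only for illustration.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of co-deviations and sums of squared deviations from the means
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


# Same two rows as df1: (0.0, 1.0) and (1.0, 0.0)
result = pearson([0.0, 1.0], [1.0, 0.0])
print(result)  # -1.0
```

The two points lie exactly on a decreasing line, so the coefficient is -1.0.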
zero323
  • It supports only Pearson. – VJune Jun 03 '16 at 16:20
  • Trying VectorAssembler, I first got ('DataFrame' object has no attribute 'flatMap'). Then appended .rdd to select('features'). Now getting (Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 48.0 failed 1 times, most recent failure), when collecting the returned RDD. – Vaibhav Jun 11 '19 at 19:52
  • The solution found here helped: https://stackoverflow.com/questions/51831874/how-to-get-correlation-matrix-values-pyspark – Vaibhav Jun 11 '19 at 20:39
  • This returns a matrix without column names. Is there any way to assign the column names? Also, can we convert it into a different form so it can be exported? I know column names can be provided if it is converted to a pandas DF, but converting is slow :/ – gamer Jan 30 '20 at 17:39
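
On the Pearson-only limitation raised in the comments: pyspark.mllib.stat.Statistics.corr and pyspark.ml.stat.Correlation.corr both accept "spearman" as the method. Spearman's coefficient is the Pearson coefficient computed on ranks, which the plain-Python sketch below illustrates. It does not use Spark, and average_ranks, pearson and spearman are hypothetical helper names, not library functions.

```python
from math import sqrt


def average_ranks(values):
    """Return 1-based ranks; tied values share the mean of their positions."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    ranks = [0.0] * len(values)
    i = 0
    while i < len(order):
        j = i
        # Extend j to cover the run of values tied with the one at position i
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        mean_rank = (i + j) / 2 + 1
        for k in range(i, j + 1):
            ranks[order[k]] = mean_rank
        i = j + 1
    return ranks


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


def spearman(xs, ys):
    """Spearman correlation: Pearson applied to the ranks of the data."""
    return pearson(average_ranks(xs), average_ranks(ys))


xs = [1.0, 2.0, 3.0, 4.0]
ys = [1.0, 8.0, 27.0, 64.0]  # monotone in xs, but not linear

print(spearman(xs, ys))  # 1.0
print(pearson(xs, ys))   # below 1.0, because the relation is not linear
```

Spearman is the better fit when the relationship is monotone but not linear, as in the cubic data above.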
5

df.stat.corr("column1","column2")

colidyre
MUK
2
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

# Load data with more than 50 features (uses an existing SparkSession named `spark`)
newdata = spark.read.csv("sample*.csv", inferSchema=True, header=True)

# Pack every column into a single vector column
assembler = VectorAssembler(
    inputCols=newdata.columns, outputCol="features", handleInvalid="keep"
)
df = assembler.transform(newdata).select("features")

# The correlation comes back as a DenseMatrix
correlation = Correlation.corr(df, "features", "pearson").collect()[0][0]

# Convert the DenseMatrix into a DataFrame with the original column names
rows = correlation.toArray().tolist()
df = spark.createDataFrame(rows, newdata.columns)
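
The resulting DataFrame has named columns but no row labels. One possible way to make each record self-describing before exporting is sketched below in plain Python. The matrix values and column names are made-up stand-ins; in the snippet above they would come from correlation.toArray().tolist() and newdata.columns.

```python
# Stand-in values for illustration only (not real results)
columns = ["a", "b", "c"]
rows = [
    [1.0, 0.5, -0.2],
    [0.5, 1.0, 0.1],
    [-0.2, 0.1, 1.0],
]

# Wide form: prepend each row's column name so every record is labeled
header = ["column"] + columns
labeled = [[name] + row for name, row in zip(columns, rows)]

# Long form: one (col_a, col_b, corr) record per unordered pair of columns
pairs = [
    (columns[i], columns[j], rows[i][j])
    for i in range(len(columns))
    for j in range(i + 1, len(columns))
]

print(labeled[0])  # ['a', 1.0, 0.5, -0.2]
print(pairs)       # [('a', 'b', 0.5), ('a', 'c', -0.2), ('b', 'c', 0.1)]
```

Assuming the same `spark` session as above, either list could then be passed to spark.createDataFrame (for example with `header` as the column names) and written out with the usual DataFrame writers.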
Dharman
Mohan
1

Ok I figured it out:

from pyspark.mllib.linalg import Vectors

v1 = df.rdd.flatMap(lambda x: Vectors.dense(x[col_idx_1]))
v2 = df.rdd.flatMap(lambda x: Vectors.dense(x[col_idx_2]))
VJune