
I am computing an extremely large correlation matrix using PySpark:

    from pyspark.mllib.stat import Statistics

    # corr_mat is a list of lists of floats, akin to a NumPy array
    corr_mat = Statistics.corr(features, method="pearson")

However, in the next step, I want to convert this matrix into a dataframe with three columns - dimension A, dimension B, and the correlation between the two dimensions: basically go from a square N by N data object to (N^2) by 3 data object. I don't have a really good way I can think of to do this in Spark, and my current implementation is not parallelized:

    import pandas as pd

    correlations = []
    for idx1, column1 in enumerate(col_names):
        for idx2, column2 in enumerate(col_names):
            if column1 == column2:
                continue  # skip self-correlations

            correlation = corr_mat[idx1][idx2]
            correlations.append((column1, column2, correlation))

    results = pd.DataFrame(
        correlations, columns=["interest_1", "interest_2", "correlation"]
    )
    spark_df_results = spark.createDataFrame(results)

Clearly, this nested for-loop iteration is not ideal, especially with thousands of dimensions being correlated against each other.
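One direction that might work, though I have not benchmarked it, is to parallelize the rows of corr_mat as an RDD and flatMap each row into (interest_1, interest_2, correlation) triples, so that the N^2 expansion at least happens on the executors. A rough sketch, reusing col_names and corr_mat from above:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    # Distribute one record per matrix row: (row_name, row_of_correlations).
    rows_rdd = sc.parallelize(list(zip(col_names, corr_mat)))

    # Each executor expands its rows into (interest_1, interest_2, correlation)
    # triples, skipping the diagonal; col_names is shipped in the closure.
    triples = rows_rdd.flatMap(
        lambda pair: [
            (pair[0], col_names[j], float(pair[1][j]))
            for j in range(len(col_names))
            if col_names[j] != pair[0]
        ]
    )

    spark_df_results = triples.toDF(["interest_1", "interest_2", "correlation"])

The obvious caveat is that Statistics.corr already materializes the full matrix on the driver, so this only distributes the reshaping, not the computation of the matrix itself.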

With a plain pandas DataFrame, this is typically an easy operation:

    # unstack the matrix into a long table
    similarity_table = similarity_matrix.rename_axis(None).rename_axis(None, axis=1).stack().reset_index()

    # rename the columns
    similarity_table.columns = ["word1", "word2", "similarity"]

Is there a similar strategy for unstacking the correlation matrix in PySpark?
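
For reference, the closest DataFrame-style analogue of stack() that I can sketch uses posexplode on an array column plus a broadcast join against a small position-to-name lookup; I am not sure it is idiomatic or efficient at this scale:

    from pyspark.sql import functions as F

    # One row per matrix row: the dimension name plus its full array of
    # correlations against every other dimension.
    matrix_df = spark.createDataFrame(
        [(name, [float(v) for v in row]) for name, row in zip(col_names, corr_mat)],
        ["interest_1", "correlations"],
    )

    # Small lookup table mapping array position -> dimension name.
    names_df = spark.createDataFrame(list(enumerate(col_names)), ["pos", "interest_2"])

    # posexplode yields (position, value) pairs; the broadcast join translates
    # each position back into the second dimension's name.
    spark_df_results = (
        matrix_df
        .select("interest_1", F.posexplode("correlations").alias("pos", "correlation"))
        .join(F.broadcast(names_df), "pos")
        .where(F.col("interest_1") != F.col("interest_2"))
        .select("interest_1", "interest_2", "correlation")
    )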

