I am computing an extremely large correlation matrix using PySpark:
# corr_mat is a list of lists of floats, akin to a Numpy array
corr_mat = Statistics.corr(features, method="pearson")
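For context, here is a minimal, self-contained sketch of the setup I have in mind (the column names and feature vectors below are made-up placeholders):

from pyspark.mllib.stat import Statistics
from pyspark.mllib.linalg import Vectors

# Hypothetical example data: one dense vector of feature values per row.
col_names = ["a", "b", "c"]
features = spark.sparkContext.parallelize([
    Vectors.dense([1.0, 2.0, 3.0]),
    Vectors.dense([2.0, 4.1, 5.9]),
    Vectors.dense([3.0, 6.2, 9.1]),
])

# Statistics.corr over an RDD of Vectors returns the full N x N
# correlation matrix as a local object on the driver.
corr_mat = Statistics.corr(features, method="pearson")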
However, in the next step I want to convert this matrix into a dataframe with three columns: dimension A, dimension B, and the correlation between the two dimensions. In other words, go from a square N by N object to a roughly (N^2) by 3 table. I can't think of a good way to do this in Spark, and my current implementation is not parallelized:
import pandas as pd

correlations = []
for idx1, column1 in enumerate(col_names):
    for idx2, column2 in enumerate(col_names):
        if column1 == column2:
            continue  # skip the correlation of a column with itself
        correlation = corr_mat[idx1][idx2]
        correlations.append((column1, column2, correlation))

# collect the pairs into a pandas DataFrame, then hand it to Spark
results = pd.DataFrame(correlations, columns=["interest_1", "interest_2", "correlation"])
spark_df_results = spark.createDataFrame(results)
Clearly this nested for-loop iteration is not ideal, especially with thousands of dimensions being correlated.
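To make the target shape concrete, a toy 3-column run of the loop above (numbers made up) produces 6 off-diagonal rows:

#   interest_1  interest_2  correlation
#   a           b           0.80
#   a           c           0.10
#   b           a           0.80
#   b           c           0.25
#   c           a           0.10
#   c           b           0.25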
In a traditional Pandas dataframe, this is typically an easy operation:
# unstack matrix into table
similarity_table = similarity_matrix.rename_axis(None).rename_axis(None, axis=1).stack().reset_index()
# rename columns
similarity_table.columns = ["word1", "word2", "similarity"]
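For completeness, a self-contained toy run of that pandas version (made-up similarity values):

import pandas as pd

# Toy 3 x 3 similarity matrix with labelled rows and columns.
words = ["cat", "dog", "car"]
similarity_matrix = pd.DataFrame(
    [[1.0, 0.8, 0.1],
     [0.8, 1.0, 0.2],
     [0.1, 0.2, 1.0]],
    index=words,
    columns=words,
)

# unstack matrix into table
similarity_table = similarity_matrix.rename_axis(None).rename_axis(None, axis=1).stack().reset_index()
similarity_table.columns = ["word1", "word2", "similarity"]
# similarity_table now has 9 rows, one per (word1, word2) pair, including the diagonal.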
Is there a similar strategy for unstacking the correlation matrix in PySpark?