3

i want to calculate a correlation matrix of a large dataset (1M rows). The idea is to calculate the correlation of product sales. If two products have a similar increase/decrease in sales year over year there might be a correlation.

I have already tried the posts here:

Which all more or less do the same but they collect the correlation matrix back at the driver. Which is a problem since the large dataset makes this collection RAM intense. I am looking for a way to break this problem into pieces and make use of the distributed computing of Spark. There are 170k unique products so the job runs 170k times and there are 29B combinations.

My idea is to calculate the correlation column by column (Cross-Apply) and then collect it in a data frame (or RDD) to run filters over it like (only with correlation > 0.8). But I have no good idea to start.

The Dataset basically looks like this.

d = {'Product': ['A', 'B', 'C','A', 'B', 'C','A', 'B', 'C'],\
     'Year': [2010, 2010, 2010, 2011, 2011, 2011, 2012, 2012, 2012],\
     'Revenue': [100, 200, 300, 110, 190, 320, 120, 220, 350]}
df = pd.DataFrame(data=d)

I transpose the data to have the years in columns.

df = df.pivot(index='Product', columns='Year', values='Revenue').fillna(0)

I calculate the pct_change to have the relative change year over year.

df_diff = df.pct_change(axis=1).replace([np.inf, -np.inf], np.nan).fillna(0)

Year     2010      2011      2012
Product                          
A         0.0  0.100000  0.090909
B         0.0 -0.050000  0.157895
C         0.0  0.066667  0.093750

And I would need the correlation... With pandas easy

# change structure
df_diff = df_diff.stack().unstack(level=0)
# get correlation
df_diff = df_diff.corr().abs()
# change structure back
df_diff = df_diff.unstack().to_frame(name='value')
df_diff.index = df_diff.index.set_names(['Product_1', 'Product_2'])
df_diff.reset_index(inplace=True)

    Product_1 Product_2     value
0         A         A  1.000000
1         A         B  0.207317
2         A         C  0.933485
3         B         A  0.207317
4         B         B  1.000000
5         B         C  0.544352
6         C         A  0.933485
7         C         B  0.544352
8         C         C  1.000000

2 Answers2

1

I used a udf and maped it against the spark df. With the numOfPartitions you can control the number of tasks that get generated and distributed to the worker nodes. In my example I used 16 nodes with 8 cpu each and split the df into 10000 partitions.

import pandas as pd
import numpy as np

d = {'Product': ['A', 'B', 'C','A', 'B', 'C','A', 'B', 'C'],\
     'Year': [2010, 2010, 2010, 2011, 2011, 2011, 2012, 2012, 2012],\
     'Revenue': [100, 200, 300, 110, 190, 320, 120, 220, 350]}
df = pd.DataFrame(data=d)

df = df.pivot(index='Product', columns='Year', values='Revenue').fillna(0)

df_diff = df.pct_change(axis=1, limit=1).replace([np.inf, -np.inf], np.nan).fillna(0)
df_diff = df_diff.dropna(how='all')

# pivot columns and rows to have year on rows and product on columns
df_diff_piv = df_diff.stack().unstack(level=0).sort_index()

# bring to spark df
df_diff_spark = spark.createDataFrame(df_diff.reset_index())

# correlate on at least x periods
correlation_min_periods = 1 # I used 10 for a 20 periods dataset

# set num of partitions to parallelize on tasks 
numOfPartitions = 200 #200 is default

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, FloatType

schema = StructType(
    [
    StructField("Product_1", StringType()),
    StructField("Product_2", StringType()),
    StructField("corr", StringType()) #cant get it to work on FloatType()
    ]
)

def calculate_correlation(product):
  data = df_diff_piv
  arr = []
  for col in data.columns:
    m1 = product
    m2 = data[col].name
    c = np.absolute(data[product].corr(data[col])) #, min_periods=correlation_min_periods
    arr.append([m1, m2, str(c)]) #cant get it to work on FloatType()
  return arr

#register udf
spark.udf.register("calculate_correlation_udf", calculate_correlation)
calculate_correlation_udf = udf(calculate_correlation, ArrayType(schema))

#apply udf to distinct product
distinct_product = df_diff_spark.select("Product").distinct().repartition(numOfPartitions)
res = distinct_product.select("Product", calculate_correlation_udf("Product").alias("corr_matrix"))

from pyspark.sql.functions import explode

# explode (flatten) array and struct back to dataframe
expl = res.select(explode("corr_matrix").alias("corr_row"))
rowlevel = expl.select("corr_row.Product_1","corr_row.Product_2","corr_row.corr")

# convert string to float
rowlevel = rowlevel.withColumn("corr", rowlevel["corr"].cast(FloatType()))

rowlevel.show()
  • The `calculate_correlation` function requires `df_diff_piv` which essentially has a similar size to the dataset. I am unsure how `spark` handles referring to variables outside the `UDF` definition, but if it is a big data frame, this will not work as it requires the whole data frame to be in memory. If the data frame is saved in some table that `spark` can access, you have to read from the table and collect the whole data frame into `pandas`. – tchar Jan 27 '23 at 00:28
0

So the following should work (at least for the toy example): I would be interested to hear how it scales:

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# define pyspark df
d = {'Product': ['A', 'B', 'C','A', 'B', 'C','A', 'B', 'C'],\
     'Year': [2010, 2010, 2010, 2011, 2011, 2011, 2012, 2012, 2012],\
     'Revenue': [100, 200, 300, 110, 190, 320, 120, 220, 350]}
df = spark.createDataFrame(pd.DataFrame(data=d))

# Define a window over products and calc %-changes over time
win = Window().partitionBy("Product").orderBy("Year")
df = df.withColumn("pct_change", 
      F.col("Revenue")/F.lag(F.col("Revenue")).over(win) - 1
)

# replace nulls with 0
df = df.na.fill(0)

# pivot
df = (df.groupBy("Product")
    .pivot("Year")
    .agg(F.first("pct_change"))
    .orderBy("Product"))

# Get pair-RDD of (product, %-changes) and cross-join
numerical_cols = df.columns[1:]
rdd = df.rdd.map(lambda x: (x['Product'], [x[col] for col in numerical_cols]))
rdd = rdd.cartesian(rdd)

# correlation helper function
def corr(pair):
    (prod1, series1), (prod2, series2) = pair
    corr = pd.Series(series1).corr(pd.Series(series2))
    return (prod1, prod2, float(corr))

# pairwise correlation DF
corr_df = rdd.map(corr).toDF(schema=['Product_1', 'Product_2', 'value'])
corr_df.show(5)
ags29
  • 2,621
  • 1
  • 8
  • 14
  • thanks for your solution. It does not work on my cluster. It looks like it blows the stage. In the meantime I have a solution that works. Find below. – Gerhard Faller Mar 05 '20 at 14:07