
I have seen many Stack Overflow questions about similarity matrices, but they deal with RDDs or other cases, and I could not find a direct answer to my problem, so I decided to post a new question.

Problem

import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import functions as F, Window
from pyspark.ml.feature import VectorAssembler, StandardScaler, Normalizer
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

spark = pyspark.sql.SparkSession.builder.appName('app').getOrCreate()
sc = spark.sparkContext

# pandas dataframe
pdf = pd.DataFrame({'user_id': ['user_0','user_1','user_2'],
                   'apple': [0,1,5],
                   'good banana': [3,0,1],
                   'carrot': [1,2,2]})
# spark dataframe
df = spark.createDataFrame(pdf)
df.show()

+-------+-----+-----------+------+
|user_id|apple|good banana|carrot|
+-------+-----+-----------+------+
| user_0|    0|          3|     1|
| user_1|    1|          0|     2|
| user_2|    5|          1|     2|
+-------+-----+-----------+------+

Normalize and create the similarity matrix using pandas

from sklearn.preprocessing import normalize

pdf = pdf.set_index('user_id')
item_norm = normalize(pdf, axis=0)  # normalize each item column (NOT each user row)
item_sim = item_norm.T.dot(item_norm)
df_item_sim = pd.DataFrame(item_sim,index=pdf.columns,columns=pdf.columns)

                apple  good banana    carrot
apple        1.000000     0.310087  0.784465
good banana  0.310087     1.000000  0.527046
carrot       0.784465     0.527046  1.000000

Question: how can I get a similarity matrix like the one above using PySpark?

I want to run KMeans on that data.

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# I want to do something like this...
model = KMeans(k=2, seed=1, featuresCol='norm_features').fit(df)

df = model.transform(df)
df.show()
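
For context, a rough sketch of how I imagine the clustering step once the features are assembled (the norm_features column name and the VectorAssembler/Normalizer pipeline here are my own guesses, not settled code):

from pyspark.ml.feature import VectorAssembler, Normalizer
from pyspark.ml.clustering import KMeans

# assemble the item columns into one vector column per user
assembler = VectorAssembler(inputCols=['apple', 'good banana', 'carrot'],
                            outputCol='features')
assembled = assembler.transform(df)

# note: Normalizer normalizes each user row, unlike the column-wise
# item normalization used for the similarity matrix above
normalizer = Normalizer(inputCol='features', outputCol='norm_features', p=2.0)
normalized = normalizer.transform(assembled)

model = KMeans(k=2, seed=1, featuresCol='norm_features').fit(normalized)
model.transform(normalized).show()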


1 Answer

import pyspark.sql.functions as F

df.show()
+-------+-----+-----------+------+
|user_id|apple|good banana|carrot|
+-------+-----+-----------+------+
| user_0|    0|          3|     1|
| user_1|    1|          0|     2|
| user_2|    5|          1|     2|
+-------+-----+-----------+------+

Swap rows and columns by unpivoting and pivoting:

df2 = df.selectExpr(
    'user_id',
    'stack(3, ' + ', '.join(["'%s', `%s`" % (c, c) for c in df.columns[1:]]) + ') as (fruit, items)'
).groupBy('fruit').pivot('user_id').agg(F.first('items'))

df2.show()
+-----------+------+------+------+
|      fruit|user_0|user_1|user_2|
+-----------+------+------+------+
|      apple|     0|     1|     5|
|good banana|     3|     0|     1|
|     carrot|     1|     2|     2|
+-----------+------+------+------+
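
For reference, the stack() expression built by the list comprehension above expands to (the backticks are needed because of the space in good banana):

print('stack(3, ' + ', '.join(["'%s', `%s`" % (c, c) for c in df.columns[1:]]) + ') as (fruit, items)')
# stack(3, 'apple', `apple`, 'good banana', `good banana`, 'carrot', `carrot`) as (fruit, items)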

Normalize:

# divide each row (item) by its L2 norm across the user columns,
# so every item vector has unit length
df3 = df2.select(
    'fruit',
    *[
        (
            F.col(c) /
            F.sqrt(
                sum([F.col(cc) * F.col(cc) for cc in df2.columns[1:]])
            )
        ).alias(c) for c in df2.columns[1:]
    ]
)

df3.show()
+-----------+------------------+-------------------+-------------------+
|      fruit|            user_0|             user_1|             user_2|
+-----------+------------------+-------------------+-------------------+
|      apple|               0.0|0.19611613513818404| 0.9805806756909202|
|good banana|0.9486832980505138|                0.0|0.31622776601683794|
|     carrot|0.3333333333333333| 0.6666666666666666| 0.6666666666666666|
+-----------+------------------+-------------------+-------------------+
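
As a quick sanity check (not in the original answer), every item row should now have unit L2 norm:

# the sum of squares across the user columns should be 1.0 for every row
df3.select(
    'fruit',
    sum([F.col(c) * F.col(c) for c in df3.columns[1:]]).alias('squared_norm')
).show()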

Do the matrix multiplication:

# cross join every item row with every other item row; the dot product of
# two unit vectors is their cosine similarity
df4 = (df3.alias('t1').repartition(10)
          .crossJoin(df3.alias('t2').repartition(10))
          .groupBy('t1.fruit')
          .pivot('t2.fruit', df.columns[1:])
          .agg(F.first(sum([F.col('t1.' + c) * F.col('t2.' + c) for c in df3.columns[1:]])))
      )
df4.show()
+-----------+-------------------+-------------------+------------------+
|      fruit|              apple|        good banana|            carrot|
+-----------+-------------------+-------------------+------------------+
|      apple| 1.0000000000000002|0.31008683647302115|0.7844645405527362|
|good banana|0.31008683647302115| 0.9999999999999999|0.5270462766947298|
|     carrot| 0.7844645405527362| 0.5270462766947298|               1.0|
+-----------+-------------------+-------------------+------------------+
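
If you want this back as a small pandas matrix like the one in the question, one way is (a sketch, since row order after the pivot is not guaranteed):

pdf_sim = df4.toPandas().set_index('fruit')
pdf_sim = pdf_sim.loc[pdf_sim.columns]  # reorder rows to match the column order
print(pdf_sim)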
  • This is a great but long way to do it. I was wondering if there is some vectorized way to do the matrix multiplication, for example with `from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix` (see the sketch after these comments). – BhishanPoudel Feb 27 '21 at 16:49
  • I tried something similar but was not able to implement it for my case: https://stackoverflow.com/questions/46758768/calculating-the-cosine-similarity-between-all-the-rows-of-a-dataframe-in-pyspark – BhishanPoudel Feb 27 '21 at 16:53
  • @astro123 using a dataframe cross join to do the matrix multiplication is also parallelized, because the computations for each row are done in parallel. I'd prefer the dataframe API because the RDD-based MLlib is deprecated, and I'm not even sure there would be any performance improvement, given the overhead of converting the dataframe to an RDD, then to a matrix, and back. – mck Feb 27 '21 at 17:08
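
For completeness, a sketch of the MLLib route mentioned in these comments: RowMatrix.columnSimilarities() computes the cosine similarity between columns directly, which is the same item-item matrix (returned as an upper-triangular CoordinateMatrix indexed by column position rather than item name):

from pyspark.mllib.linalg.distributed import RowMatrix

items = df.columns[1:]  # ['apple', 'good banana', 'carrot']
# rows are users, columns are items
mat = RowMatrix(df.select(items).rdd.map(list))
# upper-triangular CoordinateMatrix of pairwise column cosine similarities
for e in mat.columnSimilarities().entries.collect():
    print(items[e.i], items[e.j], e.value)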