
I have to compute the cosine similarity between each pair of rows, but I have no idea how to do it elegantly with the Spark DataFrame API. The idea is to compute the similarity of each row (item) to every other row and take the top 10 most similar items. This is needed for an item-item recommender system.

Everything I've read about it refers to computing similarity over columns (Apache Spark Python Cosine Similarity over DataFrames). Can someone tell me whether it is possible to compute cosine distance between rows elegantly using the PySpark DataFrame API or RDDs, or do I have to do it manually?

Here is some code to show what I intend to do:

from numpy import asarray
from numpy import linalg as LA


def cosineSimilarity(vec1, vec2):
    return vec1.dot(vec2) / (LA.norm(vec1) * LA.norm(vec2))


# p.s. model is ALS
Pred_Factors = model.itemFactors.cache()  # Pred_Factors = DataFrame[id: int, features: array<float>]

sims = []

# brute-force pairwise comparison collected on the driver (clearly not scalable)
for _id, _feature in Pred_Factors.toLocalIterator():
    itemFactor = asarray(_feature)
    for other_id, feature in Pred_Factors.toLocalIterator():
        sims.append(((_id, other_id), cosineSimilarity(asarray(feature), itemFactor)))

sortedSims = sc.parallelize(sims).takeOrdered(10, key=lambda x: -x[1])

Thanks in advance for all the help.

Ivan Shelonik

1 Answer


You can use the columnSimilarities function of mllib.linalg.distributed.IndexedRowMatrix, which uses cosine similarity as its metric. Since it computes similarities between columns, you have to transpose the matrix before applying it.

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

# transpose so that items become columns, then compute column (item-item) similarities
pred = IndexedRowMatrix(Pred_Factors.rdd.map(lambda x: IndexedRow(x[0], x[1]))).toBlockMatrix().transpose().toIndexedRowMatrix()
pred_sims = pred.columnSimilarities()
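
columnSimilarities() returns a CoordinateMatrix whose entries are MatrixEntry(i, j, value) records for the upper triangle of the similarity matrix. As a minimal sketch of how the top-10 question from the comments could be handled (assuming pred_sims from the snippet above, and that the 10 highest-scoring item pairs are what is wanted):

# take the 10 entries with the highest cosine similarity
top10 = pred_sims.entries.takeOrdered(10, key=lambda e: -e.value)
for e in top10:
    print(e.i, e.j, e.value)  # the two item indices and their similarity

Here i and j should correspond to the ids used as the IndexedRow indices above, since those become the column indices after the transpose.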
pauli
  • how were you able to use this to get the top 10 similar items? @IvanShelonik – Reub Feb 07 '18 at 15:49
  • can this be used on any df? – Reub Feb 07 '18 at 17:40
  • 2
    you have to convert your dataframe to rdd to apply this method. – pauli Feb 08 '18 at 05:02
  • the transformations on the RDDs drastically reduce the number of partitions; how can I make this more efficient for parallelization? Calling repartition on the underlying rdd of the matrices means I then have to convert it again to a rowMatrix... is there any other way? – nonoDa Sep 22 '21 at 13:17
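
Regarding the last comment about partitioning: one possible workaround (only a sketch, and the 200 below is an arbitrary assumption rather than a recommendation) is to repartition the entries RDD of the resulting CoordinateMatrix and rebuild the matrix around it, instead of repartitioning before the matrix conversions:

from pyspark.mllib.linalg.distributed import CoordinateMatrix

# repartition the similarity entries themselves and wrap them back into a matrix
entries = pred_sims.entries.repartition(200)  # number of partitions chosen arbitrarily
pred_sims = CoordinateMatrix(entries)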