
I have a table of around 50k distinct rows and 2 columns. You can think of each row as a movie and the columns as that movie's attributes: "ID", the id of the movie, and "Tags", some content tags of the movie, in the form of a list of strings per movie.

Data looks something like this:

movie_1, ['romantic','comedy','English']
movie_2, ['action','kongfu','Chinese']

My goal is to first calculate the Jaccard similarity between every pair of movies based on their tags. Once that's done, for any given movie (say movie_1), I will be able to find its top 5 most similar movies. And I want the top 5 results not only for movie_1, but for all of the movies.
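For reference, the Jaccard similarity of two tag sets is the size of their intersection divided by the size of their union; a minimal sketch using the two sample movies above:

```python
# Jaccard similarity: |A ∩ B| / |A ∪ B|
def jaccard_similarity(tags_a, tags_b):
    a, b = set(tags_a), set(tags_b)
    return len(a & b) / len(a | b)

movie_1 = ['romantic', 'comedy', 'English']
movie_2 = ['action', 'kongfu', 'Chinese']

print(jaccard_similarity(movie_1, movie_2))  # 0.0 -> no tags in common
print(jaccard_similarity(movie_1, movie_1))  # 1.0 -> identical tag sets
```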

I have tried solving the problem in Python, but the runtime is a big challenge. Even with multiprocessing on 6 cores, the total run time was over 20 hours.

Python code below:

import pandas as pd
from collections import Counter
from multiprocessing import Pool

col_names = ['movie_id', 'tag_name']
df = pd.read_csv("movies.csv", names=col_names)
movie_ids = df['movie_id'].tolist()
tag_list = df['tag_name'].tolist()

def jaccard_similarity(tags1, tags2):
    intersection = set(tags1).intersection(set(tags2))
    union = set(tags1).union(set(tags2))
    return len(intersection) / float(len(union))

def jc_results(movie_id):
    # Compare this movie's tags against every other movie's tags.
    result = Counter()
    tag_1 = tag_list[movie_ids.index(movie_id)]
    for that_index, another_id in enumerate(movie_ids):
        if another_id == movie_id:
            continue
        tag_2 = tag_list[that_index]
        result[(movie_id, another_id)] = jaccard_similarity(tag_1, tag_2)
    return result.most_common(10)


pool = Pool(6)
results = {}
for movie_id in movie_ids:
    results[movie_id] = pool.apply_async(jc_results, args=(movie_id,))
pool.close()
pool.join()
for movie_id, res in results.items():
    results[movie_id] = res.get()
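A large share of the time in a pairwise loop like this goes into rebuilding `set(...)` twice for every comparison and into scoring each pair twice (once from each side). A hedged pure-Python sketch (the sample data and `top_k_similar` helper below are illustrative, not part of the original code) that converts each tag list to a set once and visits each pair only once:

```python
import heapq
from itertools import combinations

movie_ids = ['movie_1', 'movie_2', 'movie_3']
tag_list = [['romantic', 'comedy', 'English'],
            ['action', 'kongfu', 'Chinese'],
            ['romantic', 'action']]

# Convert every tag list to a set once, instead of inside the inner loop.
tag_sets = [set(t) for t in tag_list]

def top_k_similar(k=5):
    # For each movie, keep a heap-backed running top-k of (similarity, other_id).
    best = {mid: [] for mid in movie_ids}
    for (i, a), (j, b) in combinations(enumerate(tag_sets), 2):
        sim = len(a & b) / len(a | b)
        # Record the pair's similarity from both movies' points of view.
        for this, other in ((i, j), (j, i)):
            heap = best[movie_ids[this]]
            entry = (sim, movie_ids[other])
            if len(heap) < k:
                heapq.heappush(heap, entry)
            else:
                heapq.heappushpop(heap, entry)
    return {mid: sorted(h, reverse=True) for mid, h in best.items()}

print(top_k_similar(k=2))
```

This is still O(n²) in the number of movies, so for 50k rows an approximate approach such as MinHash LSH (below) is the more realistic fix; this sketch only removes the constant-factor waste.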

Then I wanted to switch to PySpark, but I am still very new to Spark's Python API and got stuck after writing a few lines. The only progress I have made so far is reading the data into an RDD using sc.textFile... I have read the existing posts, but they all use Scala. It would be great if anyone could help or provide any guidance with PySpark. Thanks a lot!

Ruby.L

1 Answer


You could try a solution similar to this stackoverflow answer, though since your data is already tokenized (a list of strings), you wouldn't need to do that step or the ngram step.

I'll also mention that approxSimilarityJoin in PySpark calculates the Jaccard distance rather than the Jaccard similarity, but you can subtract the distance from 1 to convert back to the similarity if you need that in particular.

Your code would end up looking similar to:

from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, MinHashLSH
import pyspark.sql.functions as f

db = spark.createDataFrame([
        ('movie_1', ['romantic','comedy','English']),
        ('movie_2', ['action','kongfu','Chinese']),
        ('movie_3', ['romantic', 'action'])
    ], ['movie_id', 'genres'])


model = Pipeline(stages=[
        HashingTF(inputCol="genres", outputCol="vectors"),
        MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=10)
    ]).fit(db)

db_hashed = model.transform(db)

db_matches = model.stages[-1].approxSimilarityJoin(db_hashed, db_hashed, 0.9)

# show all matches (including duplicates)
db_matches.select(f.col('datasetA.movie_id').alias('movie_id_A'),
                 f.col('datasetB.movie_id').alias('movie_id_B'),
                 f.col('distCol')).show()

# show non-duplicate matches
db_matches.select(f.col('datasetA.movie_id').alias('movie_id_A'),
                 f.col('datasetB.movie_id').alias('movie_id_B'),
                 f.col('distCol')).filter('movie_id_A < movie_id_B').show()

With the corresponding output:

+----------+----------+-------+
|movie_id_A|movie_id_B|distCol|
+----------+----------+-------+
|   movie_3|   movie_3|    0.0|
|   movie_1|   movie_3|   0.75|
|   movie_2|   movie_3|   0.75|
|   movie_1|   movie_1|    0.0|
|   movie_2|   movie_2|    0.0|
|   movie_3|   movie_2|   0.75|
|   movie_3|   movie_1|   0.75|
+----------+----------+-------+

+----------+----------+-------+
|movie_id_A|movie_id_B|distCol|
+----------+----------+-------+
|   movie_1|   movie_3|   0.75|
|   movie_2|   movie_3|   0.75|
+----------+----------+-------+
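To get from this join output to the asker's top-5-per-movie lists, one option is a small post-processing step. The sketch below (the `rows` data and `top_n_per_movie` helper are illustrative, assuming the pairs have been collected to the driver as tuples; on 50k movies you would instead keep it in Spark with a `Window` partitioned by movie and `row_number` ordered by `distCol`) drops self-matches, converts distance to similarity, and keeps the n best per movie:

```python
from collections import defaultdict

# Rows as they might come back from db_matches, minus nothing yet.
rows = [('movie_1', 'movie_3', 0.75),
        ('movie_2', 'movie_3', 0.75),
        ('movie_3', 'movie_2', 0.75),
        ('movie_3', 'movie_1', 0.75),
        ('movie_1', 'movie_1', 0.0)]

def top_n_per_movie(rows, n=5):
    by_movie = defaultdict(list)
    for a, b, dist in rows:
        if a != b:                            # drop self-matches
            by_movie[a].append((1 - dist, b))  # distance -> similarity
    # Sort by similarity descending and keep the n best per movie.
    return {m: sorted(v, reverse=True)[:n] for m, v in by_movie.items()}

print(top_n_per_movie(rows, n=5))
```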
Bob Swain
  • can you please explain what numHashTables indicate and why you chose 10? – Shirin Yavari May 16 '21 at 00:26
  • @ShirinYavari This link has an okay explanation of it: https://cran.r-project.org/web/packages/textreuse/vignettes/textreuse-minhash.html. In summary, I chose the 10 somewhat arbitrarily. The 10 means that the code will create 10 independent minhash values from each record, and then the approxSimilarityJoin will compare those 10 values from each row to see how many of them are the same in each row vs different to come up with a similarity score. If you choose less than 10, the calculated similarity between the rows will be less accurate, and if you do more than 10, it will be more accurate. – Bob Swain May 17 '21 at 19:11
  • Thanks so much for your explanations and the link. will definitely look into it. – Shirin Yavari May 18 '21 at 06:55
  • @BobSwain Hey can you look into my question, I'm able to run this small set like 100 records but when I do it for like 10K records I'm running into this error https://stackoverflow.com/questions/73968030/failed-to-execute-user-defined-functionlshmodellambda – Chris_007 Oct 06 '22 at 02:15
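The numHashTables idea from the comments above can be illustrated with a tiny pure-Python MinHash sketch (an illustration of the principle only, not PySpark's actual implementation): each hash function maps a tag set to its minimum hash value, and the fraction of positions where two signatures agree estimates the Jaccard similarity — more hash functions, less noise:

```python
import random

def minhash_signature(tags, num_hashes=10, seed=0):
    # One (a, b) pair per hash function: h(x) = (a*x + b) mod p
    rng = random.Random(seed)
    p = 2_147_483_647
    params = [(rng.randrange(1, p), rng.randrange(0, p)) for _ in range(num_hashes)]
    hashed = [hash(t) % p for t in tags]
    # Signature = the minimum hashed value under each hash function.
    return [min((a * x + b) % p for x in hashed) for a, b in params]

def estimated_jaccard(sig1, sig2):
    # Fraction of hash functions whose minimums agree.
    return sum(m1 == m2 for m1, m2 in zip(sig1, sig2)) / len(sig1)

s1 = minhash_signature(['romantic', 'comedy', 'English'], num_hashes=100)
s2 = minhash_signature(['romantic', 'action'], num_hashes=100)
print(estimated_jaccard(s1, s1))  # 1.0 for identical signatures
print(estimated_jaccard(s1, s2))  # noisy estimate of the true 0.25
```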