I'm a beginner in Spark and I'm dealing with a large dataset (over 1.5 million rows and 2 columns). I have to evaluate the cosine similarity of the field "features" between each pair of rows. The main problem is iterating over the rows and finding an efficient, fast way to do it. I will have to use this method on another dataset of 42.5 million rows, and it will be a big computational problem if I don't find the most efficient way of doing it.
| post_id | features |
| -------- | -------- |
| Bkeur23 |[cat,dog,person] |
| Ksur312kd |[wine,snow,police] |
| BkGrtTeu3 |[] |
| Fwd2kd |[person,snow,cat] |
I've created an algorithm that evaluates the cosine similarity between each element of the i-th and j-th rows. I've tried collecting the results in lists, and also creating a Spark DataFrame / RDD for each result and merging them with the `union` function. The function I use to evaluate the cosine similarity is the following. It takes two lists as input (the lists of the i-th and j-th rows) and returns the maximum cosine similarity over every pair of elements from the two lists. But this is not the problem.

    import sys
    from math import sqrt

    import numpy as np
    import tensorflow as tf


    def cosineSim(lista1, lista2, embed):
        # embed = hub.KerasLayer(os.getcwd())
        eps = sys.float_info.epsilon  # avoids division by zero
        # None or empty lists: return 0.0 so the caller's `r > 0.45` check does not fail on None
        if lista1 is None or lista2 is None or len(lista1) == 0 or len(lista2) == 0:
            return 0.0
        risultati = {}
        for a in lista1:
            # Embed the word from the first list
            x1 = np.asarray(embed(tf.constant([a])))[0].tolist()
            for b in lista2:
                # Embed the word from the second list
                x2 = np.asarray(embed(tf.constant([b])))[0].tolist()
                dot = 0  # renamed from `sum`, which shadowed the built-in
                suma1 = 0
                sumb1 = 0
                for i, j in zip(x1, x2):
                    suma1 += i * i
                    sumb1 += j * j
                    dot += i * j
                cosine_sim = dot / (sqrt(suma1) * sqrt(sumb1) + eps)
                risultati[a + '-' + b] = cosine_sim
        # Highest similarity over all word pairs
        return max(risultati.values())

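
For reference, the inner loops over `x1` and `x2` can also be written as matrix operations. The following is only a sketch, assuming the embeddings of the two lists have already been stacked into NumPy arrays with one row per word; the name `max_pairwise_cosine` is made up for illustration. It uses the same formula as `cosineSim`, including the `eps` term in the denominator.

```python
import numpy as np


def max_pairwise_cosine(emb1, emb2, eps=np.finfo(float).eps):
    """Return the largest cosine similarity between any row of emb1 and any row of emb2.

    emb1 has shape (n1, d) and emb2 has shape (n2, d); each row is one word embedding.
    Empty inputs give 0.0.
    """
    emb1 = np.asarray(emb1, dtype=float)
    emb2 = np.asarray(emb2, dtype=float)
    if emb1.size == 0 or emb2.size == 0:
        return 0.0
    # All dot products at once: entry (i, j) is emb1[i] . emb2[j]
    dots = emb1 @ emb2.T
    # Matching products of the vector lengths, shape (n1, n2)
    norms = np.linalg.norm(emb1, axis=1)[:, None] * np.linalg.norm(emb2, axis=1)[None, :]
    return float(np.max(dots / (norms + eps)))
```

This does not change how many row pairs have to be compared; it only removes the Python-level loops inside a single comparison.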
The function I'm using to iterate over the rows is the following one:
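
    from itertools import islice

    from pyspark.sql.types import FloatType, StringType, StructField, StructType


    def iterazione(df, numero, embed):
        # df is iterated like a local sequence of (post_id, features) rows;
        # `spark` is the active SparkSession
        a = 1
        k = 1
        emp_RDD = spark.sparkContext.emptyRDD()
        columns1 = StructType([StructField('Source', StringType(), False),
                               StructField('Destination', StringType(), False),
                               StructField('CosinSim', FloatType(), False)])
        first_df = spark.createDataFrame(data=emp_RDD, schema=columns1)
        for i in df:
            # Compare row i with the rows from offset a onwards
            for j in islice(df, a, None):
                r = cosineSim(i[1], j[1], embed)
                if r > 0.45:
                    # One single-row DataFrame per match, appended with union
                    z = spark.createDataFrame(data=[(i[0], j[0], r)], schema=columns1)
                    first_df = first_df.union(z)
                k = k + 1
                if k == numero:
                    k = a + 1
                    a = a + 1
        return first_df
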
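
The pairing that `iterazione` is aiming for (each row compared once with every later row) can be sketched with `itertools.combinations`. This is a minimal illustration, assuming the rows are available as a local list of `(post_id, features)` tuples; `similar_pairs` and its `similarity` argument are hypothetical names, and any function that takes two feature lists and returns a float can be plugged in.

```python
from itertools import combinations


def similar_pairs(rows, similarity, threshold=0.45):
    """Collect (source, destination, score) for every unordered pair of rows above the threshold.

    rows: a local sequence of (post_id, features) tuples.
    similarity: a function taking two feature lists and returning a float.
    """
    results = []
    # combinations(rows, 2) yields each pair once, in the original row order
    for (id_a, feats_a), (id_b, feats_b) in combinations(rows, 2):
        score = similarity(feats_a, feats_b)
        if score > threshold:
            results.append((id_a, id_b, score))
    return results
```

The returned list could then be turned into a single DataFrame with one `spark.createDataFrame(results, schema=columns1)` call instead of one `union` per match. It still visits n(n-1)/2 pairs, which is roughly 1.1 × 10^12 for 1.5 million rows, so it only makes the loop simpler, not cheaper.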
The output I desire is something like this:
| Source | Dest | CosinSim |
| -------- | ---- | ------ |
| Bkeur23 | Ksur312kd | 0.93 |
| Bkeur23 | Fwd2kd | 0.673 |
| Ksur312kd | Fwd2kd | 0.76 |
But there is a problem in my `iterazione` function. I'm asking for help finding the best way to iterate over all these rows. I was also thinking about copying the column "features" as "features2" and applying my function using `withColumn`, but I don't know how to do it or whether it will work. I'd like to know if there is a way to do this directly on a Spark DataFrame, avoiding the creation of other datasets that have to be merged later, or if you know a faster and more efficient method. Thank you!
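
One cost that is independent of how the rows are paired: `cosineSim` calls `embed` inside the pair loop, so the same word is embedded again for every pair it appears in. A possible way to avoid that, sketched below under the assumption that the set of distinct words fits in memory, is to embed each word once and look the vectors up afterwards. `build_embedding_cache`, `row_matrix` and `embed_word` are hypothetical names; `embed_word` stands for any callable that maps one word to its vector.

```python
import numpy as np


def build_embedding_cache(rows, embed_word):
    """Embed every distinct word once and return a {word: vector} dict.

    rows: a local sequence of (post_id, features) tuples; features may be None or empty.
    embed_word: hypothetical callable mapping one word to a 1-D sequence of floats.
    """
    vocabulary = {word for _, features in rows for word in (features or [])}
    return {word: np.asarray(embed_word(word), dtype=float) for word in vocabulary}


def row_matrix(features, cache):
    """Stack the cached vectors of one row's words into an (n_words, d) array."""
    return np.array([cache[word] for word in (features or [])])
```

With the TensorFlow layer from the question, `embed_word` could be something like `lambda w: np.asarray(embed(tf.constant([w])))[0]`, though that wrapper is untested here.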