
I'm a beginner in Spark and I'm dealing with a large dataset (over 1.5 million rows and 2 columns). I have to evaluate the cosine similarity of the field "features" between each pair of rows. The main problem is this iteration over the rows and finding an efficient and fast way to do it. I will have to use this method with another dataset of 42.5 million rows, and it will be a big computational problem if I don't find the most efficient approach.


| post_id   |     features       |
| --------  |     --------       |
| Bkeur23   |[cat,dog,person]    |
| Ksur312kd |[wine,snow,police]  |
| BkGrtTeu3 |[]                  |
| Fwd2kd    |[person,snow,cat]   |
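
For reproducibility, this is how the sample above can be built as a Spark DataFrame (assuming a standard SparkSession):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # The sample data from the table above.
    df = spark.createDataFrame(
        [("Bkeur23", ["cat", "dog", "person"]),
         ("Ksur312kd", ["wine", "snow", "police"]),
         ("BkGrtTeu3", []),
         ("Fwd2kd", ["person", "snow", "cat"])],
        ["post_id", "features"])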


I've created an algorithm that evaluates the cosine similarity between the elements of the i-th and j-th rows. I've tried using lists, and also creating a Spark DataFrame / RDD for each result and merging them with the "union" function. The function I've used to evaluate the cosine similarity is the following. It takes 2 lists as input (the lists of the i-th and j-th rows) and returns the maximum cosine similarity between each pair of elements in the lists. But this is not the problem.

import sys
from math import sqrt

import numpy as np
import tensorflow as tf

def cosineSim(lista1, lista2, embed):
    # embed = hub.KerasLayer(os.getcwd())
    eps = sys.float_info.epsilon
    # Return 0.0 for missing or empty lists, so the caller always gets a
    # number back (previously the return value was unbound in this case).
    if lista1 is None or lista2 is None:
        return 0.0
    if len(lista1) == 0 or len(lista2) == 0:
        return 0.0
    risultati = {}
    for a in lista1:
        # Embed the element of the i-th row.
        embeddings = embed(tf.constant([a]))
        x1 = np.asarray(embeddings)[0].tolist()
        for b in lista2:
            # Embed the element of the j-th row.
            embeddings = embed(tf.constant([b]))
            x2 = np.asarray(embeddings)[0].tolist()

            # Cosine similarity: dot(x1, x2) / (||x1|| * ||x2||).
            dot = 0
            suma1 = 0
            sumb1 = 0
            for i, j in zip(x1, x2):
                suma1 += i * i
                sumb1 += j * j
                dot += i * j
            cosine_sim = dot / ((sqrt(suma1)) * (sqrt(sumb1)) + eps)
            risultati[a + '-' + b] = cosine_sim
    # Keep only the best-matching pair across the two lists.
    return max(risultati.values())
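
As a side note (not the core issue): embedding every element inside the nested loop is expensive. A sketch of a batched variant, under the assumption that `embed` accepts a batch of strings and returns one vector per string, which embeds each list once and computes all pairwise cosines with NumPy:

    import numpy as np
    import tensorflow as tf

    def cosine_sim_batched(lista1, lista2, embed, eps=1e-12):
        # Hypothetical batched variant: embed each list in a single call;
        # assumes embed maps N strings to an (N, d) matrix of embeddings.
        if not lista1 or not lista2:
            return 0.0
        m1 = np.asarray(embed(tf.constant(lista1)))  # shape (len(lista1), d)
        m2 = np.asarray(embed(tf.constant(lista2)))  # shape (len(lista2), d)
        # Normalize rows; all pairwise cosines become one matrix product.
        m1 = m1 / (np.linalg.norm(m1, axis=1, keepdims=True) + eps)
        m2 = m2 / (np.linalg.norm(m2, axis=1, keepdims=True) + eps)
        return float((m1 @ m2.T).max())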

The function I'm using to iterate over the rows is the following one:

from itertools import islice

from pyspark.sql.types import StructType, StructField, StringType, FloatType

def iterazione(df, numero, embed):
    a = 1
    k = 1

    # Start from an empty DataFrame and union each result row into it.
    emp_RDD = spark.sparkContext.emptyRDD()
    columns1 = StructType([StructField('Source', StringType(), False),
                           StructField('Destination', StringType(), False),
                           StructField('CosinSim', FloatType(), False)])
    first_df = spark.createDataFrame(data=emp_RDD, schema=columns1)

    # df is the collected list of rows; compare each row only with the
    # rows after it, so each unordered pair is evaluated once.
    for i in df:
        for j in islice(df, a, None):
            r = cosineSim(i[1], j[1], embed)

            # Keep only pairs above the similarity threshold.
            if r > 0.45:
                z = spark.createDataFrame(data=[(i[0], j[0], r)], schema=columns1)
                first_df = first_df.union(z)

            # Comparison counter, reset once it reaches numero.
            k = k + 1
            if k == numero:
                k = a + 1
        a = a + 1
    return first_df
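
A variant I'm considering, since each union grows the execution plan: accumulate the matching pairs in a plain Python list and build a single DataFrame at the end. A rough, untested sketch, where rows is the collected list of rows and spark is my session:

    from pyspark.sql.types import StructType, StructField, StringType, FloatType

    schema = StructType([StructField('Source', StringType(), False),
                         StructField('Destination', StringType(), False),
                         StructField('CosinSim', FloatType(), False)])

    def iterazione_list(rows, embed, soglia=0.45):
        # Collect matches locally, then create one DataFrame at the end
        # instead of calling createDataFrame + union once per match.
        risultati = []
        for idx, i in enumerate(rows):
            for j in rows[idx + 1:]:
                r = cosineSim(i[1], j[1], embed)
                if r > soglia:
                    risultati.append((i[0], j[0], float(r)))
        return spark.createDataFrame(risultati, schema=schema)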

The output I desire is something like this:




| Source    |  Dest     | CosinSim |
| --------  |  ----     |  ------  |
| Bkeur23   | Ksur312kd | 0.93     |
| Bkeur23   | Fwd2kd    | 0.673    |
| Ksur312kd | Fwd2kd    | 0.76     |


But there is a problem in my "iterazione" function. I'm asking for help finding the best way to iterate over all these rows. I was also thinking about copying the column "features" as "features2" and applying my function using withColumn, but I don't know how to do it or whether it would work. I want to know if there is some way to do this directly on a Spark DataFrame, avoiding the creation of other DataFrames and merging them later, or if you know a faster and more efficient method. Thank you!
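
For instance, something like the following is the kind of DataFrame-only approach I have in mind (a rough, untested sketch: it assumes cosineSim is importable on the workers, and the embedding model may need to be loaded lazily inside the UDF rather than captured, since TF Hub layers generally don't serialize):

    from pyspark.sql import functions as F
    from pyspark.sql.types import FloatType

    # Score every unordered pair with a self cross-join plus a UDF.
    sim_udf = F.udf(lambda f1, f2: float(cosineSim(f1, f2, embed)), FloatType())

    pairs = (df.alias("a")
               .crossJoin(df.alias("b"))
               .where(F.col("a.post_id") < F.col("b.post_id"))  # each pair once
               .select(F.col("a.post_id").alias("Source"),
                       F.col("b.post_id").alias("Destination"),
                       sim_udf("a.features", "b.features").alias("CosinSim"))
               .where(F.col("CosinSim") > 0.45))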

  • Does this help you in any way? https://stackoverflow.com/questions/46758768/calculating-the-cosine-similarity-between-all-the-rows-of-a-dataframe-in-pyspark – ms12 Dec 17 '22 at 20:12
