
Beginner PySpark question here! I have a DataFrame of ~2M rows of already-vectorized text (via word2vec; 300 dimensions). What is the most efficient way to calculate the cosine distance of each row against a single new input vector?

My current approach uses a UDF and takes a couple of minutes, far too long for the web app I'd like to build.

Create a sample df:

import numpy as np
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType

# build a column of random 300-dimensional vectors
column = []
num_rows = 10000  # change to 2000000 to really slow your computer down!
for x in range(num_rows):
    sample = np.random.uniform(low=-1, high=1, size=(300,)).tolist()
    column.append(sample)
index = list(range(num_rows))  # must match the number of rows

df_pd = pd.DataFrame([index, column]).T
df_pd.head()

df = spark.createDataFrame(df_pd).withColumnRenamed('0', 'Index').withColumnRenamed('1', 'Vectors')
df.show()

Create a sample input (which I create as a Spark DataFrame so I can transform it through my existing pipeline):

new_input = np.random.uniform(low=-1, high=1, size=(300,)).tolist()
df_pd_new = pd.DataFrame([[new_input]])
df_new = spark.createDataFrame(df_pd_new, ['Input_Vector'])
df_new.show()

Calculate cosine distance or similarity between Vectors and new_input:

value = df_new.select('Input_Vector').collect()[0][0]

def cos_sim(vec):
    # guard against zero-norm vectors; returning None lets dropna remove them
    if (np.linalg.norm(value) * np.linalg.norm(vec)) != 0:
        dot_value = np.dot(value, vec) / (np.linalg.norm(value) * np.linalg.norm(vec))
        return dot_value.tolist()

cos_sim_udf = udf(cos_sim, FloatType())

df_cos = df.withColumn('cos_dis', cos_sim_udf('Vectors')).dropna(subset='cos_dis')
df_cos.show()

And finally, let's pull out the top 5 indices for fun:

max_values = df_cos.select('Index', 'cos_dis').orderBy('cos_dis', ascending=False).limit(5).collect()
top_indices = [row[0] for row in max_values]
print(top_indices)

No PySpark function for cosine distance exists (which would be ideal), so I'm not sure how to speed this up. Any ideas greatly appreciated!

whs2k
  • Related/potentially helpful: [Calculating the cosine similarity between all the rows of a dataframe in pyspark](https://stackoverflow.com/questions/46758768/calculating-the-cosine-similarity-between-all-the-rows-of-a-dataframe-in-pyspark) and [Efficient String Matching in Spark](https://stackoverflow.com/questions/43938672/efficient-string-matching-in-apache-spark). – pault Feb 27 '18 at 22:48
  • What if you go through the RDD API instead of DataFrames, and use Spark's cache and persist options to optimize the result? – dev Jan 18 '20 at 15:39
  • One thing you can do is normalize all the vectors ahead of time. Then you don't need to divide by the two norms in your comparator (see the sketch below). – Union find Jun 29 '20 at 20:07
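
Following up on that comment, here is a minimal sketch of the pre-normalization idea, assuming the `df` and `value` names from the question; with unit-length vectors the cosine similarity reduces to a plain dot product (the helper names below are made up for illustration):

# Sketch only: pre-normalize all vectors once, then each query is just a dot product.
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType, FloatType

def normalize(vec):
    # return a unit-length copy of the vector, or None for zero vectors
    norm = np.linalg.norm(vec)
    return (np.array(vec) / norm).tolist() if norm != 0 else None

normalize_udf = udf(normalize, ArrayType(DoubleType()))
df_norm = df.withColumn('Vectors_norm', normalize_udf('Vectors')).dropna(subset='Vectors_norm')
df_norm.cache()  # pay the normalization cost once and reuse it for every query

# at query time, normalize the input once and take a plain dot product
value_unit = (np.array(value) / np.linalg.norm(value)).tolist()
dot_udf = udf(lambda v: float(np.dot(value_unit, v)), FloatType())
df_cos_norm = df_norm.withColumn('cos_dis', dot_udf('Vectors_norm'))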

1 Answer


You could try using pandas_udf instead of udf:

# other imports as in the question (numpy as np, pandas as pd, FloatType)
from pyspark.sql.functions import pandas_udf

# make sure Arrow is actually used
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")

def cos_sim2(vec: pd.Series) -> pd.Series:
    # `value` is the input vector collected in the question
    value_norm = np.linalg.norm(value)
    cs_value = vec.apply(lambda v: np.dot(value, v) / (np.linalg.norm(v) * value_norm))
    return cs_value.replace(np.inf, np.nan)

cos_sim_udf = pandas_udf(cos_sim2, FloatType())
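
Applied to the DataFrame from the question, usage would look something like this (a sketch that assumes the `df` and `value` defined above):

df_cos = df.withColumn('cos_dis', cos_sim_udf('Vectors')).dropna(subset='cos_dis')
df_cos.orderBy('cos_dis', ascending=False).limit(5).show()

Because each call receives a whole Arrow batch as a pandas Series, the per-row serialization overhead of the plain udf is avoided, which is where most of the time tends to go.
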
bzu