Beginner PySpark question here! I have a DataFrame of ~2M rows of already-vectorized text (via w2v; 300 dimensions). What is the most efficient way to calculate the cosine distance between each row and a single new input vector?
My current approach uses a UDF and takes a couple of minutes, which is far too long for the web app I'd like to build.
Create a sample df:
import numpy as np
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

column = []
num_rows = 10000  # change to 2000000 to really slow your computer down!
for x in range(num_rows):
    sample = np.random.uniform(low=-1, high=1, size=(300,)).tolist()
    column.append(sample)
index = range(num_rows)
df_pd = pd.DataFrame([index, column]).T
df_pd.head()
df = spark.createDataFrame(df_pd).withColumnRenamed('0', 'Index').withColumnRenamed('1', 'Vectors')
df.show()
Create a sample input (built as a Spark df so I can run it through my existing transformation pipeline):
new_input = np.random.uniform(low=-1, high=1, size=(300,)).tolist()
df_pd_new = pd.DataFrame([[new_input]])
df_new = spark.createDataFrame(df_pd_new, ['Input_Vector'])
df_new.show()
Calculate the cosine similarity, dot(a, b) / (||a|| * ||b||), between each Vectors row and new_input:
value = df_new.select('Input_Vector').collect()[0][0]
def cos_sim(vec):
    # returns None (dropped later by dropna) when either norm is zero
    if (np.linalg.norm(value) * np.linalg.norm(vec)) != 0:
        dot_value = np.dot(value, vec) / (np.linalg.norm(value) * np.linalg.norm(vec))
        return dot_value.tolist()

cos_sim_udf = udf(cos_sim, FloatType())
df_cos = df.withColumn('cos_dis', cos_sim_udf('Vectors')).dropna(subset='cos_dis')
df_cos.show()
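I suspect the bottleneck is that a plain Python UDF serializes and scores one row at a time. Here is a minimal sketch of a vectorized alternative using a scalar pandas UDF, which scores whole Arrow batches with NumPy instead (this assumes Spark 2.3+ with PyArrow available; cos_sim_pandas is just a name I made up, not a built-in):
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Precompute the query vector's norm once, outside the per-batch function.
value_arr = np.array(value)
value_norm = np.linalg.norm(value_arr)

@pandas_udf(FloatType(), PandasUDFType.SCALAR)
def cos_sim_pandas(vectors):
    # vectors is a pandas Series whose elements are 300-d lists
    mat = np.array(vectors.tolist())   # shape: (batch_size, 300)
    norms = np.linalg.norm(mat, axis=1) * value_norm
    dots = mat @ value_arr             # one batched dot product per Arrow batch
    with np.errstate(divide='ignore', invalid='ignore'):
        sims = np.where(norms != 0, dots / norms, np.nan)
    return pd.Series(sims.astype(np.float32))

df_cos_fast = df.withColumn('cos_dis', cos_sim_pandas('Vectors')).dropna(subset='cos_dis')
It's still a full scan of every row per query, but it avoids the per-row Python overhead.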
And finally, let's pull out the top 5 indices for fun:
max_values = df_cos.select('Index', 'cos_dis').orderBy('cos_dis', ascending=False).limit(5).collect()
top_indices = [row[0] for row in max_values]
print(top_indices)
As far as I can tell, no built-in PySpark function for cosine distance exists (which would be ideal), so I'm not sure how to speed this up. Any ideas are greatly appreciated!
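For a sense of the target latency: if the full matrix fits in memory on one machine (2M x 300 at float32 is roughly 2.4 GB), a pure-NumPy baseline does the same top-5 lookup with a single matrix-vector product per request. A minimal sketch, reusing the column list from the sample setup above:
# Build the matrix once at startup, then reuse it for every request.
mat = np.array(column, dtype=np.float32)            # shape: (num_rows, 300)
mat_norms = np.linalg.norm(mat, axis=1)

q = np.array(new_input, dtype=np.float32)
sims = (mat @ q) / (mat_norms * np.linalg.norm(q))  # zero-norm rows would yield inf/nan
top_indices = np.argsort(sims)[-5:][::-1]           # top 5, highest similarity first
print(top_indices)
This is obviously not a Spark answer, but it's the baseline I'd want any Spark approach to beat.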