
I want to calculate the cosine similarity of two vectors using a Pandas UDF. I implemented it as a regular Spark UDF, which works fine with the following script:

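import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Create (or reuse) a Spark session
spark = SparkSession.builder.getOrCreate()

# Create dataframe
df = spark.createDataFrame([("A", [1, 2, 3], [3, 4, 5]), ("B", [5, 6, 7], [7, 8, 9])], ("name", "vec1", "vec2"))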

# Cosine similarity of two single vectors
def cosine_similarity(vec1, vec2):
    return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))

# Spark UDF
cosine_similarity_udf = udf(cosine_similarity, FloatType())
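As a quick sanity check outside Spark, the plain function can be called directly on the two example rows. This sketch only needs NumPy; the values follow from cos θ = (v1 · v2) / (‖v1‖ ‖v2‖), e.g. row A gives 26 / √(14 · 50) = 26 / √700 ≈ 0.98271.

```python
import numpy as np

def cosine_similarity(vec1, vec2):
    # Same formula as above: dot product divided by the product of the L2 norms
    return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))

# The two rows of the example dataframe
rows = {"A": ([1, 2, 3], [3, 4, 5]), "B": ([5, 6, 7], [7, 8, 9])}
results = {name: cosine_similarity(v1, v2) for name, (v1, v2) in rows.items()}
print({name: round(value, 5) for name, value in results.items()})  # {'A': 0.98271, 'B': 0.99944}
```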

When I wrap it in a Pandas UDF, as follows, it raises `TypeError: only size-1 arrays can be converted to Python scalars`.

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf(returnType=FloatType())
def cosine_similarity_udf(vec1: pd.Series, vec2: pd.Series) -> pd.Series:
    # Here vec1 and vec2 are whole columns (one array per row), not single vectors
    return pd.Series(cosine_similarity(vec1, vec2))
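The error can be reproduced without Spark. In a Series-to-Series Pandas UDF each argument is a whole `pd.Series` holding one array per row, so `np.dot` and `np.linalg.norm` work on the column instead of on one vector, the division yields an array rather than a single number, and `float()` rejects it. A minimal sketch, assuming Arrow hands the array column over as a Series of NumPy arrays (the hand-built Series below only imitate one such batch):

```python
import numpy as np
import pandas as pd

def cosine_similarity(vec1, vec2):
    return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))

# Imitation of one batch passed to the UDF: one NumPy array per row (assumption)
vec1 = pd.Series([np.array([1, 2, 3]), np.array([5, 6, 7])])
vec2 = pd.Series([np.array([3, 4, 5]), np.array([7, 8, 9])])

error = None
try:
    cosine_similarity(vec1, vec2)  # whole columns, not single vectors
except TypeError as exc:
    error = exc
print(type(error).__name__, error)
```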

What is the correct way to get the desired output using a Pandas UDF?

Abdennacer Lachiheb
  • Just for those who have never used `pyspark` and are trying to reproduce this: you need to first create a session with `spark = SparkSession.builder.appName(...).getOrCreate()`. – Memristor May 03 '23 at 11:12

1 Answer


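Inside a Pandas UDF, `vec1` and `vec2` are whole `pd.Series` columns, so `cosine_similarity` has to be applied row by row. I can get values if I do: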

@pandas_udf(returnType=FloatType())
def cosine_similarity_udf(vec1: pd.Series, vec2: pd.Series) -> pd.Series:
    # Series.combine calls the function once per row with the pair (v1, v2)
    return vec1.combine(vec2, func=lambda v1, v2: cosine_similarity(np.array(v1), np.array(v2)))

Then call it with:

df = df.withColumn("cosine_similarity", cosine_similarity_udf("vec1", "vec2"))
df.show()
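To check the `combine` logic without a cluster, the UDF body can be run on hand-built Series. This sketch assumes, as in the question, that each row arrives as a NumPy array; `cosine_similarity_body` is a hypothetical plain-function copy of the UDF body, not part of PySpark.

```python
import numpy as np
import pandas as pd

def cosine_similarity(vec1, vec2):
    return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))

def cosine_similarity_body(vec1: pd.Series, vec2: pd.Series) -> pd.Series:
    # Same body as the UDF above, without the @pandas_udf decorator
    return vec1.combine(vec2, func=lambda v1, v2: cosine_similarity(np.array(v1), np.array(v2)))

vec1 = pd.Series([np.array([1, 2, 3]), np.array([5, 6, 7])])
vec2 = pd.Series([np.array([3, 4, 5]), np.array([7, 8, 9])])
out = cosine_similarity_body(vec1, vec2)
print([round(float(x), 5) for x in out])  # [0.98271, 0.99944]
```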

See the pandas documentation for `Series.combine`.
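`Series.combine` still calls a Python function once per row, so it gives up much of the speed advantage of a Pandas UDF. A vectorized alternative (a different technique from the answer above) stacks each column into a 2-D NumPy array and computes all row-wise similarities at once. This is a sketch that assumes every vector in a batch has the same length; `cosine_similarity_vectorized` is a hypothetical name, shown as a plain function so it runs without Spark. Inside Spark it would be decorated with `@pandas_udf(returnType=FloatType())` like the function above.

```python
import numpy as np
import pandas as pd

def cosine_similarity_vectorized(vec1: pd.Series, vec2: pd.Series) -> pd.Series:
    # Stack the per-row arrays into (n_rows, dim) matrices; assumes equal lengths
    a = np.stack(vec1.tolist())
    b = np.stack(vec2.tolist())
    # Row-wise dot products divided by the product of row-wise L2 norms
    dots = (a * b).sum(axis=1)
    norms = np.linalg.norm(a, axis=1) * np.linalg.norm(b, axis=1)
    # Keep the input index so the output lines up with the input rows
    return pd.Series(dots / norms, index=vec1.index)

vec1 = pd.Series([np.array([1, 2, 3]), np.array([5, 6, 7])])
vec2 = pd.Series([np.array([3, 4, 5]), np.array([7, 8, 9])])
result = cosine_similarity_vectorized(vec1, vec2)
print(result.round(5).tolist())  # [0.98271, 0.99944]
```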

Memristor