I want to calculate the cosine similarity of two vector columns using a Pandas UDF. I implemented it as a regular Spark UDF, which works fine with the following script:
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
# Create the example DataFrame (assumes an active SparkSession named `spark`, as in the pyspark shell)
df = spark.createDataFrame([("A", [1, 2, 3], [3, 4, 5]), ("B", [5, 6, 7], [7, 8, 9])], ("name", "vec1", "vec2"))
# Cosine similarity function
def cosine_similarity(vec1, vec2):
    return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))
# Spark UDF
cosine_similarity_udf = udf(cosine_similarity, FloatType())
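A regular Spark UDF calls the function once per row, passing that row's two array values as plain Python lists. The minimal check below mirrors that per-row call for row "A" without needing a Spark session; the variable name row_a is only illustrative.

```python
import numpy as np


def cosine_similarity(vec1, vec2):
    # Dot product divided by the product of the two L2 norms
    return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))


# One call per row, as the scalar udf does; these are row "A"'s vectors
row_a = cosine_similarity([1, 2, 3], [3, 4, 5])
print(row_a)
```

Because each call sees exactly one pair of vectors, np.dot and np.linalg.norm both return scalars and float() succeeds.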
When I wrap it in a Pandas UDF as follows, it raises TypeError: only size-1 arrays can be converted to Python scalars.
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf(returnType=FloatType())
def cosine_similarity_udf(vec1: pd.Series, vec2: pd.Series) -> pd.Series:
    return pd.Series(cosine_similarity(vec1, vec2))
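A likely cause, sketched without Spark: a Series-to-Series pandas UDF receives whole column batches, so vec1 and vec2 are pd.Series whose elements are the per-row vectors (assumed here to be NumPy arrays), not single vectors. The hypothetical batch below reproduces the TypeError outside Spark and shows that applying the scalar function element by element works.

```python
import numpy as np
import pandas as pd


def cosine_similarity(vec1, vec2):
    return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))


# Hypothetical batch: each element is one row's vector, as in a pandas UDF argument
vec1_batch = pd.Series([np.array([1, 2, 3]), np.array([5, 6, 7])])
vec2_batch = pd.Series([np.array([3, 4, 5]), np.array([7, 8, 9])])

try:
    # np.dot / np.linalg.norm now combine values across rows, so the result
    # is an array rather than a single number and float() rejects it
    cosine_similarity(vec1_batch, vec2_batch)
    raised = False
except TypeError as exc:
    raised = True
    print("TypeError:", exc)

# Calling the scalar function on each (vec1, vec2) pair works row by row
per_row = [cosine_similarity(v1, v2) for v1, v2 in zip(vec1_batch, vec2_batch)]
print(per_row)
```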
What is the correct way to get the desired output with a Pandas UDF?
I do get values if I apply the function row by row with Series.combine:
@pandas_udf(returnType=FloatType())
def cosine_similarity_udf(vec1: pd.Series, vec2: pd.Series) -> pd.Series:
    # Series.combine applies the lambda to each (vec1, vec2) pair, one row at a time
    return vec1.combine(vec2, func=lambda v1, v2: cosine_similarity(np.array(v1), np.array(v2)))
and then call it with:
df = df.withColumn("cosine_similarity", cosine_similarity_udf("vec1", "vec2"))
df.show()
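The combine version still calls cosine_similarity once per row in Python, so it likely keeps most of the per-row overhead that a pandas UDF is meant to avoid. A minimal vectorized sketch, assuming every vector in a batch has the same length and a nonzero norm, stacks each batch into 2-D arrays and computes all similarities with array operations. The name cosine_similarity_batch is hypothetical, and the check below runs on plain pandas Series rather than inside Spark.

```python
import numpy as np
import pandas as pd


def cosine_similarity_batch(vec1: pd.Series, vec2: pd.Series) -> pd.Series:
    """Row-wise cosine similarity for one batch of a Series-to-Series pandas UDF.

    Assumes all vectors in the batch share one length and none has zero norm.
    """
    if len(vec1) == 0:
        # np.stack needs at least one array, so handle an empty batch directly
        return pd.Series([], dtype="float64")
    # Stack the per-row vectors into 2-D arrays of shape (rows, dim)
    a = np.stack(vec1.to_numpy()).astype(float)
    b = np.stack(vec2.to_numpy()).astype(float)
    # Row-wise dot products divided by the product of row-wise L2 norms
    dots = (a * b).sum(axis=1)
    norms = np.linalg.norm(a, axis=1) * np.linalg.norm(b, axis=1)
    return pd.Series(dots / norms, index=vec1.index)


# Plain pandas check with the example data (no Spark session needed)
s1 = pd.Series([[1, 2, 3], [5, 6, 7]])
s2 = pd.Series([[3, 4, 5], [7, 8, 9]])
out = cosine_similarity_batch(s1, s2)
print(out)
```

In Spark, the same function body would go under @pandas_udf(returnType=FloatType()) like the cosine_similarity_udf above and be called with cosine_similarity_udf("vec1", "vec2"); the Spark side of this sketch is untested here.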