Tags: python, pandas, apache-spark, pyspark, pandas-udf

Use Pandas UDF to calculate Cosine Similarity of two vectors in PySpark


I want to calculate the cosine similarity of two vectors using a Pandas UDF. I first implemented it with a plain Spark UDF, which works fine with the following script.

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Create dataframe
df = spark.createDataFrame(
    [("A", [1, 2, 3], [3, 4, 5]), ("B", [5, 6, 7], [7, 8, 9])],
    ("name", "vec1", "vec2"),
)

# Cosine similarity function
def cosine_similarity(vec1, vec2):
    return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))

# Spark UDF
cosine_similarity_udf = udf(cosine_similarity, FloatType())
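As a quick sanity check (a standalone sketch, no Spark session needed), the helper gives the expected value for the first row of the example DataFrame:

```python
import numpy as np

def cosine_similarity(vec1, vec2):
    return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))

# cos(theta) = (v1 . v2) / (|v1| * |v2|) for one pair of vectors
print(round(cosine_similarity([1, 2, 3], [3, 4, 5]), 6))  # 0.982708
```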

When I wrap it with a Pandas UDF, as follows, it fails with TypeError: only size-1 arrays can be converted to Python scalars.

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf(returnType=FloatType())
def cosine_similarity_udf(vec1: pd.Series, vec2: pd.Series) -> pd.Series:
    return pd.Series(cosine_similarity(vec1, vec2))
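The failure can be reproduced outside Spark. A pandas UDF receives each column as a `pd.Series` holding a whole batch of rows, so `np.dot` and `np.linalg.norm` return arrays instead of scalars and `float()` fails. A minimal sketch (exact error message may vary with NumPy/pandas versions):

```python
import numpy as np
import pandas as pd

def cosine_similarity(vec1, vec2):
    return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))

# Inside a pandas UDF each argument is a batch: one Series element per row
vec1 = pd.Series([np.array([1, 2, 3]), np.array([5, 6, 7])])
vec2 = pd.Series([np.array([3, 4, 5]), np.array([7, 8, 9])])

try:
    cosine_similarity(vec1, vec2)
except TypeError as exc:
    # typically: only size-1 arrays can be converted to Python scalars
    print(exc)
```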

What is the correct way to get the desired output using a Pandas UDF?


Solution

  • A pandas UDF receives each column as a pd.Series containing a whole batch of rows, so cosine_similarity must be applied one row at a time. I can get the values with Series.combine:

    @pandas_udf(returnType=FloatType())
    def cosine_similarity_udf(vec1: pd.Series, vec2: pd.Series) -> pd.Series:
        return vec1.combine(vec2, func=lambda v1, v2: cosine_similarity(np.array(v1), np.array(v2)))
    

    then call it with:

    df = df.withColumn("cosine_similarity", cosine_similarity_udf("vec1", "vec2"))
    df.show()
    

    See the pandas documentation for Series.combine.
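The row-wise behavior of Series.combine can be verified outside Spark with the same example vectors (a standalone sketch):

```python
import numpy as np
import pandas as pd

def cosine_similarity(vec1, vec2):
    return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))

vec1 = pd.Series([[1, 2, 3], [5, 6, 7]])
vec2 = pd.Series([[3, 4, 5], [7, 8, 9]])

# combine pairs the Series up element by element, so each call
# sees exactly one (v1, v2) pair instead of the whole batch
result = vec1.combine(vec2, func=lambda v1, v2: cosine_similarity(np.array(v1), np.array(v2)))
print(result.round(6).tolist())  # [0.982708, 0.999438]
```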