vector, pyspark, apache-spark-ml, euclidean-distance, array-algorithms

Euclidean distance or cosine similarity between columns with vectors in PySpark


I have a Spark dataframe in the following form:

> df1
+---------------+----------------+
|        vector1|         vector2|  
+---------------+----------------+
|[[0.9,0.5,0.2]]| [[0.1,0.3,0.2]]|
|[[0.8,0.7,0.1]]| [[0.8,0.4,0.2]]|
|[[0.9,0.2,0.8]]| [[0.3,0.1,0.8]]|
+---------------+----------------+

> df1.printSchema()
root
 |-- vector1: array (nullable = true)
 |    |-- element: vector (containsNull = true)
 |-- vector2: array (nullable = true)
 |    |-- element: vector (containsNull = true)

I need to calculate the Euclidean distance or cosine similarity between the vector1 and vector2 columns.
How can I do this in PySpark?


Solution

  • ā— When columns are of array type:

    distance = F.aggregate(
        F.transform(
            F.arrays_zip('vector1', 'vector2'),          # pair up elements: [{vector1, vector2}, ...]
            lambda x: (x['vector1'] - x['vector2'])**2   # squared difference per element
        ),
        F.lit(0.0),
        lambda acc, x: acc + x,                          # sum the squared differences
        lambda x: x**.5                                  # square root of the sum
    )
    

    Full test:

    from pyspark.sql import functions as F
    df1 = spark.createDataFrame(
        [([0.9, 0.5, 0.2], [0.1, 0.3, 0.2]),
         ([0.8, 0.7, 0.1], [0.8, 0.4, 0.2]),
         ([0.9, 0.2, 0.8], [0.3, 0.1, 0.8])],
        ['vector1', 'vector2']
    )
    distance = F.aggregate(
        F.transform(
            F.arrays_zip('vector1', 'vector2'),
            lambda x: (x['vector1'] - x['vector2'])**2
        ),
        F.lit(0.0),
        lambda acc, x: acc + x,
        lambda x: x**.5
    )
    df2 = df1.withColumn('euclidean_distance', distance)
    
    df2.show(truncate=0)
    # +---------------+---------------+-------------------+
    # |vector1        |vector2        |euclidean_distance |
    # +---------------+---------------+-------------------+
    # |[0.9, 0.5, 0.2]|[0.1, 0.3, 0.2]|0.8246211251235323 |
    # |[0.8, 0.7, 0.1]|[0.8, 0.4, 0.2]|0.31622776601683783|
    # |[0.9, 0.2, 0.8]|[0.3, 0.1, 0.8]|0.608276253029822  |
    # +---------------+---------------+-------------------+
    

    ā— If columns are of vector type, I would first convert them to arrays:

    df2 = df1.select(
        # each column is an array holding a single vector: take the vector out with
        # element_at and convert it to array<double> with vector_to_array
        vector_to_array(F.element_at('vector1', 1)).alias('vector1'),
        vector_to_array(F.element_at('vector2', 1)).alias('vector2'),
    )
    

    Full test:

    from pyspark.sql import functions as F
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.functions import vector_to_array
    df1 = spark.createDataFrame(
        [([Vectors.dense(0.9, 0.5, 0.2)], [Vectors.dense(0.1, 0.3, 0.2)]),
         ([Vectors.dense(0.8, 0.7, 0.1)], [Vectors.dense(0.8, 0.4, 0.2)]),
         ([Vectors.dense(0.9, 0.2, 0.8)], [Vectors.dense(0.3, 0.1, 0.8)])],
        ['vector1', 'vector2']
    )
    df2 = df1.select(
        vector_to_array(F.element_at('vector1', 1)).alias('vector1'),
        vector_to_array(F.element_at('vector2', 1)).alias('vector2'),
    )
    distance = F.aggregate(
        F.transform(
            F.arrays_zip('vector1', 'vector2'),
            lambda x: (x['vector1'] - x['vector2'])**2
        ),
        F.lit(0.0),
        lambda acc, x: acc + x,
        lambda x: x**.5
    )
    df3 = df2.withColumn('euclidean_distance', distance)
    
    df3.show(truncate=0)
    # +---------------+---------------+-------------------+
    # |vector1        |vector2        |euclidean_distance |
    # +---------------+---------------+-------------------+
    # |[0.9, 0.5, 0.2]|[0.1, 0.3, 0.2]|0.8246211251235323 |
    # |[0.8, 0.7, 0.1]|[0.8, 0.4, 0.2]|0.31622776601683783|
    # |[0.9, 0.2, 0.8]|[0.3, 0.1, 0.8]|0.608276253029822  |
    # +---------------+---------------+-------------------+
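
  • For cosine similarity, a minimal sketch along the same lines (assuming the same array<double> columns as above and Spark 3.1+ for F.aggregate / F.zip_with):

    # cosine similarity = dot(a, b) / (||a|| * ||b||), computed column-wise
    dot = F.aggregate(
        F.zip_with('vector1', 'vector2', lambda a, b: a * b),  # element-wise products
        F.lit(0.0),
        lambda acc, x: acc + x                                 # sum of products
    )
    norm1 = F.sqrt(F.aggregate('vector1', F.lit(0.0), lambda acc, x: acc + x**2))
    norm2 = F.sqrt(F.aggregate('vector2', F.lit(0.0), lambda acc, x: acc + x**2))
    cosine_similarity = dot / (norm1 * norm2)

    df4 = df2.withColumn('cosine_similarity', cosine_similarity)

    The df4 name here is just illustrative; apply the expression with withColumn to whichever dataframe already holds the array-type columns.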