I have a Spark dataframe in the following form:
> df1
+---------------+----------------+
|        vector1|         vector2|
+---------------+----------------+
|[[0.9,0.5,0.2]]| [[0.1,0.3,0.2]]|
|[[0.8,0.7,0.1]]| [[0.8,0.4,0.2]]|
|[[0.9,0.2,0.8]]| [[0.3,0.1,0.8]]|
+---------------+----------------+
> df1.printSchema()
root
 |-- vector1: array (nullable = true)
 |    |-- element: vector (containsNull = true)
 |-- vector2: array (nullable = true)
 |    |-- element: vector (containsNull = true)
I need to calculate the Euclidean distance or the cosine similarity between the vector1 and vector2 columns.
How can I do this using PySpark?
- When the columns are of array type:
# Euclidean distance between the two array columns:
# square the element-wise differences, sum them, then take the square root.
squared_diffs = F.zip_with(
    'vector1',
    'vector2',
    lambda left, right: (left - right) ** 2,
)
distance = F.aggregate(
    squared_diffs,
    F.lit(0.0),                        # running sum starts at 0.0
    lambda total, sq: total + sq,      # accumulate the squared differences
    lambda total: total ** 0.5,        # finish: square root of the sum
)
Full test:
from pyspark.sql import functions as F
# Sample data: each row is a (vector1, vector2) pair of plain Python lists.
rows = [
    ([0.9, 0.5, 0.2], [0.1, 0.3, 0.2]),
    ([0.8, 0.7, 0.1], [0.8, 0.4, 0.2]),
    ([0.9, 0.2, 0.8], [0.3, 0.1, 0.8]),
]
df1 = spark.createDataFrame(rows, ['vector1', 'vector2'])

# Euclidean distance: square the element-wise differences, sum them,
# then take the square root of the sum.
squared_diffs = F.zip_with(
    'vector1',
    'vector2',
    lambda left, right: (left - right) ** 2,
)
distance = F.aggregate(
    squared_diffs,
    F.lit(0.0),
    lambda total, sq: total + sq,
    lambda total: total ** 0.5,
)

df2 = df1.withColumn('euclidean_distance', distance)
# truncate=0 so the full cell values are printed.
df2.show(truncate=0)
# +---------------+---------------+-------------------+
# |vector1 |vector2 |euclidean_distance |
# +---------------+---------------+-------------------+
# |[0.9, 0.5, 0.2]|[0.1, 0.3, 0.2]|0.8246211251235323 |
# |[0.8, 0.7, 0.1]|[0.8, 0.4, 0.2]|0.31622776601683783|
# |[0.9, 0.2, 0.8]|[0.3, 0.1, 0.8]|0.608276253029822 |
# +---------------+---------------+-------------------+
- If the columns are of vector type, I would first convert them to arrays:
# Each cell is an array wrapping a single vector: pull out that vector
# (position 1) and convert it to a plain array, keeping the column name.
df2 = df1.select(
    *[
        vector_to_array(F.element_at(name, 1)).alias(name)
        for name in ('vector1', 'vector2')
    ]
)
Full test:
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors
from pyspark.ml.functions import vector_to_array
# Sample data: raw (vector1, vector2) value pairs; each side is wrapped
# below as a one-element array holding a dense vector.
pairs = [
    ([0.9, 0.5, 0.2], [0.1, 0.3, 0.2]),
    ([0.8, 0.7, 0.1], [0.8, 0.4, 0.2]),
    ([0.9, 0.2, 0.8], [0.3, 0.1, 0.8]),
]
df1 = spark.createDataFrame(
    [([Vectors.dense(left)], [Vectors.dense(right)]) for left, right in pairs],
    ['vector1', 'vector2'],
)

# Unwrap the single vector in each cell and convert it to a plain array.
df2 = df1.select(
    *[
        vector_to_array(F.element_at(name, 1)).alias(name)
        for name in ('vector1', 'vector2')
    ]
)

# Euclidean distance: square the element-wise differences, sum them,
# then take the square root of the sum.
squared_diffs = F.zip_with(
    'vector1',
    'vector2',
    lambda left, right: (left - right) ** 2,
)
distance = F.aggregate(
    squared_diffs,
    F.lit(0.0),
    lambda total, sq: total + sq,
    lambda total: total ** 0.5,
)

df3 = df2.withColumn('euclidean_distance', distance)
# truncate=0 so the full cell values are printed.
df3.show(truncate=0)
# +---------------+---------------+-------------------+
# |vector1 |vector2 |euclidean_distance |
# +---------------+---------------+-------------------+
# |[0.9, 0.5, 0.2]|[0.1, 0.3, 0.2]|0.8246211251235323 |
# |[0.8, 0.7, 0.1]|[0.8, 0.4, 0.2]|0.31622776601683783|
# |[0.9, 0.2, 0.8]|[0.3, 0.1, 0.8]|0.608276253029822 |
# +---------------+---------------+-------------------+