I see that dataframe.agg(avg(Col)) works fine, but when I calculate avg() over a window spanning the whole column (not using any partition), I see different results depending on which column I use with orderBy.
Sample code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col
spark = SparkSession.builder.appName("sample_for_SE").getOrCreate()
# Sample data
data = [
(1, 10.0, 5.0),
(3, 20.0, None),
(5, 15.0, None)
]
schema = ["id", "value1", "value2"]
df = spark.createDataFrame(data, schema=schema)
# Display DataFrame and avg()
df.show()
df.agg(avg("value1")).show()
And the output, showing the DataFrame and the average correctly:
+---+------+------+
| id|value1|value2|
+---+------+------+
| 1| 10.0| 5.0|
| 3| 20.0| NULL|
| 5| 15.0| NULL|
+---+------+------+
+-----------+
|avg(value1)|
+-----------+
| 15.0|
+-----------+
However, with a window function:
from pyspark.sql.window import Window
#with orderBy("value1")
#========================
w = Window.orderBy("value1")
df.withColumn("AVG",avg(col("value1")).over(w))\
.sort("id",ascending=True)\
.show()
#with orderBy("id")
#========================
w = Window.orderBy("id")
df.withColumn("AVG",avg(col("value1")).over(w))\
.sort("id",ascending=True)\
.show()
Output:
+---+------+------+----+
| id|value1|value2| AVG|
+---+------+------+----+
| 1| 10.0| 5.0|10.0|
| 3| 20.0| NULL|15.0|
| 5| 15.0| NULL|12.5|
+---+------+------+----+
+---+------+------+----+
| id|value1|value2| AVG|
+---+------+------+----+
| 1| 10.0| 5.0|10.0|
| 3| 20.0| NULL|15.0|
| 5| 15.0| NULL|15.0|
+---+------+------+----+
Question: why does avg() over a window give different results depending on the orderBy column?

It's something I ran into in my own experiments too and understood only after quite some time. I later found a reference link where this behaviour is explained, but I cannot find it anymore.
Anyway, the reason this happens is the following: whenever we use a Window function in Spark and order it with .orderBy(), there is an optional frame specification, .rangeBetween(), that is implicitly set by default to (Window.unboundedPreceding, Window.currentRow). This means the average for each row is taken over all values from the start of the ordering up to and including the current row (plus any ties in the ordering column, since it is a RANGE frame), i.e. a running average rather than a global one.
Example:
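A minimal sketch, reusing df from the question above: writing Spark's default frame out explicitly reproduces the first output exactly.

from pyspark.sql.functions import avg, col
from pyspark.sql.window import Window

# The frame Spark applies by default whenever orderBy() is present:
w_default = Window.orderBy("value1").rangeBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("AVG", avg(col("value1")).over(w_default)).sort("id").show()
# Ordered by value1, the frames are [10.0], [10.0, 15.0], [10.0, 15.0, 20.0],
# so the running averages are 10.0, 12.5 and 15.0 (displayed sorted by id,
# which is why they appear as 10.0, 15.0, 12.5 in the question's output).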
To overcome this "issue/feature", you just need to specify the WindowSpec you want in the definition of the window, i.e.:
w = Window.orderBy("value1").rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
but in that case I suggest you use the other method, df.agg(avg("value1")), which was already working, since there is normally no reason to take the average of the entire dataframe with a Window function.
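For completeness, a quick sketch of the fixed window (assumption: you do want the global average attached as a column on every row; note that a window with no orderBy at all, e.g. Window.partitionBy() with no arguments, already defaults to an unbounded frame):

from pyspark.sql.functions import avg, col
from pyspark.sql.window import Window

# Unbounded frame: every row sees the entire column, so AVG is 15.0 on all rows.
w_full = Window.orderBy("value1").rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn("AVG", avg(col("value1")).over(w_full)).sort("id").show()

# Without orderBy the default frame is already unbounded on both sides, so this
# is equivalent (both variants move all rows to a single partition).
df.withColumn("AVG", avg(col("value1")).over(Window.partitionBy())).sort("id").show()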