So I have read this comprehensive material yet I don't understand why Window function acts this way.
Here's a little example:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.getOrCreate()
columns = ["CATEGORY", "REVENUE"]
data = [("Cell Phone", "6000"),
("Tablet", "1500"),
("Tablet", "5500"),
("Cell Phone", "5000"),
("Cell Phone", "6000"),
("Tablet", "2500"),
("Cell Phone", "3000"),
("Cell Phone", "3000"),
("Tablet", "3000"),
("Tablet", "4500"),
("Tablet", "6500")]
df = spark.createDataFrame(data=data, schema=columns)
window_spec = Window.partitionBy(df['CATEGORY']).orderBy(df['REVENUE'])
revenue_difference = F.max(df['REVENUE']).over(window_spec)
df.select(
df['CATEGORY'],
df['REVENUE'],
revenue_difference.alias("revenue_difference")).show()
So when I write orderBy(df['REVENUE'])
, I get this:
+----------+-------+------------------+
| CATEGORY|REVENUE|revenue_difference|
+----------+-------+------------------+
|Cell Phone| 3000| 3000|
|Cell Phone| 3000| 3000|
|Cell Phone| 5000| 5000|
|Cell Phone| 6000| 6000|
|Cell Phone| 6000| 6000|
| Tablet| 1500| 1500|
| Tablet| 2500| 2500|
| Tablet| 3000| 3000|
| Tablet| 4500| 4500|
| Tablet| 5500| 5500|
| Tablet| 6500| 6500|
+----------+-------+------------------+
But when I write orderBy(df['REVENUE']).desc()
, I get this:
+----------+-------+------------------+
| CATEGORY|REVENUE|revenue_difference|
+----------+-------+------------------+
|Cell Phone| 6000| 6000|
|Cell Phone| 6000| 6000|
|Cell Phone| 5000| 6000|
|Cell Phone| 3000| 6000|
|Cell Phone| 3000| 6000|
| Tablet| 6500| 6500|
| Tablet| 5500| 6500|
| Tablet| 4500| 6500|
| Tablet| 3000| 6500|
| Tablet| 2500| 6500|
| Tablet| 1500| 6500|
+----------+-------+------------------+
I don't understand because the way I see it, the MAX value in each window stays the same no matter what order is. So can someone please explain me what I am not gettin here??
Thank you!
The simple reason is that the default window range/row spec is Window.UnboundedPreceding
to Window.CurrentRow
, which means that the max is taken from the first row in that partition to the current row, NOT the last row of the partition.
This is a common gotcha. (you can replace .max()
with sum()
and see what output you get. It also changes depending on how you order the partition.)
To solve this, you can specify that you want the max of each partition to always be calculated using the full window partition, like so:
window_spec = Window.partitionBy(df['CATEGORY']).orderBy(df['REVENUE']).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
revenue_difference = F.max(df['REVENUE']).over(window_spec)
df.select(
df['CATEGORY'],
df['REVENUE'],
revenue_difference.alias("revenue_difference")).show()
+----------+-------+------------------+
| CATEGORY|REVENUE|revenue_difference|
+----------+-------+------------------+
| Tablet| 6500| 6500|
| Tablet| 5500| 6500|
| Tablet| 4500| 6500|
| Tablet| 3000| 6500|
| Tablet| 2500| 6500|
| Tablet| 1500| 6500|
|Cell Phone| 6000| 6000|
|Cell Phone| 6000| 6000|
|Cell Phone| 5000| 6000|
|Cell Phone| 3000| 6000|
|Cell Phone| 3000| 6000|
+----------+-------+------------------+