Tags: apache-spark, pyspark, apache-spark-sql

PySpark Window functions: Aggregation differs if WindowSpec has sorting


I am working through this example of aggregation functions for PySpark Window.

Here is the dataframe:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
simpleData = (
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
)
columns = ["employee_name", "department", "salary"]
df = spark.createDataFrame(data=simpleData, schema=columns)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+

One of the WindowSpecs in the tutorial partitions the rows by "department" and sorts by "salary" within each department:

from pyspark.sql.window import Window
windowSpec = Window.partitionBy("department").orderBy("salary")

To familiarize myself with Window operations, I tried to add a column "MaxRowNum" containing the maximum row number within each partition. To facilitate this, an intermediate column "RowNum" contains the row number within each partition:

from pyspark.sql.functions import col, max, row_number  # this max is PySpark's, shadowing the builtin

df.withColumn('RowNum', row_number().over(windowSpec)) \
  .withColumn('MaxRowNum', max(col('RowNum')).over(windowSpec)) \
  .show()
+-------------+----------+------+------+---------+
|employee_name|department|salary|RowNum|MaxRowNum|
+-------------+----------+------+------+---------+
|        Maria|   Finance|  3000|     1|        1|
|        Scott|   Finance|  3300|     2|        2|
|          Jen|   Finance|  3900|     3|        3|
|        Kumar| Marketing|  2000|     1|        1|
|         Jeff| Marketing|  3000|     2|        2|
|        James|     Sales|  3000|     1|        2|
|        James|     Sales|  3000|     2|        2|
|       Robert|     Sales|  4100|     3|        4|
|         Saif|     Sales|  4100|     4|        4|
|      Michael|     Sales|  4600|     5|        5|
+-------------+----------+------+------+---------+

As shown above, the "RowNum" values are correct, but the "MaxRowNum" values don't contain the maximum row number within each partition. Each row merely gets back its own row number, except for tied rows, which all get the largest row number among the tied group.

Later in the tutorial, I found a WindowSpec without sorting, and it gave me the right result (see column "MaxRowCORRECT"):

windowSpecAgg = Window.partitionBy("department")  # No sorting

df.withColumn("row", row_number().over(windowSpec)) \
  .withColumn("MaxRowNum", max(col("row")).over(windowSpec)) \
  .withColumn("MaxRowCORRECT", max(col("row")).over(windowSpecAgg)) \
  .show()
+-------------+----------+------+---+---------+-------------+
|employee_name|department|salary|row|MaxRowNum|MaxRowCORRECT|
+-------------+----------+------+---+---------+-------------+
|        Maria|   Finance|  3000|  1|        1|            3|
|        Scott|   Finance|  3300|  2|        2|            3|
|          Jen|   Finance|  3900|  3|        3|            3|
|        Kumar| Marketing|  2000|  1|        1|            2|
|         Jeff| Marketing|  3000|  2|        2|            2|
|        James|     Sales|  3000|  1|        2|            5|
|        James|     Sales|  3000|  2|        2|            5|
|       Robert|     Sales|  4100|  3|        4|            5|
|         Saif|     Sales|  4100|  4|        4|            5|
|      Michael|     Sales|  4600|  5|        5|            5|
+-------------+----------+------+---+---------+-------------+

My understanding is that Window aggregation functions operate over the entirety of each partition. The code above shows that this isn't necessarily the case. I've scanned the Window documentation but cannot find an unambiguous description of this conditional behaviour.

Is there actually a consistent and fully documented scheme for how Window functions operate? Where have I missed it in the documentation?

Background

Following mazaneicha's answer, I realized that I needed background on Window function taxonomy. The PySpark links for the relevant terms yield empty pages (currentRow, unboundedPreceding, unboundedFollowing). These concepts come from SQL. While I didn't find anything on rowFrame and rangeFrame, the documentation for rowsBetween provides background on the other terms.
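
To see the rowFrame/rangeFrame distinction in action, here is a small sketch of my own (reusing df from above; the spec names are mine). With a rangeFrame, the frame is defined on the ORDER BY values, so rows tied on "salary" are "peers" and fall into each other's frame; with a rowFrame, the frame ends at the current physical row:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, max, row_number  # PySpark's max, shadows the builtin

orderedSpec = Window.partitionBy("department").orderBy("salary")

# Explicit form of the default frame for an ordered window (a rangeFrame):
rangeSpec = orderedSpec.rangeBetween(Window.unboundedPreceding, Window.currentRow)

# Same bounds, but measured in physical rows (a rowFrame):
rowsSpec = orderedSpec.rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn("RowNum", row_number().over(orderedSpec)) \
  .withColumn("MaxByRange", max(col("RowNum")).over(rangeSpec)) \
  .withColumn("MaxByRows", max(col("RowNum")).over(rowsSpec)) \
  .show()
# "MaxByRange" reproduces the puzzling "MaxRowNum" column above (tied salaries
# are peers and share a frame), while "MaxByRows" simply equals "RowNum".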


Solution

  • This is the effect of the different defaults used for the window frame when aggregating over an ordered vs. an unordered window. Per the Spark online doc:

    When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding, unboundedFollowing) is used by default. When ordering is defined, a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.

    So, to make it work according to your expectations, you would need to set the bounds explicitly:

    windowSpec = Window.partitionBy("department").orderBy("salary") \
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
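
    For example (a sketch of my own; the names orderedSpec and boundedSpec are mine, and row_number itself keeps the plain ordered spec, since Spark requires ranking functions to run over their own fixed growing frame):

    from pyspark.sql.functions import col, max, row_number

    orderedSpec = Window.partitionBy("department").orderBy("salary")
    boundedSpec = orderedSpec.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    df.withColumn("RowNum", row_number().over(orderedSpec)) \
      .withColumn("MaxRowNum", max(col("RowNum")).over(boundedSpec)) \
      .show()
    # MaxRowNum should now be 3 for all Finance rows, 2 for Marketing and 5 for
    # Sales, i.e. the same values as the MaxRowCORRECT column above.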