pyspark, apache-spark-sql, pyspark-pandas, pyspark-schema

Update a specific value when 2 other values match from 2 different tables in PySpark


I have two PySpark DataFrames that I'm trying to union. However, when a row in both DataFrames has the same test_date and student_id, I want the grade from PyDf2 to replace the one from PyDf1. Any idea how to write this in PySpark?

PyDf1:

+-----------+-----------+-----------+------------+
|test_date  |student_id |take_home  |grade       |
+-----------+-----------+-----------+------------+
| 2022-09-26|        655|          N|           A|
| 2022-09-26|        656|          N|           B|
| 2022-09-26|        657|          N|           C|
| 2022-09-26|        658|          N|           D|
+-----------+-----------+-----------+------------+

PyDf2:

+-----------+-----------+-----------+------------+
|test_date  |student_id |take_home  |grade       |
+-----------+-----------+-----------+------------+
| 2022-09-27|        655|          N|           D|
| 2022-09-27|        656|          N|           C|
| 2022-09-27|        657|          N|           B|
| 2022-09-27|        658|          N|           A|
| 2022-09-26|        655|          N|           B|  <- Duplicate test_date & student_id, different grade
+-----------+-----------+-----------+------------+

Desired output:

+-----------+-----------+-----------+------------+
|test_date  |student_id |take_home  |grade       |
+-----------+-----------+-----------+------------+
| 2022-09-26|        655|          N|           B|  <- grade updated to B
| 2022-09-26|        656|          N|           B|
| 2022-09-26|        657|          N|           C|
| 2022-09-26|        658|          N|           D|
| 2022-09-27|        655|          N|           D|
| 2022-09-27|        656|          N|           C|
| 2022-09-27|        657|          N|           B|
| 2022-09-27|        658|          N|           A|
+-----------+-----------+-----------+------------+
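
For reference, a minimal sketch that reproduces the two DataFrames (assuming test_date is stored as a string and student_id as an integer; the actual schema may differ):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    cols = ['test_date', 'student_id', 'take_home', 'grade']
    PyDf1 = spark.createDataFrame(
        [('2022-09-26', 655, 'N', 'A'), ('2022-09-26', 656, 'N', 'B'),
         ('2022-09-26', 657, 'N', 'C'), ('2022-09-26', 658, 'N', 'D')], cols)
    PyDf2 = spark.createDataFrame(
        [('2022-09-27', 655, 'N', 'D'), ('2022-09-27', 656, 'N', 'C'),
         ('2022-09-27', 657, 'N', 'B'), ('2022-09-27', 658, 'N', 'A'),
         ('2022-09-26', 655, 'N', 'B')], cols)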

Solution

  • Use window functions. Tag each DataFrame with a priority column before the union so that, within each duplicate (test_date, student_id) pair, the row from PyDf2 wins. Logic and code below:

      from pyspark.sql import Window
      from pyspark.sql.functions import col, lead, lit

      # Tag each side with a priority so the row from PyDf2 wins on duplicate keys
      w = Window.partitionBy('student_id', 'test_date').orderBy('priority')
      df = (PyDf1.withColumn('priority', lit(1))
              .unionByName(PyDf2.withColumn('priority', lit(2)))  # union the dfs
              .withColumn('next_grade', lead('grade').over(w))    # peek at the next duplicate, if any
              .where(col('next_grade').isNull())                  # keep the last (highest-priority) row
              .drop('priority', 'next_grade'))                    # drop the temp columns
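
A quick sanity check that the result matches the desired output (row order is not guaranteed after the shuffle, so sort before showing):

      df.orderBy('test_date', 'student_id').show()

The explicit priority column is what makes the rule deterministic: within a window partition, Spark guarantees nothing about row order unless the orderBy key actually distinguishes the duplicate rows.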