Any idea how to write this in PySpark?
I have two PySpark DataFrames that I'm trying to union. However, when two rows share the same test_date and student_id, I want to keep the grade from the second DataFrame rather than the first.
PyDf1:
+-----------+-----------+-----------+------------+
|test_date |student_id |take_home |grade |
+-----------+-----------+-----------+------------+
| 2022-09-26| 655| N| A|
| 2022-09-26| 656| N| B|
| 2022-09-26| 657| N| C|
| 2022-09-26| 658| N| D|
+-----------+-----------+-----------+------------+
PyDf2:
+-----------+-----------+-----------+------------+
|test_date |student_id |take_home |grade |
+-----------+-----------+-----------+------------+
| 2022-09-27| 655| N| D|
| 2022-09-27| 656| N| C|
| 2022-09-27| 657| N| B|
| 2022-09-27| 658| N| A|
| 2022-09-26| 655| N| B| <- Duplicate test_date & student_id, different grade
+-----------+-----------+-----------+------------+
Desired output:
+-----------+-----------+-----------+------------+
|test_date |student_id |take_home |grade |
+-----------+-----------+-----------+------------+
| 2022-09-26| 655| N| B| <- Updated to B for grade
| 2022-09-26| 656| N| B|
| 2022-09-26| 657| N| C|
| 2022-09-26| 658| N| D|
| 2022-09-27| 655| N| D|
| 2022-09-27| 656| N| C|
| 2022-09-27| 657| N| B|
| 2022-09-27| 658| N| A|
+-----------+-----------+-----------+------------+
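A minimal sketch to reproduce the sample DataFrames, assuming an active SparkSession named spark (not shown in the question):

# assumes an existing SparkSession called `spark`
columns = ['test_date', 'student_id', 'take_home', 'grade']
PyDf1 = spark.createDataFrame([
    ('2022-09-26', 655, 'N', 'A'),
    ('2022-09-26', 656, 'N', 'B'),
    ('2022-09-26', 657, 'N', 'C'),
    ('2022-09-26', 658, 'N', 'D'),
], columns)
PyDf2 = spark.createDataFrame([
    ('2022-09-27', 655, 'N', 'D'),
    ('2022-09-27', 656, 'N', 'C'),
    ('2022-09-27', 657, 'N', 'B'),
    ('2022-09-27', 658, 'N', 'A'),
    ('2022-09-26', 655, 'N', 'B'),  # duplicate test_date & student_id, different grade
], columns)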
Use window functions. The union alone cannot tell which of two duplicate (test_date, student_id) rows should win, so tag each DataFrame with a priority column first, then keep the highest-priority row per group. Logic and code below:
from pyspark.sql import Window
from pyspark.sql.functions import col, lead, lit

df = (PyDf1.withColumn('priority', lit(1))                  # rows from PyDf1: lower priority
      .unionByName(PyDf2.withColumn('priority', lit(2)))    # rows from PyDf2: higher priority
      .withColumn('NextGrade', lead('grade').over(          # peek at the next row per student/date
          Window.partitionBy('student_id', 'test_date').orderBy('priority')))
      .where(col('NextGrade').isNull())                     # keep only the last (highest-priority) row
      .drop('NextGrade', 'priority'))                       # drop the helper columns
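An equivalent sketch, under the same priority-column assumption, uses row_number instead of lead to keep exactly one row per (test_date, student_id):

from pyspark.sql import Window
from pyspark.sql.functions import col, desc, lit, row_number

w = Window.partitionBy('student_id', 'test_date').orderBy(desc('priority'))
df = (PyDf1.withColumn('priority', lit(1))
      .unionByName(PyDf2.withColumn('priority', lit(2)))
      .withColumn('rn', row_number().over(w))    # rank rows, newest grade first
      .where(col('rn') == 1)                     # keep one row per student/date
      .drop('rn', 'priority'))

df.orderBy('test_date', 'student_id').show()     # should match the desired output above

row_number makes the choice explicit and still works if a group ever contains more than two duplicate rows.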