I'm looking for an elegant way to drop, for each ID, all records in a DataFrame whose 'TEST_DT' falls before the latest occurrence of 'TEST_COMPONENT' being 'UNSATISFACTORY'.
For example, given the following DataFrame for ID 5000:
| ID | TEST_ID | TEST_COMPONENT | TEST_DT |
|------|---------|----------------|-------------------------------|
| 5000 | ENGL | SATISFACTORY | 2023-01-04T00:00:00.000+11:00 |
| 5000 | ENGL | SATISFACTORY | 2022-09-07T00:00:00.000+10:00 |
| 5000 | OTHER | NONE | 2022-09-07T00:00:00.000+10:00 |
| 5000 | ENGL | UNSATISFACTORY | 2016-05-23T00:00:00.000+10:00 |
| 5000 | OTHER | NONE | 2016-05-23T00:00:00.000+10:00 |
| 5000 | OTHER | NONE | 2016-05-23T00:00:00.000+10:00 |
| 5000 | OTHER | NONE | 2016-02-09T00:00:00.000+11:00 |
| 5000 | OTHER | NONE | 2016-02-09T00:00:00.000+11:00 |
| 5000 | OTHER | NONE | 2016-02-09T00:00:00.000+11:00 |
| 5000 | ENGL | UNSATISFACTORY | 2014-05-29T00:00:00.000+10:00 |
| 5000 | OTHER | NONE | 2013-09-27T00:00:00.000+10:00 |
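For reproducibility, here is a minimal sketch that builds this sample DataFrame; keeping 'TEST_DT' as a string (rather than a timestamp type) and using `SparkSession.builder.getOrCreate()` are assumptions on my part:

```python
from pyspark.sql import SparkSession

# Assumed setup: TEST_DT is kept as an ISO-8601 string here; in the real data it may be a timestamp type.
spark = SparkSession.builder.getOrCreate()
data = [
    (5000, "ENGL",  "SATISFACTORY",   "2023-01-04T00:00:00.000+11:00"),
    (5000, "ENGL",  "SATISFACTORY",   "2022-09-07T00:00:00.000+10:00"),
    (5000, "OTHER", "NONE",           "2022-09-07T00:00:00.000+10:00"),
    (5000, "ENGL",  "UNSATISFACTORY", "2016-05-23T00:00:00.000+10:00"),
    (5000, "OTHER", "NONE",           "2016-05-23T00:00:00.000+10:00"),
    (5000, "OTHER", "NONE",           "2016-05-23T00:00:00.000+10:00"),
    (5000, "OTHER", "NONE",           "2016-02-09T00:00:00.000+11:00"),
    (5000, "OTHER", "NONE",           "2016-02-09T00:00:00.000+11:00"),
    (5000, "OTHER", "NONE",           "2016-02-09T00:00:00.000+11:00"),
    (5000, "ENGL",  "UNSATISFACTORY", "2014-05-29T00:00:00.000+10:00"),
    (5000, "OTHER", "NONE",           "2013-09-27T00:00:00.000+10:00"),
]
df = spark.createDataFrame(data, ["ID", "TEST_ID", "TEST_COMPONENT", "TEST_DT"])
```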
I would like to retain only the rows from the latest 'UNSATISFACTORY' record onward. The desired output for this example would be:
| ID | TEST_ID | TEST_COMPONENT | TEST_DT |
|------|---------|----------------|-------------------------------|
| 5000 | ENGL | SATISFACTORY | 2023-01-04T00:00:00.000+11:00 |
| 5000 | ENGL | SATISFACTORY | 2022-09-07T00:00:00.000+10:00 |
| 5000 | OTHER | NONE | 2022-09-07T00:00:00.000+10:00 |
| 5000 | ENGL | UNSATISFACTORY | 2016-05-23T00:00:00.000+10:00 |
How can I achieve this in an efficient manner utilizing PySpark?
Here is my attempt using a window function:

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Latest 'UNSATISFACTORY' TEST_DT per ID, attached to every row of that partition
window_spec = Window.partitionBy("ID")
df = df.withColumn("MAX_DATE", F.max(F.when(df["TEST_COMPONENT"] == "UNSATISFACTORY", df["TEST_DT"]).otherwise(None)).over(window_spec))

# Keep rows dated after that maximum, plus the boundary 'UNSATISFACTORY' row itself
df_drop = df.filter((df["TEST_DT"] > F.col("MAX_DATE")) | ((df["TEST_DT"] == F.col("MAX_DATE")) & (df["TEST_COMPONENT"] == "UNSATISFACTORY")))
df_drop.show(truncate=False)
```
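On the sample data this seems to produce the desired output: the equality branch of the filter keeps the boundary 'UNSATISFACTORY' row itself while dropping the 'NONE' rows that share its 'TEST_DT'. Is there a cleaner or more efficient way to express this?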