apache-spark, pyspark, apache-spark-sql

How to drop records after date based on condition


I'm looking for an elegant way to drop all records in a DataFrame that occur before the latest occurrence of 'TEST_COMPONENT' being 'UNSATISFACTORY', based on their 'TEST_DT' value for each ID.

For example, given the following DataFrame for ID 5000:

| ID   | TEST_ID | TEST_COMPONENT | TEST_DT                       |
|------|---------|----------------|-------------------------------|
| 5000 | ENGL    | SATISFACTORY   | 2023-01-04T00:00:00.000+11:00 |
| 5000 | ENGL    | SATISFACTORY   | 2022-09-07T00:00:00.000+10:00 |
| 5000 | OTHER   | NONE           | 2022-09-07T00:00:00.000+10:00 |
| 5000 | ENGL    | UNSATISFACTORY | 2016-05-23T00:00:00.000+10:00 |
| 5000 | OTHER   | NONE           | 2016-05-23T00:00:00.000+10:00 |
| 5000 | OTHER   | NONE           | 2016-05-23T00:00:00.000+10:00 |
| 5000 | OTHER   | NONE           | 2016-02-09T00:00:00.000+11:00 |
| 5000 | OTHER   | NONE           | 2016-02-09T00:00:00.000+11:00 |
| 5000 | OTHER   | NONE           | 2016-02-09T00:00:00.000+11:00 |
| 5000 | ENGL    | UNSATISFACTORY | 2014-05-29T00:00:00.000+10:00 |
| 5000 | OTHER   | NONE           | 2013-09-27T00:00:00.000+10:00 |

I would like to retain only the rows from the latest 'UNSATISFACTORY' record onward. The desired output for this example would be:

| ID   | TEST_ID | TEST_COMPONENT | TEST_DT                       |
|------|---------|----------------|-------------------------------|
| 5000 | ENGL    | SATISFACTORY   | 2023-01-04T00:00:00.000+11:00 |
| 5000 | ENGL    | SATISFACTORY   | 2022-09-07T00:00:00.000+10:00 |
| 5000 | OTHER   | NONE           | 2022-09-07T00:00:00.000+10:00 |
| 5000 | ENGL    | UNSATISFACTORY | 2016-05-23T00:00:00.000+10:00 |

How can I achieve this in an efficient manner utilizing PySpark?


Solution

  • Here is my attempt using a window function:

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    window_spec = Window.partitionBy("ID")

    # Latest UNSATISFACTORY date per ID; F.max ignores nulls, so rows
    # whose TEST_COMPONENT is not UNSATISFACTORY do not contribute
    df = df.withColumn("MAX_DATE", F.max(F.when(F.col("TEST_COMPONENT") == "UNSATISFACTORY", F.col("TEST_DT"))).over(window_spec))

    # Keep rows dated after MAX_DATE, plus the UNSATISFACTORY row on MAX_DATE itself
    df_drop = df.filter((F.col("TEST_DT") > F.col("MAX_DATE")) | ((F.col("TEST_DT") == F.col("MAX_DATE")) & (F.col("TEST_COMPONENT") == "UNSATISFACTORY")))

    df_drop.show(truncate=False)