Making a series monotonically decreasing in PySpark


Say we have a PySpark dataframe that looks like this:

ID  Pct
1   [50, 10, 5, 20, 2]
2   [42, 10, 15, 5, 3]

I want to add another column holding the Pct values adjusted so that they are monotonically decreasing, i.e. the values never increase from one position to the next:

ID  Pct                 Mon_dec_pct
1   [50, 10, 5, 20, 2]  [50, 10, 5, 5, 2]
2   [42, 10, 15, 5, 3]  [42, 10, 10, 5, 3]

Is this possible in PySpark?


Solution

  • Using transform:

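    from pyspark.sql import functions as F

    # Cap each element at its immediate predecessor; the first element is kept as-is.
    df = (df.withColumn('Mon_dec_pct',
                        F.expr("""
                               transform(Pct, (x, i) ->
                                         case
                                             when i = 0 then x
                                             when Pct[i-1] > x then x
                                             else Pct[i-1]
                                         end)
                               """)))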
    

    ========================================

    UPDATE

    @busfighter brought up a good point.

    If an array looks like [50,10,15,12,2], the simple transform doesn't work, because it compares each value only against the one immediately before it. Here it returns [50, 10, 10, 12, 2], which still rises from 10 to 12.

    So you need to carry the running minimum forward.
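
A plain-Python sketch, outside Spark, may make the difference concrete. The two helper names below (`cap_by_previous` and `cap_by_running_min`) are made up for illustration: the first mimics the neighbor-only comparison of the transform, the second carries the smallest value seen so far.

```python
def cap_by_previous(values):
    """Mimic the transform: compare each element only with its original predecessor."""
    out = []
    for i, x in enumerate(values):
        if i == 0 or values[i - 1] > x:
            out.append(x)
        else:
            out.append(values[i - 1])
    return out


def cap_by_running_min(values):
    """Carry the smallest value seen so far and emit it at every step."""
    out = []
    min_val = None
    for x in values:
        min_val = x if min_val is None else min(min_val, x)
        out.append(min_val)
    return out


pct = [50, 10, 15, 12, 2]
print(cap_by_previous(pct))      # [50, 10, 10, 12, 2]  (12 > 10, so still not decreasing)
print(cap_by_running_min(pct))   # [50, 10, 10, 10, 2]
```

On the arrays from the original question both helpers agree; they only diverge when a rise spans more than one position.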

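    This version uses reduce to carry both the array built so far and the running minimum. (reduce and array_append need a fairly recent Spark, 3.4 or later as far as I know; on older versions, aggregate and concat can play the same roles.)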

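    # acc.result is the output built so far; acc.min_val is the smallest value seen so far.
    # The element type in the cast (int here) should match the element type of Pct.
    df = (df.withColumn('Mon_dec_pct',
                        F.expr("""
                               reduce(Pct,
                                      struct(cast(array() as array<int>) as result, Pct[0] as min_val),
                                      (acc, x) -> (array_append(acc.result, least(acc.min_val, x)),
                                                   least(acc.min_val, x)),
                                      x -> x.result
                               )
                               """)))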
    

    Input

    +---+-------------------+
    | ID|                Pct|
    +---+-------------------+
    |  1|[50, 10, 15, 12, 2]|
    |  2| [42, 10, 15, 5, 3]|
    +---+-------------------+
    

    Result

    +---+-------------------+-------------------+
    | ID|                Pct|        Mon_dec_pct|
    +---+-------------------+-------------------+
    |  1|[50, 10, 15, 12, 2]|[50, 10, 10, 10, 2]|
    |  2| [42, 10, 15, 5, 3]| [42, 10, 10, 5, 3]|
    +---+-------------------+-------------------+
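
To sanity-check a result like the one above, a small helper can confirm that each output array never increases. This is a minimal sketch in plain Python; `is_non_increasing` is a hypothetical helper name, and the rows are copied by hand from the Result table rather than collected from Spark.

```python
def is_non_increasing(values):
    """Return True if no element is larger than the one before it."""
    return all(a >= b for a, b in zip(values, values[1:]))


# Rows copied from the Result table: (ID, Pct, Mon_dec_pct).
rows = [
    (1, [50, 10, 15, 12, 2], [50, 10, 10, 10, 2]),
    (2, [42, 10, 15, 5, 3], [42, 10, 10, 5, 3]),
]

for row_id, pct, mon_dec_pct in rows:
    # Prints the ID, then whether the input and the output are non-increasing.
    print(row_id, is_non_increasing(pct), is_non_increasing(mon_dec_pct))
```

With a real DataFrame, the same function could be applied to the rows returned by df.collect(); that part is left out here so the snippet runs without a Spark session.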