Say we have a PySpark DataFrame that looks like this:
| ID | Pct            |
|----|----------------|
| 1  | [50,10,5,20,2] |
| 2  | [42,10,15,5,3] |
I want to add another column in which the Pct values are made monotonically decreasing:
| ID | Pct            | Mon_dec_pct    |
|----|----------------|----------------|
| 1  | [50,10,5,20,2] | [50,10,5,5,2]  |
| 2  | [42,10,15,5,3] | [42,10,10,5,3] |
Would we be able to do this in PySpark?
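For reference, the sample frame above can be built like so (a minimal sketch; integer arrays assumed):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Reproduce the sample data from the question.
df = spark.createDataFrame(
    [(1, [50, 10, 5, 20, 2]), (2, [42, 10, 15, 5, 3])],
    ["ID", "Pct"],
)
```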
Using `transform`:
```python
from pyspark.sql import functions as F

# For each element, compare it against the previous element of Pct
# and clip it to that value whenever it would otherwise increase.
df = df.withColumn(
    "Mon_dec_pct",
    F.expr("""
        transform(Pct, (x, i) ->
            case
                when i = 0 then x
                when Pct[i-1] > x then x
                else Pct[i-1]
            end)
    """),
)
```
========================================
UPDATE
@busfighter brought up a good point. If an array looks like [50,10,15,12,2], the simple `transform` doesn't work, because it only compares each value against the one immediately before it. Instead, you need to carry the running minimum along. The version below uses `reduce` to carry both the updated array and the current minimum:
```python
# Carry a struct of (result array so far, running minimum) through the array.
# Note: array_append and reduce (an alias of aggregate) need a recent Spark
# version; on older versions, use concat(acc.result, array(...)) and aggregate.
df = df.withColumn(
    "Mon_dec_pct",
    F.expr("""
        reduce(Pct,
               struct(cast(array() as array<int>) as result, Pct[0] as min_val),
               (acc, x) -> struct(array_append(acc.result, least(acc.min_val, x)) as result,
                                  least(acc.min_val, x) as min_val),
               x -> x.result
        )
    """),
)
```
Input
```
+---+-------------------+
| ID|                Pct|
+---+-------------------+
|  1|[50, 10, 15, 12, 2]|
|  2| [42, 10, 15, 5, 3]|
+---+-------------------+
```
Result
```
+---+-------------------+-------------------+
| ID|                Pct|        Mon_dec_pct|
+---+-------------------+-------------------+
|  1|[50, 10, 15, 12, 2]|[50, 10, 10, 10, 2]|
|  2| [42, 10, 15, 5, 3]| [42, 10, 10, 5, 3]|
+---+-------------------+-------------------+
```
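If you'd rather stay in the DataFrame API than use a SQL expression string, the same running-minimum idea can be sketched with `F.aggregate` (available since Spark 3.1; untested, so treat it as an outline):

```python
from pyspark.sql import functions as F

df = df.withColumn(
    "Mon_dec_pct",
    F.aggregate(
        "Pct",
        # Accumulator: empty result array plus the first element as the starting minimum.
        F.struct(
            F.expr("cast(array() as array<int>)").alias("result"),
            F.col("Pct").getItem(0).alias("min_val"),
        ),
        # Append the running minimum and carry it forward.
        lambda acc, x: F.struct(
            F.concat(acc["result"], F.array(F.least(acc["min_val"], x))).alias("result"),
            F.least(acc["min_val"], x).alias("min_val"),
        ),
        # Keep only the accumulated array.
        lambda acc: acc["result"],
    ),
)
```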