I have a time-series dataset, and I want to create a new column that holds the last reported (non-null) value. I think I have that part figured out, using a combination of `lag` and `last`.

I would also like to know the timestamp of that last reported (non-null) value. I never expect `timestamp_ms` to be null, even though `val` can be null.
Sample Data
```python
from pyspark.sql import Row, SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    Row(timestamp_ms=1672531200000, val='19'),
    Row(timestamp_ms=1672532100000, val='20'),
    Row(timestamp_ms=1672533000000, val=None),
    Row(timestamp_ms=1672533900000, val='22'),
    Row(timestamp_ms=1672534800000, val=None),
    Row(timestamp_ms=1672535700000, val=None),
    Row(timestamp_ms=1672536600000, val='25'),
    Row(timestamp_ms=1672537500000, val='20'),
    Row(timestamp_ms=1672538400000, val='27'),
])
df.show()
```
Sample Code
The code below returns the last lagged value and attempts to return the timestamp at which that value was reported.
```python
# Order by time only; with no partition column, Spark moves all rows to a single partition
w = Window.orderBy("timestamp_ms")

df_lag = (
    df.withColumn("lag_prev_val", F.lag("val").over(w))
      # last non-null lagged value seen so far
      .withColumn("last_lag_prev_val", F.last("lag_prev_val", True).over(w))
      # this is just the previous row's timestamp, not the timestamp of last_lag_prev_val
      .withColumn("last_lag_prev_time", F.lag("timestamp_ms").over(w))
)
df_lag.show()
```
Current Output
`last_lag_prev_time` holds the timestamp of the previous row, rather than the timestamp of the row that supplied `last_lag_prev_val`.
timestamp_ms | val | lag_prev_val | last_lag_prev_val | last_lag_prev_time |
---|---|---|---|---|
1672531200000 | 19 | null | null | null |
1672532100000 | 20 | 19 | 19 | 1672531200000 |
1672533000000 | null | 20 | 20 | 1672532100000 |
1672533900000 | 22 | null | 20 | 1672533000000 |
1672534800000 | null | 22 | 22 | 1672533900000 |
1672535700000 | null | null | 22 | 1672534800000 |
1672536600000 | 25 | null | 22 | 1672535700000 |
1672537500000 | 20 | 25 | 25 | 1672536600000 |
1672538400000 | 27 | 20 | 20 | 1672537500000 |
Ideal output
The output I want (differences in bold) has the `last_lag_prev_time` column hold the `timestamp_ms` value from the same row as the original `val` that was used to populate `last_lag_prev_val`.
timestamp_ms | val | lag_prev_val | last_lag_prev_val | last_lag_prev_time |
---|---|---|---|---|
1672531200000 | 19 | null | null | null |
1672532100000 | 20 | 19 | 19 | 1672531200000 |
1672533000000 | null | 20 | 20 | 1672532100000 |
1672533900000 | 22 | null | 20 | **1672532100000** |
1672534800000 | null | 22 | 22 | 1672533900000 |
1672535700000 | null | null | 22 | **1672533900000** |
1672536600000 | 25 | null | 22 | **1672533900000** |
1672537500000 | 20 | 25 | 25 | 1672536600000 |
1672538400000 | 27 | 20 | 20 | 1672537500000 |
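To pin down the intended semantics independently of Spark, here is a minimal pure-Python sketch (plain Python, not Spark code; `last_reported_before` is a hypothetical helper name) that walks the sample rows in timestamp order and reproduces the `last_lag_prev_val` and `last_lag_prev_time` columns of the ideal output above. It assumes the rows are already sorted by `timestamp_ms`.

```python
from typing import List, Optional, Tuple

# (timestamp_ms, val) pairs copied from the sample data
rows: List[Tuple[int, Optional[str]]] = [
    (1672531200000, "19"),
    (1672532100000, "20"),
    (1672533000000, None),
    (1672533900000, "22"),
    (1672534800000, None),
    (1672535700000, None),
    (1672536600000, "25"),
    (1672537500000, "20"),
    (1672538400000, "27"),
]


def last_reported_before(
    rows: List[Tuple[int, Optional[str]]]
) -> List[Tuple[Optional[str], Optional[int]]]:
    """For each row, return (last non-null val, its timestamp) among strictly earlier rows."""
    result = []
    last_val, last_ts = None, None
    for ts, val in rows:  # assumes rows are sorted by timestamp_ms
        # Emit what was known *before* this row, mirroring the lag in the Spark code
        result.append((last_val, last_ts))
        # Only a reported (non-null) val updates both the value and its timestamp
        if val is not None:
            last_val, last_ts = val, ts
    return result


for (ts, val), (prev_val, prev_ts) in zip(rows, last_reported_before(rows)):
    print(ts, val, prev_val, prev_ts)
```

Because the value and its timestamp are only ever updated together, they always come from the same row, which is exactly the property the Spark version needs.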
One solution is to consider only the timestamps of rows whose `val` is not NULL, which we can do by creating a column called `val_timestamp_ms`. We can then take the last non-null timestamp from this new column and apply a lag to it. For example:
```python
w = Window.orderBy("timestamp_ms")

df_sol = (
    df.withColumn("lag_prev_val", F.lag("val").over(w))
      .withColumn("last_lag_prev_val", F.last("lag_prev_val", True).over(w))
      # keep the timestamp only on rows where val was actually reported
      .withColumn("val_timestamp_ms",
                  F.when(F.col("val").isNotNull(), F.col("timestamp_ms")))
      # timestamp of the most recent reported val, up to and including this row
      .withColumn("last_prev_time", F.last("val_timestamp_ms", True).over(w))
      # shift by one row so it lines up with last_lag_prev_val
      .withColumn("last_lag_prev_time", F.lag("last_prev_time").over(w))
)
df_sol.show()
```
```
+-------------+----+------------+-----------------+----------------+--------------+------------------+
| timestamp_ms| val|lag_prev_val|last_lag_prev_val|val_timestamp_ms|last_prev_time|last_lag_prev_time|
+-------------+----+------------+-----------------+----------------+--------------+------------------+
|1672531200000| 19| NULL| NULL| 1672531200000| 1672531200000| NULL|
|1672532100000| 20| 19| 19| 1672532100000| 1672532100000| 1672531200000|
|1672533000000|NULL| 20| 20| NULL| 1672532100000| 1672532100000|
|1672533900000| 22| NULL| 20| 1672533900000| 1672533900000| 1672532100000|
|1672534800000|NULL| 22| 22| NULL| 1672533900000| 1672533900000|
|1672535700000|NULL| NULL| 22| NULL| 1672533900000| 1672533900000|
|1672536600000| 25| NULL| 22| 1672536600000| 1672536600000| 1672533900000|
|1672537500000| 20| 25| 25| 1672537500000| 1672537500000| 1672536600000|
|1672538400000| 27| 20| 20| 1672538400000| 1672538400000| 1672537500000|
+-------------+----+------------+-----------------+----------------+--------------+------------------+
```