Get the value of another column from the same row as my last lag value


I have a time series dataset. I want to create a new column that holds the last reported (not null) value. I think I have this part figured out, using a combination of `lag` and `last`.

I would also like to know the timestamp for that last reported (non null) value. I never expect timestamp_ms to be null, even though val can be null.

Sample Data

from pyspark.sql import Row

# Assumes an active SparkSession named `spark` (e.g. the pyspark shell or a notebook).
df = spark.createDataFrame([
    Row(timestamp_ms=1672531200000, val='19'),
    Row(timestamp_ms=1672532100000, val='20'),
    Row(timestamp_ms=1672533000000, val=None),
    Row(timestamp_ms=1672533900000, val='22'),
    Row(timestamp_ms=1672534800000, val=None),
    Row(timestamp_ms=1672535700000, val=None),
    Row(timestamp_ms=1672536600000, val='25'),
    Row(timestamp_ms=1672537500000, val='20'),
    Row(timestamp_ms=1672538400000, val='27')
])
df.show()

Sample Code

This returns the last lagged value and attempts to return the timestamp at which that value was reported.

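from pyspark.sql import Window
from pyspark.sql import functions as F

# One ordered window over the whole DataFrame (no partitioning).
w = Window.orderBy("timestamp_ms")

df_lag = (
    df
    # val from the previous row (may be null).
    .withColumn("lag_prev_val", F.lag("val").over(w))
    # Most recent non-null lagged value up to the current row.
    .withColumn("last_lag_prev_val", F.last("lag_prev_val", True).over(w))
    # Timestamp of the previous row, whether or not its val was null.
    .withColumn("last_lag_prev_time", F.lag("timestamp_ms").over(w))
)
df_lag.show()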

Current Output

`last_lag_prev_time` holds the timestamp of the previous row, rather than the timestamp associated with `last_lag_prev_val`.

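+-------------+----+------------+-----------------+------------------+
| timestamp_ms| val|lag_prev_val|last_lag_prev_val|last_lag_prev_time|
+-------------+----+------------+-----------------+------------------+
|1672531200000|  19|        null|             null|              null|
|1672532100000|  20|          19|               19|     1672531200000|
|1672533000000|null|          20|               20|     1672532100000|
|1672533900000|  22|        null|               20|     1672533000000|
|1672534800000|null|          22|               22|     1672533900000|
|1672535700000|null|        null|               22|     1672534800000|
|1672536600000|  25|        null|               22|     1672535700000|
|1672537500000|  20|          25|               25|     1672536600000|
|1672538400000|  27|          20|               20|     1672537500000|
+-------------+----+------------+-----------------+------------------+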

Ideal output

The output I want is for the `last_lag_prev_time` column to hold the `timestamp_ms` value from the same row as the original `val` that was used to populate `last_lag_prev_val`. Compared with the current output, the rows for 1672533900000, 1672535700000 and 1672536600000 differ.

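+-------------+----+------------+-----------------+------------------+
| timestamp_ms| val|lag_prev_val|last_lag_prev_val|last_lag_prev_time|
+-------------+----+------------+-----------------+------------------+
|1672531200000|  19|        null|             null|              null|
|1672532100000|  20|          19|               19|     1672531200000|
|1672533000000|null|          20|               20|     1672532100000|
|1672533900000|  22|        null|               20|     1672532100000|
|1672534800000|null|          22|               22|     1672533900000|
|1672535700000|null|        null|               22|     1672533900000|
|1672536600000|  25|        null|               22|     1672533900000|
|1672537500000|  20|          25|               25|     1672536600000|
|1672538400000|  27|          20|               20|     1672537500000|
+-------------+----+------------+-----------------+------------------+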
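
To pin down the intended semantics independently of Spark, here is a minimal plain-Python sketch of the rule described above: for each row, report the most recent non-null `val` from a strictly earlier row, together with that row's `timestamp_ms`. The function name `last_reported_before` and the `Reading` alias are hypothetical, introduced only for this illustration.

```python
from typing import List, Optional, Tuple

# (timestamp_ms, val) pairs; `Reading` is a hypothetical alias for illustration.
Reading = Tuple[int, Optional[str]]


def last_reported_before(rows: List[Reading]) -> List[Tuple[Optional[str], Optional[int]]]:
    """For each row (in timestamp order), return the most recent non-null val
    from a strictly earlier row, paired with that earlier row's timestamp."""
    out = []
    last_val: Optional[str] = None
    last_ts: Optional[int] = None
    for ts, val in sorted(rows, key=lambda r: r[0]):
        # Emit the state built from earlier rows only.
        out.append((last_val, last_ts))
        # Update the state only when this row actually reported a value.
        if val is not None:
            last_val, last_ts = val, ts
    return out


rows = [
    (1672531200000, "19"),
    (1672532100000, "20"),
    (1672533000000, None),
    (1672533900000, "22"),
    (1672534800000, None),
    (1672535700000, None),
    (1672536600000, "25"),
    (1672537500000, "20"),
    (1672538400000, "27"),
]
reference = last_reported_before(rows)
for (ts, val), (prev_val, prev_ts) in zip(rows, reference):
    print(ts, val, prev_val, prev_ts)
```

The pairs in `reference` correspond to the `last_lag_prev_val` and `last_lag_prev_time` columns of the ideal output, so they can serve as a check for any Spark implementation.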

Solution

  • One solution is to consider timestamps only from rows where val is not null, by creating a helper column called val_timestamp_ms. Taking the last non-null value of this new column and then applying a lag gives the timestamp that belongs to last_lag_prev_val. For example:

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    # One ordered window over the whole DataFrame (no partitioning).
    w = Window.orderBy("timestamp_ms")

    df_lag = (
        df
        .withColumn("lag_prev_val", F.lag("val").over(w))
        .withColumn("last_lag_prev_val", F.last("lag_prev_val", True).over(w))
        # Keep the timestamp only on rows where val was actually reported.
        .withColumn("val_timestamp_ms", F.when(F.col("val").isNull(), None)
                                         .otherwise(F.col("timestamp_ms")))
        # Timestamp of the most recent reported val, up to and including this row.
        .withColumn("last_prev_time", F.last("val_timestamp_ms", True).over(w))
        # Shift by one row so it lines up with last_lag_prev_val.
        .withColumn("last_lag_prev_time", F.lag("last_prev_time").over(w))
    )
    df_lag.show()
    
    +-------------+----+------------+-----------------+----------------+--------------+------------------+
    | timestamp_ms| val|lag_prev_val|last_lag_prev_val|val_timestamp_ms|last_prev_time|last_lag_prev_time|
    +-------------+----+------------+-----------------+----------------+--------------+------------------+
    |1672531200000|  19|        NULL|             NULL|   1672531200000| 1672531200000|              NULL|
    |1672532100000|  20|          19|               19|   1672532100000| 1672532100000|     1672531200000|
    |1672533000000|NULL|          20|               20|            NULL| 1672532100000|     1672532100000|
    |1672533900000|  22|        NULL|               20|   1672533900000| 1672533900000|     1672532100000|
    |1672534800000|NULL|          22|               22|            NULL| 1672533900000|     1672533900000|
    |1672535700000|NULL|        NULL|               22|            NULL| 1672533900000|     1672533900000|
    |1672536600000|  25|        NULL|               22|   1672536600000| 1672536600000|     1672533900000|
    |1672537500000|  20|          25|               25|   1672537500000| 1672537500000|     1672536600000|
    |1672538400000|  27|          20|               20|   1672538400000| 1672538400000|     1672537500000|
    +-------------+----+------------+-----------------+----------------+--------------+------------------+