Tags: python, python-polars

Polars Time Series Path Dependent Event Outcome calculation


In the demo DataFrame I have three events:

# Demo frame: three events, each marked on its start row with a threshold,
# a start/end timestamp, and an event id; other rows hold nulls.
data = {
    "timestamp": [1, 2, 3, 4, 5, 6, 7, 8],
    "threshold": [8, None, None, None, 5, None, None, 8],
    "value": [2, 3, 4, 5, 6, 7, 8, 9],
    "event": [1, 0, 0, 0, 1, 0, 0, 1],
    "start_ts": [1, None, None, None, 5, None, None, 8],
    "end_ts": [6, None, None, None, 8, None, None, 8],
    "event_id": [0, None, None, None, 1, None, None, 2],
}
# event_span = end_ts - start_ts (null on non-event rows)
df = pl.DataFrame(data).with_columns(
    (pl.col("end_ts") - pl.col("start_ts")).alias("event_span")
)

print(df)

out:
shape: (8, 8)
┌───────────┬───────────┬───────┬───────┬──────────┬────────┬──────────┬────────────┐
│ timestamp ┆ threshold ┆ value ┆ event ┆ start_ts ┆ end_ts ┆ event_id ┆ event_span │
│ ---       ┆ ---       ┆ ---   ┆ ---   ┆ ---      ┆ ---    ┆ ---      ┆ ---        │
│ i64       ┆ i64       ┆ i64   ┆ i64   ┆ i64      ┆ i64    ┆ i64      ┆ i64        │
╞═══════════╪═══════════╪═══════╪═══════╪══════════╪════════╪══════════╪════════════╡
│ 1         ┆ 8         ┆ 2     ┆ 1     ┆ 1        ┆ 6      ┆ 0        ┆ 5          │
│ 2         ┆ null      ┆ 3     ┆ 0     ┆ null     ┆ null   ┆ null     ┆ null       │
│ 3         ┆ null      ┆ 4     ┆ 0     ┆ null     ┆ null   ┆ null     ┆ null       │
│ 4         ┆ null      ┆ 5     ┆ 0     ┆ null     ┆ null   ┆ null     ┆ null       │
│ 5         ┆ 5         ┆ 6     ┆ 1     ┆ 5        ┆ 8      ┆ 1        ┆ 3          │
│ 6         ┆ null      ┆ 7     ┆ 0     ┆ null     ┆ null   ┆ null     ┆ null       │
│ 7         ┆ null      ┆ 8     ┆ 0     ┆ null     ┆ null   ┆ null     ┆ null       │
│ 8         ┆ 8         ┆ 9     ┆ 1     ┆ 8        ┆ 8      ┆ 2        ┆ 0          │
└───────────┴───────────┴───────┴───────┴──────────┴────────┴──────────┴────────────┘

Problem: I want to identify:

  1. event outcome: binary value indicating whether the threshold of each event is reached by value during each event span.
  2. event outcome timestamp: the timestamp at which value first reaches the threshold within the event span

Additional Note:

Desired Output:

shape: (8, 10)
┌───────────┬───────────┬───────┬───────┬───┬──────────┬────────────┬───────────────┬──────────────┐
│ timestamp ┆ threshold ┆ value ┆ event ┆ … ┆ event_id ┆ event_span ┆ event_outcome ┆ event_outcom │
│ ---       ┆ ---       ┆ ---   ┆ ---   ┆   ┆ ---      ┆ ---        ┆ ---           ┆ e_timestamp  │
│ i64       ┆ i64       ┆ i64   ┆ i64   ┆   ┆ i64      ┆ i64        ┆ i32           ┆ ---          │
│           ┆           ┆       ┆       ┆   ┆          ┆            ┆               ┆ i64          │
╞═══════════╪═══════════╪═══════╪═══════╪═══╪══════════╪════════════╪═══════════════╪══════════════╡
│ 1         ┆ 8         ┆ 2     ┆ 1     ┆ … ┆ 0        ┆ 5          ┆ 0             ┆ null         │
│ 2         ┆ null      ┆ 3     ┆ 0     ┆ … ┆ null     ┆ null       ┆ null          ┆ null         │
│ 3         ┆ null      ┆ 4     ┆ 0     ┆ … ┆ null     ┆ null       ┆ null          ┆ null         │
│ 4         ┆ null      ┆ 5     ┆ 0     ┆ … ┆ null     ┆ null       ┆ null          ┆ null         │
│ 5         ┆ 5         ┆ 6     ┆ 1     ┆ … ┆ 1        ┆ 3          ┆ 1             ┆ 6            │
│ 6         ┆ null      ┆ 7     ┆ 0     ┆ … ┆ null     ┆ null       ┆ null          ┆ null         │
│ 7         ┆ null      ┆ 8     ┆ 0     ┆ … ┆ null     ┆ null       ┆ null          ┆ null         │
│ 8         ┆ 8         ┆ 9     ┆ 1     ┆ … ┆ 2        ┆ 0          ┆ null          ┆ null         │
└───────────┴───────────┴───────┴───────┴───┴──────────┴────────────┴───────────────┴──────────────┘

My current solution involves generating the full path values of each event, which is highly resource intensive, especially when the events overlap with each other:

# Per-event outcomes computed by expanding each event into its full
# timestamp path.  This is the resource-hungry approach described in the
# question; the join_where solution below is cheaper.
event_df = (
    df
    # Only event start rows with a non-empty span can be resolved here;
    # zero-span events (start_ts == end_ts) drop out and stay null after
    # the final left join back onto df.
    .filter(pl.col("event") == 1, pl.col("event_span") > 0)
    .with_columns(
        # One list per event holding every path timestamp in (start_ts, end_ts].
        pl.int_ranges(pl.col("start_ts") + 1, pl.col("end_ts") + 1)
        .alias("event_timestamps")
    )
    .explode("event_timestamps")  # one row per (event, path timestamp)
    .drop("value")
    .join(
        # Look up the observed value at each path timestamp.
        df
        .select(pl.col("timestamp"), pl.col("value"))
        .rename({"timestamp": "event_timestamps"}),
        on="event_timestamps", how="left")
    .with_columns(
        # 1 on every row where the threshold is met, null otherwise, so the
        # backward fill below can propagate the hit to earlier rows.
        pl.when(pl.col("value") >= pl.col("threshold"))
        .then(1)
        .otherwise(None)
        .alias("event_outcome")
        )
    .with_columns(
        # Timestamp of each hit row (null on non-hit rows).
        pl.when(pl.col("event_outcome") == 1)
        .then(pl.col("event_timestamps"))
        .otherwise(None)
        .alias("event_outcome_timestamp")
        )
    .with_columns(
        # Backward fill within each event: the event's FIRST path row ends
        # up carrying the earliest hit timestamp.
        pl.col("event_outcome", "event_outcome_timestamp")
        .fill_null(strategy="backward")
        .over("event_id")
        )
    # BUGFIX: a plain .unique("event_id") keeps an *arbitrary* row per
    # event; rows at/after the first hit still carry their own (later)
    # timestamps, so the result was nondeterministic.  Keep the first path
    # row (explode preserves row order), which holds the earliest hit.
    .unique("event_id", keep="first", maintain_order=True)
    .sort("event_id")
    # Events that never reach the threshold: outcome 0, timestamp stays null.
    .with_columns(pl.col("event_outcome").fill_null(0))
    .select(pl.col("event_id", "event_outcome", "event_outcome_timestamp"))
    )

event_df
shape: (2, 3)
┌──────────┬───────────────┬─────────────────────────┐
│ event_id ┆ event_outcome ┆ event_outcome_timestamp │
│ ---      ┆ ---           ┆ ---                     │
│ i64      ┆ i32           ┆ i64                     │
╞══════════╪═══════════════╪═════════════════════════╡
│ 0        ┆ 0             ┆ null                    │
│ 1        ┆ 1             ┆ 6                       │
└──────────┴───────────────┴─────────────────────────┘
# Broadcast the per-event results back onto the original frame; non-event
# rows (and events absent from event_df) keep null in the new columns.
df = df.join(event_df, on="event_id", how="left")

Solution

  • Polars recently released inequality joins with join_where, which is great for scenarios with timespans like this one.

    # Self-join each event start row to every row whose timestamp falls in
    # (start_ts, end_ts], then reduce per event.  Expressions are lazy, so
    # the shared `hit` predicate can be reused in both aggregations.
    hit = pl.col("value_right") >= pl.col("threshold")
    aggregated = (
        df.join_where(
            df,
            # candidate rows strictly after the start, up to and including the end
            pl.col("timestamp_right") > pl.col("start_ts"),
            pl.col("timestamp_right") <= pl.col("end_ts"),
        )
        .group_by("event_id")
        .agg(
            # Did any in-span value reach the threshold? (0/1 as Int32)
            event_outcome=hit.any().cast(pl.Int32),
            # Earliest timestamp at which the threshold was reached.
            event_outcome_timestamp=pl.col("timestamp_right").filter(hit).min(),
        )
    )
    # aggregated
    # shape: (2, 3)
    # ┌──────────┬───────────────┬─────────────────────────┐
    # │ event_id ┆ event_outcome ┆ event_outcome_timestamp │
    # │ ---      ┆ ---           ┆ ---                     │
    # │ i64      ┆ i32           ┆ i64                     │
    # ╞══════════╪═══════════════╪═════════════════════════╡
    # │ 0        ┆ 0             ┆ null                    │
    # │ 1        ┆ 1             ┆ 6                       │
    # └──────────┴───────────────┴─────────────────────────┘
    
    # Attach the aggregates to every row via event_id; rows with no event
    # (and the zero-span event, which produced no join_where rows) get nulls.
    df.join(aggregated, on="event_id", how="left")
    # same as desired outcome
    

    Have a look at the result of aggregated after the join_where, before the group_by to get a better understanding of how it is working