In the demo DataFrame I have three events:
import polars as pl

# Demo data: three events (rows where event == 1) plus the per-timestamp values.
data = {
    "timestamp": [1, 2, 3, 4, 5, 6, 7, 8],
    "threshold": [8, None, None, None, 5, None, None, 8],
    "value": [2, 3, 4, 5, 6, 7, 8, 9],
    "event": [1, 0, 0, 0, 1, 0, 0, 1],
    "start_ts": [1, None, None, None, 5, None, None, 8],
    "end_ts": [6, None, None, None, 8, None, None, 8],
    "event_id": [0, None, None, None, 1, None, None, 2],
}
# event_span = number of timestamps the event covers (end_ts - start_ts).
df = pl.DataFrame(data).with_columns(
    (pl.col("end_ts") - pl.col("start_ts")).alias("event_span")
)
print(df)
shape: (8, 8)
┌───────────┬───────────┬───────┬───────┬──────────┬────────┬──────────┬────────────┐
│ timestamp ┆ threshold ┆ value ┆ event ┆ start_ts ┆ end_ts ┆ event_id ┆ event_span │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ i64 ┆ i64 ┆ i64 ┆ i64 ┆ i64 ┆ i64 │
╞═══════════╪═══════════╪═══════╪═══════╪══════════╪════════╪══════════╪════════════╡
│ 1 ┆ 8 ┆ 2 ┆ 1 ┆ 1 ┆ 6 ┆ 0 ┆ 5 │
│ 2 ┆ null ┆ 3 ┆ 0 ┆ null ┆ null ┆ null ┆ null │
│ 3 ┆ null ┆ 4 ┆ 0 ┆ null ┆ null ┆ null ┆ null │
│ 4 ┆ null ┆ 5 ┆ 0 ┆ null ┆ null ┆ null ┆ null │
│ 5 ┆ 5 ┆ 6 ┆ 1 ┆ 5 ┆ 8 ┆ 1 ┆ 3 │
│ 6 ┆ null ┆ 7 ┆ 0 ┆ null ┆ null ┆ null ┆ null │
│ 7 ┆ null ┆ 8 ┆ 0 ┆ null ┆ null ┆ null ┆ null │
│ 8 ┆ 8 ┆ 9 ┆ 1 ┆ 8 ┆ 8 ┆ 2 ┆ 0 │
└───────────┴───────────┴───────┴───────┴──────────┴────────┴──────────┴────────────┘
timestamp is the real-world timestamp. threshold is the value that each event's value needs to reach or exceed during the event span. value is the value at each timestamp; duplicated values are possible. event is a binary column indicating whether a given timestamp generates an event. start_ts is the starting timestamp of an event; for example, a start_ts of 1 means the event starts at the end of timestamp 1, i.e. at the beginning of timestamp 2. end_ts is the ending timestamp of an event. event_id is a unique identifier for each event. event_span is the number of timestamps that an event spans. Problem: I want to identify:
event_outcome: a binary value indicating whether the threshold of each event is reached by value during the event span. event_outcome_timestamp: the first timestamp at which value reaches the threshold. Additional Note:
An event's span covers the timestamps t with start_ts < t <= end_ts (the value at start_ts itself does not count, since the event only starts at the end of that timestamp). Desired Output:
shape: (8, 10)
┌───────────┬───────────┬───────┬───────┬───┬──────────┬────────────┬───────────────┬──────────────┐
│ timestamp ┆ threshold ┆ value ┆ event ┆ … ┆ event_id ┆ event_span ┆ event_outcome ┆ event_outcom │
│ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ e_timestamp │
│ i64 ┆ i64 ┆ i64 ┆ i64 ┆ ┆ i64 ┆ i64 ┆ i32 ┆ --- │
│ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ i64 │
╞═══════════╪═══════════╪═══════╪═══════╪═══╪══════════╪════════════╪═══════════════╪══════════════╡
│ 1 ┆ 8 ┆ 2 ┆ 1 ┆ … ┆ 0 ┆ 5 ┆ 0 ┆ null │
│ 2 ┆ null ┆ 3 ┆ 0 ┆ … ┆ null ┆ null ┆ null ┆ null │
│ 3 ┆ null ┆ 4 ┆ 0 ┆ … ┆ null ┆ null ┆ null ┆ null │
│ 4 ┆ null ┆ 5 ┆ 0 ┆ … ┆ null ┆ null ┆ null ┆ null │
│ 5 ┆ 5 ┆ 6 ┆ 1 ┆ … ┆ 1 ┆ 3 ┆ 1 ┆ 6 │
│ 6 ┆ null ┆ 7 ┆ 0 ┆ … ┆ null ┆ null ┆ null ┆ null │
│ 7 ┆ null ┆ 8 ┆ 0 ┆ … ┆ null ┆ null ┆ null ┆ null │
│ 8 ┆ 8 ┆ 9 ┆ 1 ┆ … ┆ 2 ┆ 0 ┆ null ┆ null │
└───────────┴───────────┴───────┴───────┴───┴──────────┴────────────┴───────────────┴──────────────┘
My current solution involves generating the full path values of each event, which is highly resource intensive, especially when the events overlap with each other:
# Current (resource-intensive) solution: materialise every timestamp inside each
# event's span, join the per-timestamp values back, then reduce to one row per event.
event_df = (
    df
    # Only real events with a non-empty span can ever reach their threshold.
    .filter(pl.col("event") == 1, pl.col("event_span") > 0)
    .with_columns(
        # Full path of an event: timestamps start_ts+1 .. end_ts inclusive.
        pl.int_ranges(pl.col("start_ts")+1, pl.col("end_ts")+1)
        .alias("event_timestamps")
    )
    .explode("event_timestamps")  # One row per (event, covered timestamp).
    .drop(pl.col("value"))
    .join(
        df
        .select(pl.col("timestamp"), pl.col("value"))
        .rename({"timestamp": "event_timestamps"}),
        on="event_timestamps", how="left")  # Look up the value along the path.
    .with_columns(
        # 1 where the threshold is reached, null elsewhere (nulls let the
        # backward fill below propagate the crossing onto earlier rows).
        pl.when(pl.col("value") >= pl.col("threshold"))
        .then(1)
        .otherwise(None)
        .alias("event_outcome")
    )
    .with_columns(
        # Timestamp of each threshold crossing, null elsewhere.
        pl.when(pl.col("event_outcome") == 1)
        .then(pl.col("event_timestamps"))
        .otherwise(None)
        .alias("event_outcome_timestamp")
    )
    .with_columns(
        # Backward fill pulls the *earliest* crossing onto the first row of each event.
        pl.col("event_outcome", "event_outcome_timestamp")
        .fill_null(strategy="backward")
        .over("event_id")
    )
    # BUGFIX: keep="first" (with a stable order) is required here. The default
    # keep="any" may retain an arbitrary row of the event, whose backward-filled
    # timestamp is a *later* crossing rather than the first one.
    .unique("event_id", keep="first", maintain_order=True)
    .sort("event_id")
    .with_columns(
        # Events that never reached the threshold get outcome 0 instead of null.
        pl.col("event_outcome").fill_null(0)
    )
    .select(pl.col("event_id", "event_outcome", "event_outcome_timestamp"))
)
event_df
shape: (2, 3)
┌──────────┬───────────────┬─────────────────────────┐
│ event_id ┆ event_outcome ┆ event_outcome_timestamp │
│ --- ┆ --- ┆ --- │
│ i64 ┆ i32 ┆ i64 │
╞══════════╪═══════════════╪═════════════════════════╡
│ 0 ┆ 0 ┆ null │
│ 1 ┆ 1 ┆ 6 │
└──────────┴───────────────┴─────────────────────────┘
# Attach the per-event results back onto the original frame (non-event rows stay null).
df = df.join(event_df, on="event_id", how="left")
Polars recently released inequality joins with join_where, which is great for scenarios with timespans like this one.
# Reusable predicate: did the matched timestamp's value reach the event's threshold?
hit = pl.col("value_right") >= pl.col("threshold")

# join_where performs an inequality self-join: each event row is paired with
# every row whose timestamp lies inside the event's (start_ts, end_ts] span,
# avoiding the explode/materialise step of the original solution.
aggregated = (
    df.join_where(
        df,
        # event starts after start_ts, and goes until end_ts
        pl.col("timestamp_right") > pl.col("start_ts"),
        pl.col("timestamp_right") <= pl.col("end_ts"),
    )
    .group_by("event_id")
    .agg(
        # Any hit inside the span -> 1, otherwise 0.
        event_outcome=hit.any().cast(pl.Int32),
        # Earliest timestamp at which the threshold was reached (null if never).
        event_outcome_timestamp=pl.col("timestamp_right").filter(hit).min(),
    )
)
# aggregated
# shape: (2, 3)
# ┌──────────┬───────────────┬─────────────────────────┐
# │ event_id ┆ event_outcome ┆ event_outcome_timestamp │
# │ --- ┆ --- ┆ --- │
# │ i64 ┆ i32 ┆ i64 │
# ╞══════════╪═══════════════╪═════════════════════════╡
# │ 0 ┆ 0 ┆ null │
# │ 1 ┆ 1 ┆ 6 │
# └──────────┴───────────────┴─────────────────────────┘
# Attach the aggregated per-event columns; rows without an event_id stay null.
df.join(aggregated, on="event_id", how="left")
# same as desired outcome
Have a look at the result of aggregated after the join_where, before the group_by to get a better understanding of how it is working