In the demo DataFrame I have three events:
# Demo frame: 8 timestamps, 3 events. Non-event rows carry nulls in the
# event-level columns; event_span = end_ts - start_ts.
data = {
    "timestamp": [1, 2, 3, 4, 5, 6, 7, 8],
    "threshold": [8, None, None, None, 5, None, None, 8],
    "value": [2, 3, 4, 5, 6, 7, 8, 9],
    "event": [1, 0, 0, 0, 1, 0, 0, 1],
    "start_ts": [1, None, None, None, 5, None, None, 8],
    "end_ts": [6, None, None, None, 8, None, None, 8],
    "event_id": [0, None, None, None, 1, None, None, 2],
}
df = pl.DataFrame(data).with_columns(
    (pl.col("end_ts") - pl.col("start_ts")).alias("event_span")
)
print(df)
out:
shape: (8, 8)
┌───────────┬───────────┬───────┬───────┬──────────┬────────┬──────────┬────────────┐
│ timestamp ┆ threshold ┆ value ┆ event ┆ start_ts ┆ end_ts ┆ event_id ┆ event_span │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ i64 ┆ i64 ┆ i64 ┆ i64 ┆ i64 ┆ i64 │
╞═══════════╪═══════════╪═══════╪═══════╪══════════╪════════╪══════════╪════════════╡
│ 1 ┆ 8 ┆ 2 ┆ 1 ┆ 1 ┆ 6 ┆ 0 ┆ 5 │
│ 2 ┆ null ┆ 3 ┆ 0 ┆ null ┆ null ┆ null ┆ null │
│ 3 ┆ null ┆ 4 ┆ 0 ┆ null ┆ null ┆ null ┆ null │
│ 4 ┆ null ┆ 5 ┆ 0 ┆ null ┆ null ┆ null ┆ null │
│ 5 ┆ 5 ┆ 6 ┆ 1 ┆ 5 ┆ 8 ┆ 1 ┆ 3 │
│ 6 ┆ null ┆ 7 ┆ 0 ┆ null ┆ null ┆ null ┆ null │
│ 7 ┆ null ┆ 8 ┆ 0 ┆ null ┆ null ┆ null ┆ null │
│ 8 ┆ 8 ┆ 9 ┆ 1 ┆ 8 ┆ 8 ┆ 2 ┆ 0 │
└───────────┴───────────┴───────┴───────┴──────────┴────────┴──────────┴────────────┘
Column definitions:

- `timestamp`: the timestamp in the real world.
- `threshold`: the value that each event's `value` needs to reach or exceed during the event span.
- `value`: the value at each timestamp; duplicated values are possible.
- `event`: a binary column indicating whether a given timestamp generates an event.
- `start_ts`: the starting timestamp of an event. For example, a `start_ts` of 1 means the event starts at the end of timestamp 1, i.e. at the beginning of timestamp 2.
- `end_ts`: the ending timestamp of an event.
- `event_id`: a unique identifier for each event.
- `event_span`: the number of timestamps that an event spans.

Problem: I want to identify:

- `event_outcome`: a binary value indicating whether the `threshold` of each event is reached by `value` during the event span.
- `event_outcome_timestamp`: the timestamp where `value` reaches `threshold` for the first time.

Additional note: an event's span covers the timestamps `t` with `start_ts < t <= end_ts`, and events may overlap each other.

Desired Output:
shape: (8, 10)
┌───────────┬───────────┬───────┬───────┬───┬──────────┬────────────┬───────────────┬──────────────┐
│ timestamp ┆ threshold ┆ value ┆ event ┆ … ┆ event_id ┆ event_span ┆ event_outcome ┆ event_outcom │
│ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ e_timestamp │
│ i64 ┆ i64 ┆ i64 ┆ i64 ┆ ┆ i64 ┆ i64 ┆ i32 ┆ --- │
│ ┆ ┆ ┆ ┆ ┆ ┆ ┆ ┆ i64 │
╞═══════════╪═══════════╪═══════╪═══════╪═══╪══════════╪════════════╪═══════════════╪══════════════╡
│ 1 ┆ 8 ┆ 2 ┆ 1 ┆ … ┆ 0 ┆ 5 ┆ 0 ┆ null │
│ 2 ┆ null ┆ 3 ┆ 0 ┆ … ┆ null ┆ null ┆ null ┆ null │
│ 3 ┆ null ┆ 4 ┆ 0 ┆ … ┆ null ┆ null ┆ null ┆ null │
│ 4 ┆ null ┆ 5 ┆ 0 ┆ … ┆ null ┆ null ┆ null ┆ null │
│ 5 ┆ 5 ┆ 6 ┆ 1 ┆ … ┆ 1 ┆ 3 ┆ 1 ┆ 6 │
│ 6 ┆ null ┆ 7 ┆ 0 ┆ … ┆ null ┆ null ┆ null ┆ null │
│ 7 ┆ null ┆ 8 ┆ 0 ┆ … ┆ null ┆ null ┆ null ┆ null │
│ 8 ┆ 8 ┆ 9 ┆ 1 ┆ … ┆ 2 ┆ 0 ┆ null ┆ null │
└───────────┴───────────┴───────┴───────┴───┴──────────┴────────────┴───────────────┴──────────────┘
My current solution involves generating the full path values of each event, which is highly resource intensive, especially when the events overlap with each other:
# Current (resource-intensive) approach: materialize every timestamp inside
# each event's span, join the per-timestamp values back in, and locate the
# first time `value` reaches `threshold`. The explode step makes cost grow
# with total span length, which is why overlapping events are expensive.
event_df = (
    df
    # Keep only event rows with a non-empty span (event_id 2 has span 0 and
    # is dropped here, so it ends up with null outcome after the final join).
    .filter(pl.col("event") == 1, pl.col("event_span") > 0)
    .with_columns(
        pl.int_ranges(pl.col("start_ts")+1, pl.col("end_ts")+1) # Map event full path: timestamps in (start_ts, end_ts]
        .alias("event_timestamps")
    )
    .explode("event_timestamps") # Generate event full path (one row per in-span timestamp)
    .drop(pl.col("value"))
    .join(
        df
        .select(pl.col("timestamp"), pl.col("value"))
        .rename({"timestamp": "event_timestamps"}),
        on="event_timestamps", how="left") # Get the value of the full path
    .with_columns(# Map event outcome based on threshold: 1 where reached, else null
        pl.when(pl.col("value") >= pl.col("threshold"))
        .then(1)
        .otherwise(None)
        .alias("event_outcome")
    )
    .with_columns(# Get event outcome timestamp (null on non-reaching rows)
        pl.when(pl.col("event_outcome") == 1)
        .then(pl.col("event_timestamps"))
        .otherwise(None)
        .alias("event_outcome_timestamp")
    )
    .with_columns(# Map event outcome to the event start timestamp
        # Backward fill propagates the FIRST success back to earlier rows of
        # the same event, so the earliest in-span success wins.
        pl.col("event_outcome", "event_outcome_timestamp")
        .fill_null(strategy="backward")
        .over("event_id")
    )
    # NOTE(review): unique() keeps an arbitrary row per event_id; rows AFTER
    # the last success still hold nulls, so correctness depends on which row
    # survives — confirm this is deterministic enough for your data.
    .unique("event_id")
    .sort("event_id")
    .with_columns(# Take care of the events that have not exceeded the threshold
        pl.when(pl.col("event_outcome").is_null())
        .then(0)
        .otherwise(pl.col("event_outcome"))
        .alias("event_outcome")
    )
    .select(pl.col("event_id", "event_outcome", "event_outcome_timestamp"))
)
event_df
shape: (2, 3)
┌──────────┬───────────────┬─────────────────────────┐
│ event_id ┆ event_outcome ┆ event_outcome_timestamp │
│ --- ┆ --- ┆ --- │
│ i64 ┆ i32 ┆ i64 │
╞══════════╪═══════════════╪═════════════════════════╡
│ 0 ┆ 0 ┆ null │
│ 1 ┆ 1 ┆ 6 │
└──────────┴───────────────┴─────────────────────────┘
df = df.join(event_df, on="event_id", how="left")
Polars recently released inequality joins with join_where
, which is great for scenarios with timespans like this one.
# join_where pairs every event row with the raw rows whose timestamp falls
# inside that event's (start_ts, end_ts] window — no explode needed.
# The shared predicate is named once and reused in both aggregations.
reached = pl.col("value_right") >= pl.col("threshold")

aggregated = (
    df.join_where(
        df,
        # event starts after start_ts, and goes until end_ts
        pl.col("timestamp_right") > pl.col("start_ts"),
        pl.col("timestamp_right") <= pl.col("end_ts"),
    )
    .group_by("event_id")
    .agg(
        # 1 if the threshold was ever reached inside the window, else 0.
        event_outcome=reached.any().cast(pl.Int32),
        # Earliest timestamp at which the threshold was reached (null if never).
        event_outcome_timestamp=pl.col("timestamp_right").filter(reached).min(),
    )
)
# aggregated
# shape: (2, 3)
# ┌──────────┬───────────────┬─────────────────────────┐
# │ event_id ┆ event_outcome ┆ event_outcome_timestamp │
# │ --- ┆ --- ┆ --- │
# │ i64 ┆ i32 ┆ i64 │
# ╞══════════╪═══════════════╪═════════════════════════╡
# │ 0 ┆ 0 ┆ null │
# │ 1 ┆ 1 ┆ 6 │
# └──────────┴───────────────┴─────────────────────────┘
df.join(aggregated, on="event_id", how="left")
# same as desired outcome
Have a look at the result of `aggregated` after the `join_where` but before the `group_by` to get a better understanding of how it works.