Tags: python, dataframe, python-polars

Polars conditional merge of rows


I have some time data that I want to merge if two intervals are close enough (and the type is the same). Currently I'm doing something like this:

Note: time_start and time_end are datetimes; type is a categorical string.

import datetime as dt
import polars as pl

buffer = dt.timedelta(minutes=15)
_temp_df = (
    data.sort("time_start").select([
        pl.col("*"),
        # gap between this interval's end and the next interval's start
        ((pl.col("time_start").shift(-1)) - pl.col("time_end"))
        .fill_null(0)
        .apply(lambda x: dt.timedelta(seconds=x // 1000))
        .alias("offset"),
        pl.col("type").shift(-1).alias("type_next")
    ]).select([
        pl.col("time_start"), pl.col("time_end"), pl.col("type"),
        ((pl.col("offset") < buffer) & (pl.col("type") == pl.col("type_next"))).alias("can_merge")
    ])
).to_dicts()

_after_df = []
if len(_temp_df) != 0:
    last_data = _temp_df[0]
    for i in range(1, len(_temp_df)):
        _dp = _temp_df[i]
        if last_data['can_merge']:
            # extend the running row instead of emitting a new one
            last_data['time_end'] = _dp['time_end']
            last_data['can_merge'] = _dp['can_merge']
        else:
            _after_df.append(last_data)
            last_data = _dp

    _after_df.append(last_data)

data = pl.from_dicts(_after_df)

Whilst this works, it feels extremely off that I have to convert Polars -> dicts -> Polars. Is there a way to do this with aggregations? groupby_dynamic and groupby_rolling seem to only accept times, but I want to merge consecutive rows by offset.

This is Polars 0.13.59.


Solution

  • You didn't provide any data, so let's start with the following:

    import datetime as dt
    import polars as pl
    
    data = (
        pl.DataFrame(
            {
                "time_start": [ "12:00", "12:20", "12:40", "13:10", "13:15", "13:50", "13:55", "14:50", "15:20", ],
                "time_end": [ "12:15", "12:30", "13:00", "13:20", "13:45", "14:00", "14:45", "15:00", "15:30", ],
                "type": ["a", "a", "a", "b", "b", "c", "c", "a", "a"],
            }
        )
    ).with_columns(
        pl.col("type").cast(pl.Categorical),
        pl.format("2020-01-01T{}:00", "time_start")
          .str.to_datetime()
          .dt.cast_time_unit("ms")
          .alias("time_start"),
        pl.format("2020-01-01T{}:00", "time_end")
          .str.to_datetime()
          .dt.cast_time_unit("ms")
          .alias("time_end"),
    )
    data
    
    shape: (9, 3)
    ┌─────────────────────┬─────────────────────┬──────┐
    │ time_start          ┆ time_end            ┆ type │
    │ ---                 ┆ ---                 ┆ ---  │
    │ datetime[ms]        ┆ datetime[ms]        ┆ cat  │
    ╞═════════════════════╪═════════════════════╪══════╡
    │ 2020-01-01 12:00:00 ┆ 2020-01-01 12:15:00 ┆ a    │
    │ 2020-01-01 12:20:00 ┆ 2020-01-01 12:30:00 ┆ a    │
    │ 2020-01-01 12:40:00 ┆ 2020-01-01 13:00:00 ┆ a    │
    │ 2020-01-01 13:10:00 ┆ 2020-01-01 13:20:00 ┆ b    │
    │ 2020-01-01 13:15:00 ┆ 2020-01-01 13:45:00 ┆ b    │
    │ 2020-01-01 13:50:00 ┆ 2020-01-01 14:00:00 ┆ c    │
    │ 2020-01-01 13:55:00 ┆ 2020-01-01 14:45:00 ┆ c    │
    │ 2020-01-01 14:50:00 ┆ 2020-01-01 15:00:00 ┆ a    │
    │ 2020-01-01 15:20:00 ┆ 2020-01-01 15:30:00 ┆ a    │
    └─────────────────────┴─────────────────────┴──────┘
    

    The Algorithm

    Assumption: the following should work as long as no interval fully encloses another. The intervals can overlap, and they can be disjoint. (This is why it's helpful to provide sample data with your question -- it gives some insight into the assumptions underlying the data.)
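
    As a hypothetical illustration (this data is not from the question), here is a frame that violates the assumption: the second interval sits entirely inside the first, so its early time_end would flag a spurious break in the condition used below, even though the first interval still overlaps the third. If your data can contain enclosed intervals, one way to relax the assumption is to compare against a running maximum of the end times (pl.col('time_end').cum_max()) rather than time_end itself.

    # Hypothetical counter-example: [12:30, 12:45] is enclosed by [12:00, 14:00].
    # Sorted by time_start, row 2's end (12:45) plus the 15m buffer is 13:00,
    # which is earlier than row 3's start (13:30) -- so a break is flagged
    # after row 2, although row 1 [12:00, 14:00] overlaps row 3 [13:30, 13:40].
    enclosed = pl.DataFrame(
        {
            "time_start": ["12:00", "12:30", "13:30"],
            "time_end": ["14:00", "12:45", "13:40"],
            "type": ["a", "a", "a"],
        }
    )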

    (
        data
        .sort('time_start')
        .with_columns(
            (
                (
                    pl.col('time_end').dt.offset_by('15m') <
                    pl.col('time_start').shift(-1)
                ) |
                (
                    pl.col('type') != pl.col('type').shift(-1)
                )
            )
            .shift(1, fill_value=False)
            .cum_sum()
            .alias('run_nbr'),
        )
        .group_by('run_nbr')
        .agg(
            pl.col('time_start').min().alias('time_start'),
            pl.col('time_end').max().alias('time_end'),
            pl.col('type').first().alias('type'),
        )
        .sort('time_start')
    )
    
    shape: (5, 4)
    ┌─────────┬─────────────────────┬─────────────────────┬──────┐
    │ run_nbr ┆ time_start          ┆ time_end            ┆ type │
    │ ---     ┆ ---                 ┆ ---                 ┆ ---  │
    │ u32     ┆ datetime[ms]        ┆ datetime[ms]        ┆ cat  │
    ╞═════════╪═════════════════════╪═════════════════════╪══════╡
    │ 0       ┆ 2020-01-01 12:00:00 ┆ 2020-01-01 13:00:00 ┆ a    │
    │ 1       ┆ 2020-01-01 13:10:00 ┆ 2020-01-01 13:45:00 ┆ b    │
    │ 2       ┆ 2020-01-01 13:50:00 ┆ 2020-01-01 14:45:00 ┆ c    │
    │ 3       ┆ 2020-01-01 14:50:00 ┆ 2020-01-01 15:00:00 ┆ a    │
    │ 4       ┆ 2020-01-01 15:20:00 ┆ 2020-01-01 15:30:00 ┆ a    │
    └─────────┴─────────────────────┴─────────────────────┴──────┘
    

    Your code gives the same result when you run it. (I left run_nbr in for the discussion below.) Note: your code converts type from Categorical to str, but the above algorithm maintains it as Categorical.
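
    If you need the final frame to match your original output exactly, a drop and a cast at the end will do it. This is a minimal sketch; merged is a hypothetical name for the result of the group_by above:

    # drop the helper column and cast type back to string
    merged.drop('run_nbr').with_columns(pl.col('type').cast(pl.Utf8))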

    In Steps

    The algorithm assigns "run numbers" to the sorted intervals. A "run" of intervals is any successive set of rows that can be collapsed into a single row, based on the gap between the intervals and on their type.

    (
        data
        .sort('time_start')
        .with_columns(
            (
                (
                    pl.col('time_end').dt.offset_by('15m') <
                    pl.col('time_start').shift(-1)
                ) |
                (
                    pl.col('type') != pl.col('type').shift(-1)
                )
            )
            .shift(1, fill_value=False)
            .cum_sum()
            .alias('run_nbr'),
        )
    )
    
    shape: (9, 4)
    ┌─────────────────────┬─────────────────────┬──────┬─────────┐
    │ time_start          ┆ time_end            ┆ type ┆ run_nbr │
    │ ---                 ┆ ---                 ┆ ---  ┆ ---     │
    │ datetime[ms]        ┆ datetime[ms]        ┆ cat  ┆ u32     │
    ╞═════════════════════╪═════════════════════╪══════╪═════════╡
    │ 2020-01-01 12:00:00 ┆ 2020-01-01 12:15:00 ┆ a    ┆ 0       │
    │ 2020-01-01 12:20:00 ┆ 2020-01-01 12:30:00 ┆ a    ┆ 0       │
    │ 2020-01-01 12:40:00 ┆ 2020-01-01 13:00:00 ┆ a    ┆ 0       │
    │ 2020-01-01 13:10:00 ┆ 2020-01-01 13:20:00 ┆ b    ┆ 1       │
    │ 2020-01-01 13:15:00 ┆ 2020-01-01 13:45:00 ┆ b    ┆ 1       │
    │ 2020-01-01 13:50:00 ┆ 2020-01-01 14:00:00 ┆ c    ┆ 2       │
    │ 2020-01-01 13:55:00 ┆ 2020-01-01 14:45:00 ┆ c    ┆ 2       │
    │ 2020-01-01 14:50:00 ┆ 2020-01-01 15:00:00 ┆ a    ┆ 3       │
    │ 2020-01-01 15:20:00 ┆ 2020-01-01 15:30:00 ┆ a    ┆ 4       │
    └─────────────────────┴─────────────────────┴──────┴─────────┘
    

    From there, the group_by does the rest.
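
    Spelled out, that second step collapses each run into a single interval. df_with_runs below is a hypothetical name for the frame above with run_nbr assigned:

    (
        df_with_runs
        .group_by('run_nbr')
        .agg(
            pl.col('time_start').min().alias('time_start'),  # earliest start in the run
            pl.col('time_end').max().alias('time_end'),      # latest end in the run
            pl.col('type').first().alias('type'),            # type is constant within a run
        )
        .sort('time_start')  # group_by does not guarantee row order
    )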