python dataframe lazy-evaluation python-polars

Polars timestamp synchronization lazy evaluation


I want to synchronize two numpy arrays of timestamps to each other using Polars LazyFrames.

Let's assume that I have two numpy arrays of timestamps, stored in LazyFrames:

import numpy as np
import polars as pl


timestamps = pl.LazyFrame(
    np.array(
        [
            np.datetime64("1970-01-01T00:00:00.500000000"),
            np.datetime64("1970-01-01T00:00:01.500000000"),
            np.datetime64("1970-01-01T00:00:02.600000000"),
            np.datetime64("1970-01-01T00:00:03.400000000"),
            np.datetime64("1970-01-01T00:00:04.500000000"),
            np.datetime64("1970-01-01T00:00:05.300000000"),
            np.datetime64("1970-01-01T00:00:06.200000000"),
            np.datetime64("1970-01-01T00:00:07.400000000"),
            np.datetime64("1970-01-01T00:00:08.500000000"),
        ]
    ),
    schema={"values": pl.Datetime}
)

other_timestamps = pl.LazyFrame(
    np.array(
        [
            np.datetime64("1970-01-01T00:00:01.500000000"),
            np.datetime64("1970-01-01T00:00:02.000000000"),
            np.datetime64("1970-01-01T00:00:02.500000000"),
            np.datetime64("1970-01-01T00:00:04.500000000"),
            np.datetime64("1970-01-01T00:00:06.000000000"),
            np.datetime64("1970-01-01T00:00:06.500000000"),
        ]
    ),
    schema={"values": pl.Datetime}
)

I also have the expected functionality implemented in numpy:

import re

import numpy as np
import numpy.typing as npt


def parse_timedelta(tolerance: str) -> np.timedelta64:
    # Minimal stand-in for the helper used below: "500ms" -> np.timedelta64(500, "ms").
    value, unit = re.fullmatch(r"(\d+)([a-z]+)", tolerance).groups()
    return np.timedelta64(int(value), unit)


def _np_sync_to(
    timestamps: npt.NDArray[np.datetime64],
    other: npt.NDArray[np.datetime64],
    tolerance: str,
):
    outer_diffs = np.abs(np.subtract.outer(other, timestamps))  # shape: (len(other), len(timestamps))
    closest_timestamps_indices = outer_diffs.argmin(0)  # nearest `other` index per input timestamp
    closest_timestamps = other[closest_timestamps_indices]
    diffs = np.abs(closest_timestamps - timestamps)
    within_tolerance = diffs <= parse_timedelta(tolerance)

    # Keep only the pairs whose distance is within the tolerance.
    ts1_synced = timestamps[within_tolerance]
    ts2_synced = closest_timestamps[within_tolerance]

    return ts1_synced, ts2_synced


np_ts1_synced, np_ts2_synced = _np_sync_to(
    timestamps=np.squeeze(timestamps.collect().to_numpy()),
    other=np.squeeze(other_timestamps.collect().to_numpy()),
    tolerance="500ms",
)

The expected results are:

np_ts1_synced = np.array([
    np.datetime64('1970-01-01T00:00:01.500000000'), 
    np.datetime64('1970-01-01T00:00:02.600000000'), 
    np.datetime64('1970-01-01T00:00:04.500000000'), 
    np.datetime64('1970-01-01T00:00:06.200000000')
])

np_ts2_synced = np.array([
    np.datetime64('1970-01-01T00:00:01.500000000'), 
    np.datetime64('1970-01-01T00:00:02.500000000'), 
    np.datetime64('1970-01-01T00:00:04.500000000'), 
    np.datetime64('1970-01-01T00:00:06.000000000')
])

So the synced timestamps are basically the nearest timestamps within the specified tolerance: any timestamp whose nearest neighbour is farther away than the tolerance is dropped. Now I want to implement the same functionality using Polars LazyFrames so that it scales to large data.
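
To make the matching rule concrete, here is what happens to the very first input timestamp (a worked example for illustration; other_np is simply the second array collected back to numpy, not a name used elsewhere):

other_np = np.squeeze(other_timestamps.collect().to_numpy())

ts = np.datetime64("1970-01-01T00:00:00.500000000")
diffs = np.abs(other_np - ts)        # distance to every candidate
nearest = other_np[diffs.argmin()]   # 1970-01-01T00:00:01.500, i.e. 1 s away
# 1 s > 500 ms tolerance, so this timestamp is dropped from the synced output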

I tried to implement it equivalently with Polars, but the dimensions of the outer subtraction come out wrong, and I suspect there is a better way to structure the computation in general:

# inspired by https://stackoverflow.com/questions/77748729/polars-equivalent-to-np-outer
def sync_to(timestamps, other, tolerance: str):
    def _outer(
        a: pl.DataFrame | pl.LazyFrame, b: pl.DataFrame | pl.LazyFrame
    ):
        # I guess the following line is incorrect
        nrows = pl.len().sqrt().cast(pl.Int32)
        return (
            a.select("values")
            .join(b.select("values"), how="cross")
            .select(
                computed=(pl.col("values") - pl.col("values_right")).abs()
            )
            .group_by(pl.arange(0, pl.len()) // nrows, maintain_order=True)
            .agg("computed")
            .select(pl.col("computed").list.to_struct())
            .unnest("computed")
        )

    outer_diffs = timestamps.pipe(_outer, other)
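
For reference, the shape problem could be sidestepped by tagging each left row with its index before the cross join and aggregating per index, instead of reshaping the pairs back into a square (a rough sketch; with_row_index needs a recent Polars release, older versions call it with_row_count):

min_diffs = (
    timestamps.with_row_index("i")          # remember which left row each pair came from
    .join(other_timestamps, how="cross")    # the right "values" column is suffixed to "values_right"
    .group_by("i", maintain_order=True)
    .agg((pl.col("values") - pl.col("values_right")).abs().min().alias("min_diff"))
)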

Another idea I had was to use join_asof:

ts1 = timestamps.sort("values").join_asof(
    other.sort("values"),
    on="values",
    strategy="nearest",
    tolerance=tolerance,
)

But the output is not what I want.


Solution

  • It seems that your join_asof example works. The only thing is that, as far as I understand, join_asof is a left join, so you additionally have to filter out the non-joined rows with is_not_null() (or, even better, drop_nulls(), as @Hericks suggested in the comments):

    timestamps.sort("values").join_asof(
        other_timestamps.with_columns(
            pl.col("values").alias("other_values")
        ).sort("values"),
        on="values",
        strategy="nearest",
        tolerance="500ms",
    ).drop_nulls().collect()
    # or equivalently: .filter(pl.col("other_values").is_not_null()).collect()
    
    ┌─────────────────────────┬─────────────────────────┐
    │ values                  ┆ other_values            │
    │ ---                     ┆ ---                     │
    │ datetime[ns]            ┆ datetime[ns]            │
    ╞═════════════════════════╪═════════════════════════╡
    │ 1970-01-01 00:00:01.500 ┆ 1970-01-01 00:00:01.500 │
    │ 1970-01-01 00:00:02.600 ┆ 1970-01-01 00:00:02.500 │
    │ 1970-01-01 00:00:04.500 ┆ 1970-01-01 00:00:04.500 │
    │ 1970-01-01 00:00:06.200 ┆ 1970-01-01 00:00:06     │
    └─────────────────────────┴─────────────────────────┘
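
    If you need the two synced series back as numpy arrays, as in the reference implementation, you can split the collected result (a sketch reusing the column names from the snippet above):

    synced = (
        timestamps.sort("values")
        .join_asof(
            other_timestamps.with_columns(
                pl.col("values").alias("other_values")
            ).sort("values"),
            on="values",
            strategy="nearest",
            tolerance="500ms",
        )
        .drop_nulls()
        .collect()
    )
    ts1_synced = synced["values"].to_numpy()
    ts2_synced = synced["other_values"].to_numpy()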