I want to synchronize two numpy arrays of timestamps to each other using Polars LazyFrames.
Let's assume that I have two NumPy arrays of timestamps, each stored in a single-column Polars LazyFrame:
import polars as pl
# NumPy is needed to build these frames but is otherwise only imported further
# down, so import it here to keep this snippet runnable on its own.
import numpy as np


def _timestamps_lazyframe(*iso_timestamps: str) -> pl.LazyFrame:
    """Return a LazyFrame with one Datetime column "values" built from ISO-8601 strings."""
    return pl.LazyFrame(
        np.array([np.datetime64(ts) for ts in iso_timestamps]),
        schema={"values": pl.Datetime},
    )


timestamps = _timestamps_lazyframe(
    "1970-01-01T00:00:00.500000000",
    "1970-01-01T00:00:01.500000000",
    "1970-01-01T00:00:02.600000000",
    "1970-01-01T00:00:03.400000000",
    "1970-01-01T00:00:04.500000000",
    "1970-01-01T00:00:05.300000000",
    "1970-01-01T00:00:06.200000000",
    "1970-01-01T00:00:07.400000000",
    "1970-01-01T00:00:08.500000000",
)
other_timestamps = _timestamps_lazyframe(
    "1970-01-01T00:00:01.500000000",
    "1970-01-01T00:00:02.000000000",
    "1970-01-01T00:00:02.500000000",
    "1970-01-01T00:00:04.500000000",
    "1970-01-01T00:00:06.000000000",
    "1970-01-01T00:00:06.500000000",
)
I also have a reference implementation of the expected behavior in NumPy:
import numpy as np
import numpy.typing as npt
def _np_sync_to(
timestamps: npt.ArrayLike[np.datetime64],
other: npt.ArrayLike[np.datetime64],
tolerance: str,
):
outer_diffs = np.abs(np.subtract.outer(other, timestamps))
closest_timestamps_indices = outer_diffs.argmin(0)
closest_timestamps = other[closest_timestamps_indices]
diffs = np.abs(closest_timestamps - timestamps)
tolerance = parse_timedelta(tolerance)
within_tolerance = diffs <= tolerance
ts1_synced = timestamps[within_tolerance]
ts2_synced = closest_timestamps[within_tolerance]
return ts1_synced, ts2_synced
# Extract the "values" column as a Series before converting: this always
# yields a 1-D array. Squeezing the DataFrame's (n, 1) array instead would
# collapse a single-row frame into a 0-d scalar.
np_ts1_synced, np_ts2_synced = _np_sync_to(
    timestamps=timestamps.collect().get_column("values").to_numpy(),
    other=other_timestamps.collect().get_column("values").to_numpy(),
    tolerance="500ms",
)
The expected results are:
# Expected results for tolerance="500ms":
# - np_ts1_synced holds the timestamps that found a partner.
# - np_ts2_synced holds, element-wise, the partner each one was paired with.
np_ts1_synced = np.array(
    [
        "1970-01-01T00:00:01.500000000",
        "1970-01-01T00:00:02.600000000",
        "1970-01-01T00:00:04.500000000",
        "1970-01-01T00:00:06.200000000",
    ],
    dtype="datetime64[ns]",
)
np_ts2_synced = np.array(
    [
        "1970-01-01T00:00:01.500000000",
        "1970-01-01T00:00:02.500000000",
        "1970-01-01T00:00:04.500000000",
        "1970-01-01T00:00:06.000000000",
    ],
    dtype="datetime64[ns]",
)
In other words, each timestamp is paired with the nearest timestamp in `other`, and pairs that are farther apart than the tolerance are dropped. Now I want to implement the same functionality with Polars LazyFrames so that it scales to large data.
I tried to implement an equivalent version with Polars, but the dimensions of the outer subtraction are wrong, and I suspect there is a better way to do this computation in general:
# An earlier cross-join ("outer difference") attempt was inspired by
# https://stackoverflow.com/questions/77748729/polars-equivalent-to-np-outer
def sync_to(timestamps, other, tolerance: str):
    """Pair each timestamp with its nearest ``other`` timestamp within ``tolerance``.

    Polars counterpart of the NumPy reference ``_np_sync_to``. It does not
    materialise the full ``len(timestamps) x len(other)`` difference matrix,
    which a cross join would, and which does not scale to large data.
    Instead it sorts both sides and uses a "nearest" as-of join.

    Args:
        timestamps: DataFrame or LazyFrame with a Datetime column ``"values"``.
        other: frame of the same kind (DataFrame/LazyFrame) as ``timestamps``,
            with a Datetime column ``"values"``.
        tolerance: maximum allowed distance, as a Polars duration string such
            as ``"500ms"``.

    Returns:
        A frame of the same kind as ``timestamps``, sorted by ``"values"``.
        Its column ``"values"`` holds the timestamps that found a partner, and
        ``"other_values"`` holds the nearest ``other`` timestamp for each.
        Rows without a partner within ``tolerance`` are dropped.
    """
    # Copy the right-hand key under a new name so the matched timestamp is
    # kept as its own column in the join result.
    right = other.select(
        pl.col("values"), pl.col("values").alias("other_values")
    ).sort("values")
    return (
        timestamps.select("values")
        .sort("values")  # join_asof requires both keys to be sorted
        .join_asof(right, on="values", strategy="nearest", tolerance=tolerance)
        # join_asof is a left join: rows with no partner within the tolerance
        # get a null "other_values" and must be removed explicitly.
        .drop_nulls("other_values")
    )
Another idea I had was:
# Sort both sides (join_asof needs sorted keys), then attach to every row of
# `timestamps` the nearest `other` timestamp within `tolerance`.
# NOTE(review): `other` and `tolerance` are not defined at module level in this
# snippet; presumably it runs inside `sync_to(timestamps, other, tolerance)`.
ts1 = (
    timestamps.sort(by="values")
    .join_asof(
        other.sort(by="values"),
        on="values",
        tolerance=tolerance,
        strategy="nearest",
    )
)
But the output is not what I want.
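It seems that your `join_asof` example works. The only catch is that, as far as I understand, `join_asof` is a left join. You therefore additionally have to filter out the rows that were not joined, using `is_not_null()` or, even better, `drop_nulls()`, as @Hericks suggested in the comments. Copying the right-hand `values` column under a new name (`other_values`) also keeps the matched timestamp visible in the result: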
# join_asof is a left join: rows without a partner within the tolerance get a
# null "other_values", so drop exactly those rows afterwards. An equivalent
# last step is `.filter(pl.col("other_values").is_not_null())`.
# `.collect()` executes the lazy query so the result table below is produced.
timestamps.sort("values").join_asof(
    # Copy the right-hand key under a new name: only one "values" column
    # appears in the result (see the table below), so without this alias the
    # matched `other` timestamp would not be visible.
    other_timestamps.with_columns(
        pl.col("values").alias("other_values")
    ).sort("values"),
    on="values",
    strategy="nearest",
    tolerance="500ms",
).drop_nulls("other_values").collect()
┌─────────────────────────┬─────────────────────────┐
│ values ┆ other_values │
│ --- ┆ --- │
│ datetime[ns] ┆ datetime[ns] │
╞═════════════════════════╪═════════════════════════╡
│ 1970-01-01 00:00:01.500 ┆ 1970-01-01 00:00:01.500 │
│ 1970-01-01 00:00:02.600 ┆ 1970-01-01 00:00:02.500 │
│ 1970-01-01 00:00:04.500 ┆ 1970-01-01 00:00:04.500 │
│ 1970-01-01 00:00:06.200 ┆ 1970-01-01 00:00:06 │
└─────────────────────────┴─────────────────────────┘