I’m trying to extract some features from a dataset and then write the results to a Parquet file using the Polars library in Python. Here’s the code I’m using:
import ipaddress
import numpy as np
import polars as pl
def extract_session_features(sessions: pl.LazyFrame) -> pl.LazyFrame:
    return (
        sessions.with_columns(
            (pl.col("dpkts") + pl.col("spkts")).alias("total_packets"),
            (pl.col("dbytes") + pl.col("sbytes")).alias("total_bytes"),
(pl.col("dpkts") / pl.col("spkts")).alias("bytes_ratio"),
(pl.col("dbytes") / pl.col("sbytes")).alias("packets_ratio"),
(pl.col("spkts") / pl.col("dur")).alias("sent_packets_rate"),
(pl.col("dpkts") / pl.col("dur")).alias("received_packets_rate"),
(pl.col("sbytes") / pl.col("dur")).alias("sent_bytes_rate"),
(pl.col("dbytes") / pl.col("dur")).alias("received_bytes_rate"),
(pl.col("sbytes") / pl.col("spkts")).alias("mean_pkt_sent_size"),
(pl.col("dbytes") / pl.col("dpkts")).alias("mean_pkt_recv_size"),
(
pl.col("Timestamp")
.diff()
.dt.total_seconds()
.fill_null(0)
.over("ID")
.alias("time_since_last_session")
),
)
.with_columns(
pl.when(pl.col("^.*(_ratio|_rate).*$").is_infinite())
.then(-1)
.otherwise(pl.col("^.*(_ratio|_rate).*$"))
.name.keep()
)
.fill_nan(-1)
)
filtered_sessions = pl.scan_parquet("./processed_merge_file_filtered.parquet")
print(filtered_sessions.head().collect(streaming=True))
sessions_features = extract_session_features(filtered_sessions)
sessions_features.sink_parquet("./sessions_features")
When I run this code, I get the following error:
thread '<unnamed>' panicked at /home/runner/work/polars/polars/crates/polars-lazy/src/physical_plan/planner/lp.rs:153:28:
sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
File "/home/cpinon/Documentos/Project/SourceCode/project/data/01_raw/sessions/test_feature_engineering.py", line 50, in <module>
sessions_features.sink_parquet("./sessions_features")
File "/home/cpinon/Documentos/Project/SourceCode/project/.venv/lib/python3.9/site-packages/polars/lazyframe/frame.py", line 1895, in sink_parquet
return lf.sink_parquet(
pyo3_runtime.PanicException: sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'
The error message suggests using collect().write_parquet(), but I'm not sure why I can't use the sink_parquet() method. If I use the suggested collect().write_parquet() approach instead, my computer runs out of memory, since the entire dataframe does not fit in RAM.
Not all Polars operations are supported by the streaming engine. In particular, I would guess that this expression is what's keeping you from streaming:
(
    pl.col("Timestamp")
    .diff()
    .dt.total_seconds()
    .fill_null(0)
    .over("ID")
    .alias("time_since_last_session")
),
You can verify that by running
print(sessions_features.explain(streaming=True))
You'll then get output that looks a little bit like this (shown here on a toy query):
print(pl.select(a=pl.lit(1)).lazy().select(pl.col('a').pow(2)).explain(streaming=True))
--- STREAMING
SELECT [col("a").pow([2])] FROM
DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: "None" --- END STREAMING
DF []; PROJECT */0 COLUMNS; SELECTION: "None"
You want to see everything in between the --- STREAMING and --- END STREAMING markers. Any operations that fall outside those lines aren't supported by the streaming engine.
The official documentation on streaming is a bit sparse, but here's some extra info.
As far as getting what you need, you could do something loosely like the Quasi sinking workaround, where you get the unique list of your ID values and then loop through them, collecting and writing one filtered subset at a time (see the sketch below).
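Here's a minimal sketch of that idea. It assumes the ID column, the extract_session_features function, and the input path from your snippet; the output filename, the use of pyarrow's ParquetWriter, and the chunk size are my own arbitrary choices:
import polars as pl
import pyarrow.parquet as pq

filtered_sessions = pl.scan_parquet("./processed_merge_file_filtered.parquet")

# Collect only the distinct IDs; this should be small compared to the full frame.
unique_ids = (
    filtered_sessions.select(pl.col("ID").unique())
    .collect(streaming=True)["ID"]
    .to_list()
)

writer = None
chunk_size = 1_000  # IDs per iteration; tune this to your memory budget
for start in range(0, len(unique_ids), chunk_size):
    id_chunk = unique_ids[start : start + chunk_size]
    # Run the feature extraction on this subset of IDs only and materialize it.
    df_chunk = extract_session_features(
        filtered_sessions.filter(pl.col("ID").is_in(id_chunk))
    ).collect(streaming=True)
    table = df_chunk.to_arrow()
    if writer is None:
        # Create the writer lazily so it picks up the schema of the first chunk.
        writer = pq.ParquetWriter("./sessions_features.parquet", table.schema)
    writer.write_table(table)

if writer is not None:
    writer.close()
Because each chunk contains complete ID groups, the .over("ID") window produces the same values it would on the full frame.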
Another idea would be to use the pyarrow dataset writer or duckdb instead, as they may (I don't have specific knowledge on whether or not they do) be able to handle this kind of write in a streaming fashion.
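For example, here's a rough, untested sketch of the duckdb route, assuming the column names and input path from your snippet and an output filename of my own choosing. It only reproduces a couple of the simpler features; the window-based time_since_last_session would need to be expressed with SQL window functions:
import duckdb

# DuckDB can copy from one Parquet file to another without
# materializing the whole result in memory.
duckdb.execute(
    """
    COPY (
        SELECT
            *,
            dpkts + spkts AS total_packets,
            dbytes + sbytes AS total_bytes
        FROM read_parquet('./processed_merge_file_filtered.parquet')
    ) TO './sessions_features.parquet' (FORMAT PARQUET)
    """
)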