I have the following code:
from pathlib import Path
import polars as pl
inDir = r"E:\Personal Projects\tmp\tarFiles\result2"
outDir = r"C:\Users\Akira\Documents"
inDir = Path(inDir)
outDir = Path(outDir)
schema = {"name": pl.String,
          "dateModified": pl.String,
          "identifier": pl.UInt64,
          "url": pl.String,
          "html": pl.String}
lf = pl.scan_ndjson(inDir / "*wiktionary*.ndjson", schema=schema)
lf = lf.group_by("url").agg(pl.max("dateModified"))
lf.sink_ndjson(outDir / "out.ndjson",
               maintain_order=False,
               engine="streaming")
I have many ndjson files that together contain millions of JSON objects, each of which has the form of a dict. I would like to group by "url" and, for each group, pick the JSON object with the most recent "dateModified". In the code above, only "url" and "dateModified" remain after the aggregation.
Could you explain how to keep selected columns, e.g., "name" and "html"?
Assuming "name" and "html" are the same for every row within a "url" group, just add them to the aggregations. Note that a bare pl.col("name") inside agg collects all the group's values into a list column, so use pl.first to keep a single value per group:
lf = lf.group_by("url").agg(pl.max("dateModified"),
                            pl.first("name"),
                            pl.first("html"))
If they are not the same, aggregate first and then join the result back onto the original frame on both keys (note that ties on ("url", "dateModified") will produce duplicate rows):
lf_group = lf.group_by("url").agg(pl.max("dateModified"))
lf_group = lf_group.join(lf, on=["url", "dateModified"], how="left")
To debug the out-of-memory error, you can set the following environment variables (before the query runs) and inspect the plan to get more info:
import os

# Set env variables
os.environ["POLARS_VERBOSE"] = "1"
os.environ["POLARS_TRACK_METRICS"] = "1"

# Check the Polars query engine output
print(lf.explain(engine="streaming"))
lf.show_graph(plan_stage="physical", engine="streaming")