I have several NDJSON files, nearly 800 GB in total, produced by parsing the Wikipedia dump. I would like to remove duplicate HTML: group by "html" and keep the record with the most recent "dateModified".
from pathlib import Path
import polars as pl

inDir = Path(r"E:\Personal Projects\tmp\tarFiles\result2")
outDir = Path(r"C:\Users\Akira\Documents\out_polars.ndjson")

schema = {
    "name": pl.String,
    "dateModified": pl.String,
    "identifier": pl.UInt64,
    "url": pl.String,
    "html": pl.String,
}

lf = pl.scan_ndjson(inDir / "*wiktionary*.ndjson", schema=schema)
lf = lf.group_by("html").agg(pl.max("dateModified").alias("dateModified"))
lf.sink_ndjson(outDir, maintain_order=False, engine="streaming")
However, I encounter an out-of-memory (OOM) error: RAM usage increases gradually until my laptop crashes:
The Kernel crashed while executing code in the current cell or a previous cell.
Please review the code in the cell(s) to identify a possible cause of the failure.
Click here for more info: https://aka.ms/vscodeJupyterKernelCrash
View Jupyter log for further details.
How can I resolve the OOM problem? Thank you for your help.
Following up on my comment, the approach below could be an option. It avoids carrying the full "html" strings through the group_by, which is likely what causes the OOM. Since there is no MRE, however, I cannot test it to confirm it is correct...
# Step 1: hash "html" while streaming, and write (html, dateModified, html_hash)
# to an intermediate IPC file. tmpDir is assumed to be a Path to a scratch directory.
schema = {
    "name": pl.String,
    "dateModified": pl.String,
    "identifier": pl.UInt64,
    "url": pl.String,
    "html": pl.String,
}

lf = pl.scan_ndjson(inDir / "*wiktionary*.ndjson", schema=schema)
lf = lf.select(
    pl.col("html", "dateModified"),
    pl.col("html").hash(10, 20, 30, 40).alias("html_hash"),
)
lf.sink_ipc(tmpDir / "hash.arrow", maintain_order=False, engine="streaming")
# Step 2: group by the hash only (no "html" strings), keeping the most recent
# "dateModified" per hash.
df = pl.read_ipc(tmpDir / "hash.arrow", columns=["html_hash", "dateModified"])
df = df.group_by("html_hash").agg(pl.max("dateModified"))
df.write_ipc(tmpDir / "groupby.arrow")
# Join "html" on result of group-by using streaming
df = pl.scan_ipc(tmpDir / "groupby.arrow")
lf = pl.scan_ipc(tmpDir / "hash.arrow").select(pl.col("html_hash", "html"))
df = df.join(lf, on="html_hash", how="left")
df.sink_ipc(outDir / "output.arrow",
maintain_order=False,
engine="streaming")