Tags: python, out-of-memory, python-polars

Polars: out-of-memory problem with groupby-max


I have several ndjson files, nearly 800GB in size, that come from parsing the Wikipedia dump. I would like to remove duplicate html entries, so I group by "html" and keep the record with the most recent "dateModified".

from pathlib import Path
import polars as pl

inDir   = r"E:\Personal Projects\tmp\tarFiles\result2"
outDir  = r"C:\Users\Akira\Documents\out_polars.ndjson"
inDir   = Path(inDir)
outDir  = Path(outDir)

schema = {"name"        : pl.String,
          "dateModified": pl.String,
          "identifier"  : pl.UInt64,
          "url"         : pl.String,
          "html"        : pl.String}

lf = pl.scan_ndjson(inDir / "*wiktionary*.ndjson", schema=schema)
lf = lf.group_by(["html"]).agg(pl.max("dateModified").alias("dateModified"))
lf.sink_ndjson(outDir,
               maintain_order=False,
               engine="streaming")

However, I encounter an out-of-memory (OOM) error: RAM usage increases gradually until my laptop crashes:

The Kernel crashed while executing code in the current cell or a previous cell.
Please review the code in the cell(s) to identify a possible cause of the failure.
Click here (https://aka.ms/vscodeJupyterKernelCrash) for more info.
View Jupyter log for further details.

How can I resolve the OOM problem? Thank you for your help.


Solution

  • Following up on my comment, the approach below could be an option. It avoids carrying all of the "html" data through the group_by, which is likely what causes the OOM. But since there is no MRE, I cannot test it to make sure it's correct...

    from pathlib import Path
    import polars as pl
    
    # Adjust these paths to your setup; tmpDir and outDir are assumed to be
    # existing directories with enough free disk space for the Arrow files.
    inDir  = Path(r"E:\Personal Projects\tmp\tarFiles\result2")
    tmpDir = Path(r"E:\Personal Projects\tmp\polars_tmp")
    outDir = Path(r"E:\Personal Projects\tmp\out")
    
    schema = {"name"        : pl.String,
              "dateModified": pl.String,
              "identifier"  : pl.UInt64,
              "url"         : pl.String,
              "html"        : pl.String}
    
    # Create a hash of "html" and spill it to disk using streaming
    lf = pl.scan_ndjson(inDir / "*wiktionary*.ndjson", schema=schema)
    lf = lf.select(pl.col("html", "dateModified"),
                   pl.col("html").hash(10, 20, 30, 40).alias("html_hash"))
    lf.sink_ipc(tmpDir / "hash.arrow",
                maintain_order=False,
                engine="streaming")
    
    # Group by the hash (in memory), without loading the large "html" column
    df = pl.read_ipc(tmpDir / "hash.arrow", columns=["html_hash", "dateModified"])
    df = df.group_by("html_hash").agg(pl.max("dateModified"))
    df.write_ipc(tmpDir / "groupby.arrow")
    
    # Join "html" on result of group-by using streaming
    df = pl.scan_ipc(tmpDir / "groupby.arrow")
    lf = pl.scan_ipc(tmpDir / "hash.arrow").select(pl.col("html_hash", "html"))
    df = df.join(lf, on="html_hash", how="left")
    df.sink_ipc(outDir / "output.arrow",
                maintain_order=False,
                engine="streaming")