Tags: python, out-of-memory, python-polars

Find the most recent article in a group and stream the result to disk


I have

from pathlib import Path
import polars as pl

inDir   = r"E:\Personal Projects\tmp\tarFiles\result2"
outDir  = r"C:\Users\Akira\Documents"
inDir   = Path(inDir)
outDir  = Path(outDir)

schema = {"name"        : pl.String,
          "dateModified": pl.String,
          "identifier"  : pl.UInt64,
          "url"         : pl.String,
          "html"        : pl.String}

lf = pl.scan_ndjson(inDir / "*wiktionary*.ndjson", schema=schema)
lf = lf.group_by(["url"]).agg(pl.max("dateModified").alias("dateModified"))
lf.sink_ndjson(outDir / "out.ndjson",
               maintain_order=False,
               engine="streaming")

I have many ndjson files that together contain millions of JSON objects, each of which has the form of a dict. I would like to group by "url" and pick the JSON object with the most recent "dateModified". In the code above, only "url" and "dateModified" remain in the output.

Could you explain how to keep selected columns, e.g., "name" and "html"?


Solution

  • Assuming "name" and "html" are the same for every row of a given "url", just take one value of each in the aggregation. Note that a bare pl.col("name") inside agg would collect a list per group, so use first():

    lf = lf.group_by(["url"]).agg(pl.max("dateModified"),
                                  pl.first("name"),
                                  pl.first("html"))
    

    If not, you will need to join the original frame back onto the grouped one afterwards (note that ties on the maximum "dateModified" within a "url" will produce duplicate rows):

    lf_group = lf.group_by(["url"]).agg(pl.max("dateModified"))
    lf_group = lf_group.join(lf, on=["url", "dateModified"], how="left")
    

    To debug the OOM, you can set the following environment variables and inspect the query plan to get more info:

    import os

    # Set env variables (before running the query)
    os.environ["POLARS_VERBOSE"] = "1"
    os.environ["POLARS_TRACK_METRICS"] = "1"

    # Check Polars query engine output
    print(lf.explain(engine="streaming"))
    lf.show_graph(plan_stage="physical", engine="streaming")