Tags: python, python-polars, polars

Efficient (and Safe) Way of Accessing Larger-than-Memory Datasets in Parallel


I am trying to use polars~=1.24.0 on Python 3.13 to process larger-than-memory datasets. Specifically, I am loading many (35) parquet files via the polars.scan_parquet('base-name-*.parquet') API to create the LazyFrame.

In my use case, I need to batch process this dataset and ingest it to a remote database backend. I am using concurrent.futures.ThreadPoolExecutor to spawn multiple threads and do the batch processing.

What is the best way to cover the whole dataset while making sure that the workers would not result in OOM issues on my local machine?

I have thought of a kernel (to be passed to the ThreadPoolExecutor) of the form:

def parse(df: pl.LazyFrame, rank: int, size: int, n_rows: int):
    # Split n_rows rows evenly across `size` workers; this worker is `rank`.
    split, remain = divmod(n_rows, size)
    start = rank * split + min(rank, remain)
    end = start + split + (rank < remain)

    for row in df.slice(start, end - start).collect(new_streaming=True).iter_rows():
        # do something with the row
        pass

but I have the following questions:

  1. Is it safe to pass df: pl.LazyFrame to other threads like this?
  2. Is it safe to (read-only) access the DataFrame/LazyFrame via .slice and cover all the rows?
  3. .collect still loads (or seems to load, in htop) the whole chunk into memory. Is there a more efficient/correct way to stream the rows while keeping the "iterator" interface? Or should I further slice/mini-batch the range [start, end)?

I am new to polars and I would love to get some suggestions/help for the above use case.


Solution

  • This is probably not a good job for polars until/unless they incorporate this feature. I think you'd be better off using pyarrow and iterating over files and row groups. If you want to use Python to parallelize, use the multiprocessing module with the "spawn" start method, don't use fork.

    Here's a serial version of iterating with pyarrow. You could multiprocess over the files but not the inner row groups.

    from pathlib import Path
    from pyarrow import parquet as pq
    import polars as pl

    rootpath = Path(".")
    files = rootpath.glob("*.parquet")  # only parquet files, not everything in the dir

    for fl in files:
        pqfile = pq.ParquetFile(fl)
        for rg_i in range(pqfile.num_row_groups):
            rg_table = pqfile.read_row_group(rg_i)  # one row group in memory at a time
            df = pl.from_arrow(rg_table)  # optional: convert to a polars DataFrame
            ## do whatever with the pyarrow table or df