Tags: python, dataframe, csv, partitioning, python-polars

How to partition a big Polars dataframe and save each one in parallel to a CSV file?


I have a big Polars dataframe with a lot of groups. Now, I want to partition the dataframe by group and save all sub-dataframes. I can easily do this as follows:

for d in df.partition_by(["group1", "group2"]):
    d.write_csv(f"~/{d[0, 'group1']}_{d[0, 'group2']}.csv")

However, this approach is sequential and slow when the dataframe is very large and has many partitions.

Is there a Polars-native way to parallelize the code above?

If not, how can I do it with native Python instead?


Solution

  • Update: Polars 1.25.2 added a Partition API for sinks, e.g. PartitionByKey, which lets you hive-partition a LazyFrame to CSV files (or Parquet, NDJSON, IPC).

    If you have an existing DataFrame, call .lazy() on it first to get a LazyFrame.

    import polars as pl
    
    df = pl.DataFrame({
        "group1": [1, 2, 3, 1, 2, 3, 1],
        "group2": [4, 5, 6, 4, 5, 7, 4]
    }).with_row_index()
    
    df.lazy().sink_csv(
        pl.PartitionByKey("./output/", by=["group1", "group2"]),
        mkdir=True
    )
    

    This produces the following directory/file structure:

    output/group1=1/
    output/group1=1/group2=4/
    output/group1=1/group2=4/0.csv
    output/group1=2/
    output/group1=2/group2=5/
    output/group1=2/group2=5/0.csv
    output/group1=3/
    output/group1=3/group2=6/
    output/group1=3/group2=6/0.csv
    output/group1=3/group2=7/
    output/group1=3/group2=7/0.csv
    

    If we check one of the results:

    pl.read_csv("output/*=1/*=4/*.csv")
    
    shape: (3, 3)
    ┌───────┬────────┬────────┐
    │ index ┆ group1 ┆ group2 │
    │ ---   ┆ ---    ┆ ---    │
    │ i64   ┆ i64    ┆ i64    │
    ╞═══════╪════════╪════════╡
    │ 0     ┆ 1      ┆ 4      │
    │ 3     ┆ 1      ┆ 4      │
    │ 6     ┆ 1      ┆ 4      │
    └───────┴────────┴────────┘
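
    You can also read the entire partitioned output back with a broader glob. This is a small usage sketch of my own, not part of the original answer; the key columns are written into the files by default, so they come back as regular columns:

    pl.read_csv("output/*/*/*.csv")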
    

    Original answer

    As far as I am aware, Polars does not currently offer a way to parallelize .write_*() / .sink_*() calls.

    For Parquet files, this would be similar to writing a "Hive Partitioned Dataset", which pyarrow can do in a single call (see the sketch just below).
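
    As a minimal sketch (my own illustration, not from the original answer; the paths and data are made up), pyarrow.parquet.write_to_dataset handles the group1=<value>/group2=<value> directory layout itself:

    import polars as pl
    import pyarrow.parquet as pq
    
    df = pl.DataFrame({
        "group1": [1, 2, 3, 1],
        "group2": [4, 5, 6, 4],
        "val": ["a", "b", "c", "d"]
    })
    
    # pyarrow creates one directory per key combination and writes the
    # Parquet files inside it
    pq.write_to_dataset(
        df.to_arrow(),
        root_path="./parquet_output",
        partition_cols=["group1", "group2"]
    )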

    Multiprocessing

    In my own testing - a ProcessPool has performed slightly faster than a ThreadPool for this task.

    With Polars + multiprocessing, you need to use get_context("spawn") instead of the default "fork" start method, which can deadlock.

    import polars as pl
    from multiprocessing import get_context
    # from multiprocessing.dummy import Pool # <- threadpool
    
    def write_csv(args):
        group, df = args  # `group` is a tuple of key values, e.g. (1,)
        name = "_".join(str(key) for key in group)
        df.write_csv(f"{name}.csv")
    
    if __name__ == "__main__":
        
        df = pl.DataFrame({
            "group": [1, 2, 1],
            "val": ["a", "b", "c"]
        })
        
        with get_context("spawn").Pool() as pool:
            groups = df.group_by("group")
            # .map() blocks until every group has been written
            pool.map(write_csv, groups)
    

    Pool() methods

    There's a lot of discussion out there about which of the different Pool methods to use (.map, .map_async, .imap, .imap_unordered).

    In my simple testing, .map produced the fastest results - with similar memory usage.

    But depending on your data size and system specs, you may need to investigate the others, as in the sketch below.
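
    For example, here is a self-contained sketch of the same write step using .imap_unordered (my own illustration, not from the original answer). Unlike .map, it is lazy and yields results as workers finish, so the loop is what drives the writes to completion:

    import polars as pl
    from multiprocessing import get_context
    
    def write_csv(args):
        group, df = args
        df.write_csv(f"{'_'.join(str(key) for key in group)}.csv")
    
    if __name__ == "__main__":
        df = pl.DataFrame({
            "group": [1, 2, 1],
            "val": ["a", "b", "c"]
        })
    
        with get_context("spawn").Pool() as pool:
            # iterate to consume the lazy iterator and wait for all workers
            for _ in pool.imap_unordered(write_csv, df.group_by("group")):
                pass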