I have a large Polars DataFrame with many groups. I want to partition the DataFrame by group and save each sub-DataFrame. I can easily do this as follows:
for d in df.partition_by(["group1", "group2"]):
    d.write_csv(f"~/{d[0, 'group1']}_{d[0, 'group2']}.csv")
However, the approach above is sequential, and it is slow when the DataFrame is very large and has a great many partitions.
Is there a Polars-native way to parallelize the code above?
If not, how can I do it natively in Python instead?
Update: Polars 1.25.2 added a Partition API for sinks, e.g. PartitionByKey, which allows you to hive-partition LazyFrames to CSV (or Parquet, NDJSON, IPC) files.
If you have an existing DataFrame, you can call .lazy() on it to access the sink API.
import polars as pl

df = pl.DataFrame({
    "group1": [1, 2, 3, 1, 2, 3, 1],
    "group2": [4, 5, 6, 4, 5, 7, 4]
}).with_row_index()

df.lazy().sink_csv(
    pl.PartitionByKey("./output/", by=["group1", "group2"]),
    mkdir=True
)
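The other sinks accept the same partitioning scheme. As a minimal sketch, the equivalent Parquet sink would look like this (the ./parquet_output/ path is just an illustrative placeholder):

df.lazy().sink_parquet(
    pl.PartitionByKey("./parquet_output/", by=["group1", "group2"]),
    mkdir=True
)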
This produces the following directory/file structure:
output/group1=1/
output/group1=1/group2=4/
output/group1=1/group2=4/0.csv
output/group1=2/
output/group1=2/group2=5/
output/group1=2/group2=5/0.csv
output/group1=3/
output/group1=3/group2=6/
output/group1=3/group2=6/0.csv
output/group1=3/group2=7/
output/group1=3/group2=7/0.csv
If we check one of the results:
pl.read_csv("output/*=1/*=4/*.csv")
shape: (3, 3)
┌───────┬────────┬────────┐
│ index ┆ group1 ┆ group2 │
│ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ i64 │
╞═══════╪════════╪════════╡
│ 0 ┆ 1 ┆ 4 │
│ 3 ┆ 1 ┆ 4 │
│ 6 ┆ 1 ┆ 4 │
└───────┴────────┴────────┘
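To reassemble the full dataset, you can glob over every partition directory. A small sketch, assuming the two-level layout shown above (use pl.scan_csv instead if you want to keep it lazy):

pl.read_csv("output/*/*/*.csv")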
Original answer
As far as I am aware, Polars does not currently offer a way to parallelize .write_*() / .sink_*() calls.
For Parquet files, this would be similar to writing a "Hive Partitioned Dataset":
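As a hedged sketch, pyarrow can write such a dataset directly from a Polars DataFrame (this assumes pyarrow is installed; ./hive_output is a placeholder path):

import polars as pl
import pyarrow.parquet as pq

df = pl.DataFrame({
    "group1": [1, 2, 3, 1],
    "group2": [4, 5, 6, 4],
    "val": ["a", "b", "c", "d"],
})

# writes one or more parquet files under group1=<x>/group2=<y>/ directories
pq.write_to_dataset(
    df.to_arrow(),
    root_path="./hive_output",
    partition_cols=["group1", "group2"],
)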
In my own testing, a ProcessPool performed slightly faster than a ThreadPool for this task.
With Polars + multiprocessing, you need to use get_context("spawn"):
import polars as pl
from multiprocessing import get_context
# from multiprocessing.dummy import Pool  # <- threadpool

def write_csv(args):
    (group,), df = args  # group_by yields (key_tuple, DataFrame) pairs
    df.write_csv(f"{group}.csv")

if __name__ == "__main__":
    df = pl.DataFrame({
        "group": [1, 2, 1],
        "val": ["a", "b", "c"]
    })

    with get_context("spawn").Pool() as pool:
        groups = df.group_by("group")
        results = pool.map(write_csv, groups)
        # note: the .imap / .imap_unordered variants are lazy -
        # their results must be consumed, e.g. with an empty for loop
        for result in results:
            ...
(Note that group_by is used here instead of partition_by.)

There's a lot of discussion out there about the different Pool methods to use:

.map
.imap
.imap_unordered

In my simple testing, .map produced the fastest results, with similar memory usage.
But depending on your data size and system specs, you may need to investigate the others.
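For example, a minimal sketch of swapping in the lazy .imap_unordered, reusing write_csv and df from the example above (the chunksize value is purely illustrative):

from multiprocessing import get_context

if __name__ == "__main__":
    with get_context("spawn").Pool() as pool:
        # imap_unordered yields results as workers finish; the loop drives the work
        for _ in pool.imap_unordered(write_csv, df.group_by("group"), chunksize=4):
            ...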