I have an application where I need to work with a very large amount of data. I've read that Polars should be able to work with datasets larger than the available memory, so I must be doing something wrong...
In my scenario, the application runs in a Kubernetes container configured to use 12GB of memory. I'm trying to work with data from a CSV with 200 million records, approximately 9GB on disk.
The basic format of this CSV is as follows:
id,aligned_value,date_reference,model,predict,segment
00000000001,564,20240520,XPTO,,3
00000000002,741,20240520,XPTO,,6
00000000003,503,20240520,XPTO,,5
00000000004,200,20240520,XPTO,,0
What I am trying to run is a plain and simple aggregation: counting the records of this dataset grouped by two fields ("aligned_value", which ranges from 0 to 1000, and "segment", which ranges from 0 to 6). But when I run the following code, memory consumption grows and grows until the container simply gets killed.
import polars as pl

def get_summarized_base_df(self, filepath: str) -> pl.DataFrame:
    """
    Summarizes the base into a dataframe grouped by all fields included
    in this report's setup
    """
    # Returns a list of fields, which for this scenario should be just
    # ["aligned_value", "segment"]
    required_fields = self.list_all_required_fields()
    base_lf = pl.scan_csv(filepath)
    summarized_base_df = base_lf.group_by(required_fields).agg(pl.count()).collect()
    return summarized_base_df
Are there parameters I could be using to reduce this memory usage? Am I using the framework wrong?
For the record, I tried to limit Polars' memory usage by setting the environment variable POLARS_MAX_MEMORY_MIB, but it didn't seem to make any difference at all.
Some additional information:
Python version: 3.10.11
Polars version: 0.20.18
For style points you can use .len() instead of .agg(pl.count()), but the real issue is that streaming processing (which is necessary for larger-than-memory datasets) is still experimental in a lot of places, so it is opt-in. You can opt in by passing streaming=True to collect:
base_lf.group_by(required_fields).len().collect(streaming=True)
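Applied to the function from the question, a minimal sketch could look like this (it assumes self.list_all_required_fields exists as in the original code):

import polars as pl

def get_summarized_base_df(self, filepath: str) -> pl.DataFrame:
    """
    Summarizes the base into a dataframe grouped by all fields
    included in this report's setup, using the streaming engine.
    """
    # For this report, should be ["aligned_value", "segment"]
    required_fields = self.list_all_required_fields()
    base_lf = pl.scan_csv(filepath)
    # streaming=True asks Polars to execute the query in batches rather
    # than materializing the full CSV in memory (experimental in 0.20.x)
    return base_lf.group_by(required_fields).len().collect(streaming=True)

Note that on newer Polars releases (1.x) the streaming engine has been reworked and, as far as I know, is selected with collect(engine="streaming") instead; check the documentation for the version you are on.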