I am currently working with larger-than-RAM data, so I rely on the streaming engine to run all operations. The problem is that, given the complexity of the process, my kernel crashes silently.
To address this, I collect some intermediate steps and convert each result straight back to a LazyFrame. I have found that for my processes a better approach is to sink_parquet and then scan_parquet, as this way I avoid having to collect the intermediate DataFrame and write it to disk myself before turning it back into a LazyFrame.
The problem I face is that this process "pollutes" my storage system. Therefore, I am wondering if there is any way to sink-scan a temporary parquet file, avoiding writing any intermediate heavy dataframe.
You can use the built-in tempfile module from the standard library.
tempfile.TemporaryFile(mode='w+b', buffering=-1, encoding=None, newline=None, suffix=None, prefix=None, dir=None, *, errors=None)
Return a file-like object that can be used as a temporary storage area. The file is created securely, using the same rules as mkstemp(). It will be destroyed as soon as it is closed (including an implicit close when the object is garbage collected).
So basically you create a temporary file for each intermediate result and after all the calculations you close them and their content will be automatically removed.
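To make the lifecycle described above concrete, here is a minimal sketch using only the standard library. The first part writes to a TemporaryFile and reads the bytes back. The second part swaps in NamedTemporaryFile, purely so there is a visible path to check before and after closing; the byte string and the .parquet suffix are placeholders, not real parquet data.

```python
import os
import tempfile

# TemporaryFile: anonymous scratch space, usable like any binary file object.
with tempfile.TemporaryFile() as tmp:
    tmp.write(b"intermediate bytes")
    tmp.seek(0)  # rewind before reading back what was written
    roundtrip = tmp.read()

# NamedTemporaryFile is used here only because it exposes a path we can inspect.
with tempfile.NamedTemporaryFile(suffix=".parquet") as named:
    path = named.name
    existed_while_open = os.path.exists(path)

# Once the with-block exits, the file has been closed and deleted.
exists_after_close = os.path.exists(path)
print(roundtrip, existed_while_open, exists_after_close)
```

Nothing has to be cleaned up by hand: leaving the with-block is what removes the file.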
Let's say your calculations go like this:
import polars as pl
# minimal working example: write a tiny input file so the scan below has something to read
pl.DataFrame(data={"a": [1]}).write_parquet("initial.parquet")
some_transformation = lambda x: x
another_transformation = lambda x: x
final_transformation = lambda x: x
initial = pl.scan_parquet("initial.parquet")
step_1 = some_transformation(initial)
step_2 = another_transformation(step_1)
final = final_transformation(step_2)
final.sink_parquet("final.parquet")
With intermediate results saved to temporary files, the code changes to this:
import polars as pl
from tempfile import TemporaryFile as TF
initial = pl.scan_parquet("initial.parquet")
with TF() as f1, TF() as f2:
    step_1 = some_transformation(initial)
    step_1.sink_parquet(f1)
    step_1 = pl.scan_parquet(f1)
    step_2 = another_transformation(step_1)
    step_2.sink_parquet(f2)
    step_2 = pl.scan_parquet(f2)
    final = final_transformation(step_2)
    final.sink_parquet("./final.parquet")
# now f1 and f2 are closed and removed
pl.read_parquet("./final.parquet")