
Sink/scan parquet temporarily under the new Streaming engine


I am currently working with larger-than-RAM data, so I rely on the streaming engine to run all operations. The problem is that, given the complexity of the process, my kernel crashes silently.

To work around this, I collect some intermediate steps and convert each result back to a LazyFrame right afterwards. I have found that, for my processes, a better approach is to sink_parquet and then scan_parquet, because this avoids materializing the intermediate DataFrame after collecting it and before turning it back into a LazyFrame.

The problem is that this approach "pollutes" my storage with intermediate files. Is there a way to sink and scan a temporary Parquet file, so that no heavy intermediate files are left behind?


Solution

  • You can use the built-in tempfile module.

    tempfile.TemporaryFile(mode='w+b', buffering=-1, encoding=None, newline=None, suffix=None, prefix=None, dir=None, *, errors=None)

    Return a file-like object that can be used as a temporary storage area. The file is created securely, using the same rules as mkstemp(). It will be destroyed as soon as it is closed (including an implicit close when the object is garbage collected).

    So you create one temporary file per intermediate result; once all the calculations are done, you close the files and their contents are removed automatically.

    Let's say your calculations go like this:

    import polars as pl
    
    # minimal working example: write a tiny input file and use
    # identity functions as placeholders for the real transformations
    pl.DataFrame({"a": [1]}).write_parquet("initial.parquet")
    some_transformation = lambda x: x
    another_transformation = lambda x: x
    final_transformation = lambda x: x
    
    initial = pl.scan_parquet("initial.parquet")
    step_1 = some_transformation(initial)
    step_2 = another_transformation(step_1)
    final = final_transformation(step_2)
    final.sink_parquet("final.parquet")
    

    With the intermediate results saved to temporary files, the code becomes:

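    import polars as pl
    from tempfile import TemporaryFile as TF
    
    # some_transformation, another_transformation and final_transformation
    # are the placeholders defined in the previous snippet
    initial = pl.scan_parquet("initial.parquet")
    
    with TF() as f1, TF() as f2:
        # execute the first step now and stream its result into temporary file f1
        step_1 = some_transformation(initial)
        step_1.sink_parquet(f1)
        step_1 = pl.scan_parquet(f1)
    
        step_2 = another_transformation(step_1)
        step_2.sink_parquet(f2)
        step_2 = pl.scan_parquet(f2)
    
        # the final sink must run inside the with block, while f2 still exists
        final = final_transformation(step_2)
        final.sink_parquet("./final.parquet")
    # now f1 and f2 are closed and removed
    pl.read_parquet("./final.parquet")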