Tags: stream, parquet, python-polars, pyarrow, sink

Python Polars: Low-memory reading, processing, and writing of Parquet to/from Hadoop


I would like to be able to process very large files in Polars without running out of memory. The documentation suggests using scanning, LazyFrames and sinks, but it is hard to find a proper explanation of how to do this in practice. Hopefully some experts on here can help.

Here I provide an example of what works for "smaller" files that can be handled in memory.

1. Setup

# Imports
import pandas as pd
import polars as pl
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow.fs import HadoopFileSystem

# Setting up the HDFS file system (uses the cluster's default configuration)
hdfs_filesystem = HadoopFileSystem('default')
hdfs_out_path_1 = "scanexample.parquet"
hdfs_out_path_2 = "scanexample2.parquet"

2. Creating data

# Dataset
df = pd.DataFrame({
    'A': np.arange(10000),
    'B': np.arange(10000),
    'C': np.arange(10000),
    'D': np.arange(10000),
})

# Writing to Hadoop
pq_table = pa.Table.from_pandas(df)
pq_writer = pq.ParquetWriter(hdfs_out_path_1, 
                             schema=pq_table.schema, 
                             filesystem=hdfs_filesystem)

# Appending to parquet file
pq_writer.write_table(pq_table)
pq_writer.close()

3. Reading parquet into polars dataframe (in memory)

# Read file
pq_df = pl.read_parquet(source=hdfs_out_path_1, 
                        use_pyarrow=True, 
                        pyarrow_options={"filesystem": hdfs_filesystem})

4. Making transforms and writing to file

# Transforms and write
pq_df.filter(pl.col('A') > 9000)\
     .write_parquet(file=hdfs_out_path_2,
                    use_pyarrow=True,
                    pyarrow_options={"filesystem": hdfs_filesystem})

5. Now doing the same with low memory

# Scanning file: Attempt 1
scan_df = pl.scan_parquet(source = hdfs_out_path_2)
ERROR: Cannot find file

# Scanning file: Attempt 2
scan_df = pl.scan_parquet(source = hdfs_filesystem.open_input_stream(hdfs_out_path_1))
ERROR: expected str, bytes or os.PathLike object, not pyarrow.lib.NativeFile

According to the Polars documentation, scan_parquet does not take pyarrow arguments, but it does mention accepting some "storage options", which I guess is what I need to use. But how?

6. Example without Hadoop

# Writing to parquet
df.to_parquet(path="testlocal.parquet")

# Read lazily 
lazy_df = pl.scan_parquet(source="testlocal.parquet")

# Transforms and write
lazy_df.filter(pl.col('A')>9000).sink_parquet(path= "testlocal.out.parquet")

UPDATE!

While the accepted answer lets you load your data into a LazyFrame, that LazyFrame comes with limited functionality: it cannot sink the data to a file without first collecting it all into memory!

# Reading into LazyFrame
import pyarrow.dataset as ds
pq_lf = pl.scan_pyarrow_dataset(
          ds.dataset(hdfs_out_path_1, 
                     filesystem= hdfs_filesystem))

# Attempt at sinking to parquet
pq_lf.filter(pl.col('A')>9000).sink_parquet(path= "testlocal.out.parquet")
PanicException: sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'

Solution

  • Bad news first: there's no way to sink to a remote filesystem directly. If you want to use sink_parquet, you'd have to sink to local storage and then copy the file. It looks like that will change soon with this PR, but that isn't in the current release.
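
    As a rough sketch of that workaround (the local scratch path below is just an assumption), you could sink to a temporary local file and then stream it up to HDFS through the pyarrow filesystem:

    import shutil

    # Sink the lazy query to a local scratch file (hypothetical path)
    local_tmp = "scanexample_tmp.parquet"
    lazy_df.filter(pl.col('A') > 9000).sink_parquet(local_tmp)

    # Copy the finished file up to HDFS without loading it all into memory
    with open(local_tmp, "rb") as src, \
         hdfs_filesystem.open_output_stream(hdfs_out_path_2) as dst:
        shutil.copyfileobj(src, dst)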

    For scanning (i.e. read-only) you have two options.

    First option: Use scan_pyarrow_dataset

    That might look like this

    import pyarrow.dataset as ds
    pq_lf = pl.scan_pyarrow_dataset(
        ds.dataset(hdfs_out_path_1,
                   filesystem=hdfs_filesystem)
    )
    

    and now you have a LazyFrame. That's probably the best way, since you're already using pyarrow.fs, which seems to be independent of fsspec (the mechanism Polars uses to access cloud files).
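
    For example, reusing the paths and filesystem from the question, you can run lazy filters against that frame and then collect and write the result (since, as noted above, sinking is not available for this frame):

    # The filter is applied during the scan where possible; collect() materializes
    # only the filtered rows, which are then written back through pyarrow.
    filtered = pq_lf.filter(pl.col('A') > 9000).collect()
    filtered.write_parquet(file=hdfs_out_path_2,
                           use_pyarrow=True,
                           pyarrow_options={"filesystem": hdfs_filesystem})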

    Second option: use scan_parquet's storage_options, which uses fsspec.open under the hood. That usage might look like this:

    pq_lf = pl.scan_parquet(f"hdfs://{hdfs_out_path_1}", storage_options=some_dict)
    

    What your storage_options should contain, relative to what you currently do with hdfs_filesystem = HadoopFileSystem('default'), is going to be determined by your HDFS provider and/or how fsspec interacts with HDFS, so if it doesn't just work you'll have to figure that out separately.
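
    As a purely illustrative guess (the keys below are assumptions to verify against your cluster, not a documented recipe), it might look something like this:

    # Hypothetical connection details -- the exact keys depend on your cluster
    # and on how fsspec forwards them to the HDFS filesystem.
    some_dict = {"host": "default", "port": 0, "user": "my_hadoop_user"}
    pq_lf = pl.scan_parquet(f"hdfs://{hdfs_out_path_1}", storage_options=some_dict)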

    Quasi sinking workaround

    You can do something like this

    # Grab the schema from a small sample of the lazy frame
    schema = lazy_df.fetch(1).to_arrow().schema
    with pq.ParquetWriter(hdfs_out_path_1,
                          schema=schema,
                          filesystem=hdfs_filesystem) as pq_writer:
        for lower, upper in [(9000, 12000), (12000, 15000), (15000, 20000)]:
            pq_writer.write_table(
                lazy_df
                    # closed="left" so the chunk boundaries aren't written twice
                    .filter(pl.col('A').is_between(lower, upper, closed="left"))
                    # I'd also put a sort here
                    .collect()
                    .to_arrow()
            )
    

    Pyarrow "sinking" (no polars)

    polars is primarily focused on being an in-memory dataframe library. It has some support for doing things out of memory, but it can't do everything. Apparently, sink_parquet is not yet capable of streaming results from upstream filesystems. pyarrow has more streaming support, even with remote, non-local filesystems. For instance, you can do this...

    pa_scanner = ds.dataset(hdfs_out_path_1, filesystem=hdfs_filesystem) \
                   .scanner(filter=ds.field('A') > 9000)
    ds.write_dataset(pa_scanner, "path_to_directory_not_a_file", format="parquet")
    

    The path needs to be a directory, and pyarrow will then create files in it named "part-{x}.parquet". Whether it creates a single file or multiple files depends on your data. See this for more info and tweaks. The good news about the above is that you can pass filesystem=hdfs_filesystem to ds.write_dataset and avoid touching the local filesystem entirely. Of course, you're then limited to the operations pyarrow can perform, so you've just got to pick your poison.
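
    Writing straight to HDFS might then look like this (the output directory name is just an assumption):

    # Stream the filtered scan into an HDFS directory of "part-{x}.parquet" files;
    # "scanexample_out" is a hypothetical directory name.
    ds.write_dataset(pa_scanner,
                     "scanexample_out",
                     format="parquet",
                     filesystem=hdfs_filesystem)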

    Side notes:

    1. Statistics: When polars writes parquet files, even through pyarrow, it does so with statistics off. That precludes future read optimizations based on row groups, so you need to explicitly pass statistics=True if you want statistics (see the sketch after these notes).

    2. Compression: pyarrow uses snappy compression by default, but polars uses zstd. I think, at this point, pyarrow's default is just one of those things that has been the default for so long that they can't change it. I say that because zstd seems to be supported by everything I've tried and produces smaller files than snappy without really sacrificing performance, so if you're using a pyarrow writer I'd set compression='zstd' (also shown in the sketch after these notes).

    3. Row groups: When you use the ParquetWriter as above, each call to write_table produces a distinct row group in the resulting file, so you can really optimize future reads by choosing a chunking strategy that aligns with how you'll read the file later (as long as you turn on statistics).
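
    A minimal sketch of notes 1 and 2, reusing the writers from earlier (the exact settings are assumptions, adjust to taste):

    # Note 1: turn statistics on when writing from polars
    pq_df.write_parquet(file=hdfs_out_path_2,
                        statistics=True,
                        use_pyarrow=True,
                        pyarrow_options={"filesystem": hdfs_filesystem})

    # Note 2: prefer zstd over pyarrow's default snappy when using a pyarrow writer
    pq_writer = pq.ParquetWriter(hdfs_out_path_1,
                                 schema=pq_table.schema,
                                 compression='zstd',
                                 filesystem=hdfs_filesystem)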