parquet, partitioning, partition, pyarrow

Repartition large parquet dataset by ranges of values


I have a large .parquet dataset split into ~256k chunks (20GB). I recently repacked it into 514 chunks (28GB) to reduce the number of files.

What I really need is to load data based on a field that contains int32 values in the range from 0 to 99,999,999 (around 200k distinct values).

I've tried the Writing large amounts of data example, but pyarrow 5 doesn't allow writing that many partitions and raises the error pyarrow.lib.ArrowInvalid: Fragment would be written into 203094 partitions. This exceeds the maximum of 1024.
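
In essence the failing call boiled down to something like this (the field name value_id and the paths are placeholders for my real ones):

    import pyarrow as pa
    import pyarrow.dataset as ds

    dataset = ds.dataset('/path/to/514_chunks', format='parquet')
    part = ds.partitioning(pa.schema([('value_id', pa.int32())]), flavor='hive')
    ds.write_dataset(dataset, '/path/to/partitioned', format='parquet', partitioning=part)
    # -> pyarrow.lib.ArrowInvalid: Fragment would be written into 203094 partitions.
    #    This exceeds the maximum of 1024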

Is it somehow possible to repartition the dataset based on the mentioned field so that each chunk contains a range of values, e.g. partition 1 (0-99999), partition 2 (100000-199999), ...?


Solution

  • The max_partitions limit is configurable (pyarrow >= 4.0.0). You might then start to run into ARROW-12321, because pyarrow is going to open a file descriptor for each partition and won't close it until it has received all of its data. You could bump the maximum number of file descriptors on your system to work around that.
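
    For instance, something along these lines (a sketch; the field name value_id, the paths, and the 250_000 limit are placeholders):

    import pyarrow as pa
    import pyarrow.dataset as ds

    dataset = ds.dataset('/path/to/514_chunks', format='parquet')
    part = ds.partitioning(pa.schema([('value_id', pa.int32())]), flavor='hive')
    # Raise the default limit of 1024 above the ~200k distinct values
    ds.write_dataset(dataset, '/path/to/partitioned', format='parquet',
                     partitioning=part, max_partitions=250_000)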

    Your idea about grouping the partition column into ranges is a good one too. It should reduce the number of files you have (making things easier to manage) and may even improve performance (even though each file will contain more data). Unfortunately, this isn't quite ready to be implemented easily: Arrow's projection mechanism is what you want, but pyarrow's dataset expressions aren't fully hooked up to pyarrow compute functions yet (ARROW-12060).

    There is a slightly more verbose, but more flexible, approach available: you can scan the batches in Python, apply whatever transformation you want, and then expose the result as an iterator of batches, which the dataset writer will accept:

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.compute as pc
    import pyarrow.parquet as pq
    
    table = pa.Table.from_pydict({'x': range(20), 'y': [1] * 20})
    pq.write_table(table, '/tmp/foo.parquet')
    part = ds.partitioning(pa.schema([("partition_key", pa.int64())]), flavor='hive')
    dataset = ds.dataset('/tmp/foo.parquet')
    
    scanner = dataset.scanner()
    scanner_iter = scanner.scan_batches()
    
    # Arrow doesn't have modulo / integer division yet but we can
    # approximate it with masking (ARROW-12755).
    # There will be 2^3 items per group.  Adjust items_per_group_exponent
    # to your liking for more items per file.
    items_per_group_exponent = 3
    items_per_group_mask = (2 ** items_per_group_exponent) - 1
    mask = ((2 ** 63) - 1) ^ items_per_group_mask
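    # With exponent 3 the low 3 bits are cleared, so x values 0-7 map to
    # partition_key 0, 8-15 to 8, 16-23 to 16, and so on.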
    def projector():
        while True:
            try:
                next_batch = next(scanner_iter).record_batch
                partition_key_arr = pc.bit_wise_and(next_batch.column('x'), mask)
                all_arrays = [*next_batch.columns, partition_key_arr]
                all_names = [*next_batch.schema.names, 'partition_key']
                batch_with_part = pa.RecordBatch.from_arrays(all_arrays, names=all_names)
                print(f'Yielding {batch_with_part}')
                yield batch_with_part
            except StopIteration:
                return
    
    full_schema = dataset.schema.append(pa.field('partition_key', pa.int64()))
    ds.write_dataset(projector(), '/tmp/new_dataset', schema=full_schema, format='parquet', partitioning=part)
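
    To load a single bucket (or a range of buckets) later, filter on the derived column and only the matching files are read, e.g. continuing the toy example above:

    new_dataset = ds.dataset('/tmp/new_dataset', format='parquet', partitioning=part)
    # Only the files under partition_key=8 (i.e. x in 8-15) are scanned
    print(new_dataset.to_table(filter=ds.field('partition_key') == 8))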