I have a large .parquet dataset split into ~256k chunks (20GB). I've recently repacked it into 514 chunks (28GB) to reduce the number of files.
What I really need is to load data based on a field which contains int32
values in the range from 0 to 99,999,999 (around 200k distinct values).
I've tried the example from Writing large amounts of data, but pyarrow 5 doesn't allow writing that many partitions and raises the error pyarrow.lib.ArrowInvalid: Fragment would be written into 203094 partitions. This exceeds the maximum of 1024.
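For reference, this is roughly the kind of call that hits the limit (the paths and the column name value are placeholders, not my real names):

import pyarrow as pa
import pyarrow.dataset as ds

# Partitioning directly on the int32 field tries to create one directory per
# distinct value (~200k of them), which exceeds the default max_partitions of 1024.
dataset = ds.dataset('/path/to/repacked', format='parquet')
ds.write_dataset(
    dataset,
    '/path/to/partitioned',
    format='parquet',
    partitioning=ds.partitioning(pa.schema([('value', pa.int32())]), flavor='hive'),
)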
Is it somehow possible to repartition the dataset based on the mentioned field so that each chunk contains a range of values? E.g. partition 1 (0-99999), partition 2 (100000-199999), ...
The max_partitions setting is configurable (pyarrow >= 4.0.0). You might start to run into ARROW-12321 because pyarrow opens a file descriptor for each partition and won't close it until it has received all of its data. You could then bump the max file descriptors on your system to work around that.
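For example, a sketch using placeholder paths and column name (the exact max_partitions value just needs to cover your ~200k distinct values):

import pyarrow as pa
import pyarrow.dataset as ds

dataset = ds.dataset('/path/to/repacked', format='parquet')
ds.write_dataset(
    dataset,
    '/path/to/partitioned',
    format='parquet',
    partitioning=ds.partitioning(pa.schema([('value', pa.int32())]), flavor='hive'),
    max_partitions=250_000,  # raise the default of 1024 above the ~200k distinct values
)
# On Linux you may also need to raise the open-file limit (e.g. ulimit -n)
# because of ARROW-12321.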
Your idea about grouping the partition column is a good one too. That should reduce the number of files you have (making things easier to manage) and may even improve performance (even though each file will have more data). Unfortunately, this isn't quite ready to be easily implemented. Arrow's projection mechanism is what you want but pyarrow's dataset expressions aren't fully hooked up to pyarrow compute functions (ARROW-12060).
There is a slightly more verbose, but more flexible approach available. You can scan the batches in python, apply whatever transformation you want, and then expose that as an iterator of batches which the dataset writer will accept:
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.compute as pc
import pyarrow.parquet as pq

# Create a small example dataset to work with
table = pa.Table.from_pydict({'x': range(20), 'y': [1] * 20})
pq.write_table(table, '/tmp/foo.parquet')

part = ds.partitioning(pa.schema([("partition_key", pa.int64())]), flavor='hive')
dataset = ds.dataset('/tmp/foo.parquet')
scanner = dataset.scanner()
scanner_iter = scanner.scan_batches()
# Arrow doesn't have modulo / integer division yet but we can
# approximate it with masking (ARROW-12755).
# There will be 2^3 items per group, e.g. values 0-7 get key 0,
# 8-15 get key 8, and so on. Adjust items_per_group_exponent
# to your liking for more items per file.
items_per_group_exponent = 3
items_per_group_mask = (2 ** items_per_group_exponent) - 1
mask = ((2 ** 63) - 1) ^ items_per_group_mask
def projector():
    while True:
        try:
            # scan_batches yields TaggedRecordBatch objects; .record_batch is the data
            next_batch = next(scanner_iter).record_batch
            # Compute the partition key by masking off the low bits of 'x'
            partition_key_arr = pc.bit_wise_and(next_batch.column('x'), mask)
            all_arrays = [*next_batch.columns, partition_key_arr]
            all_names = [*next_batch.schema.names, 'partition_key']
            batch_with_part = pa.RecordBatch.from_arrays(all_arrays, names=all_names)
            print(f'Yielding {batch_with_part}')
            yield batch_with_part
        except StopIteration:
            return
full_schema = dataset.schema.append(pa.field('partition_key', pa.int64()))
ds.write_dataset(projector(), '/tmp/new_dataset', schema=full_schema, format='parquet', partitioning=part)
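To then load only the rows for a particular key, you can continue the example above: prune files via partition_key and filter rows on the original column (the names here come from the toy example, not your real data):

# Hypothetical read-back: pick a target value of 'x' and compute its
# partition key with the same mask used when writing
target = 13
target_key = target & mask

new_dataset = ds.dataset('/tmp/new_dataset', format='parquet', partitioning=part)
filtered = new_dataset.to_table(
    filter=(ds.field('partition_key') == target_key) & (ds.field('x') == target))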