pandas dask parquet fastparquet dask-dataframe

Dask not recovering partitions from simple (non-Hive) Parquet files


I have a two-part question about Dask and Parquet. I am trying to run queries on a Dask dataframe created from a partitioned Parquet file, like so:

import numpy as np
import pandas as pd
import dask.dataframe as dd
import fastparquet

##### Generate random data to simulate a process creating a Parquet file #####

test_df = pd.DataFrame(data=np.random.randn(10000, 2), columns=['data1', 'data2'])
test_df['time'] = pd.bdate_range('1/1/2000', periods=test_df.shape[0], freq='1S')

# some grouping column
test_df['name'] = np.random.choice(['jim', 'bob', 'jamie'], test_df.shape[0])


##### Write to partitioned parquet file, hive and simple #####

fastparquet.write('test_simple.parquet', data=test_df, partition_on=['name'], file_scheme='simple')
fastparquet.write('test_hive.parquet',   data=test_df, partition_on=['name'], file_scheme='hive')

# Now check partition counts. Only the Hive version works.
assert test_df.name.nunique() == dd.read_parquet('test_hive.parquet').npartitions  # works.
assert test_df.name.nunique() == dd.read_parquet('test_simple.parquet').npartitions # !!!!FAILS!!!

My goal here is to be able to quickly filter and process individual partitions in parallel using dask, something like this:

df = dd.read_parquet('test_hive.parquet')
df.map_partitions(<something>)   # operate on each partition

I'm fine with using the Hive-style Parquet directory, but I've noticed that operating on it takes significantly longer than reading directly from a single Parquet file.

Can someone tell me the idiomatic way to achieve this? I'm still fairly new to Dask/Parquet, so apologies if this is a confused approach.


Solution

  • Maybe it wasn't clear from the docstring, but partitioning by value simply doesn't happen for the "simple" file scheme, which is why it only has one partition. The "hive" scheme is what gives you a piece per value of name; one way of using that for the filtering in the question is sketched below.
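
    A minimal sketch of that route (not from the original answer; it assumes the 'test_hive.parquet' directory written above, and uses dd.read_parquet's filters= keyword, which takes a list of (column, op, value) tuples for pruning):

    import dask.dataframe as dd

    # only load the pieces where name == 'jim'; the pruning is done from
    # the parquet metadata, before any row data is read
    df_jim = dd.read_parquet('test_hive.parquet', filters=[('name', '==', 'jim')])

    # then operate on each remaining partition, e.g. count rows per piece
    print(df_jim.map_partitions(len).compute())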

    As for speed, reading the data in a single function call is fastest when the data are this small, especially if you intend to do an operation such as nunique, which requires combining values from different partitions.

    In Dask, every task incurs an overhead, so unless the amount of work being done by the call is large compared to that overhead, you can lose out. In addition, disk access is not generally parallelisable, and some parts of the computation may not be able to run in parallel in threads if they hold the GIL. Finally, the partitioned version contains more parquet metadata to be parsed.

    >>> # number of tasks in the graph for the same computation
    >>> len(dd.read_parquet('test_hive.parquet').name.nunique().__dask_graph__())
    12
    >>> len(dd.read_parquet('test_simple.parquet').name.nunique().__dask_graph__())
    6
    

    TL;DR: make sure your partitions are big enough to keep dask busy.
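
    If you do end up with many small pieces, one option (a sketch, not from the original answer; the target size below is arbitrary) is to collapse them after reading:

    import dask.dataframe as dd

    df = dd.read_parquet('test_hive.parquet')

    # merge many small partitions into fewer, larger ones so each task
    # does enough work to outweigh the per-task scheduling overhead
    df = df.repartition(partition_size='100MB')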

    (note: the set of unique values is already apparent from the parquet metadata, so it shouldn't be necessary to load the data at all; but Dask doesn't know how to do this optimisation since, after all, some of the partitions may contain zero rows)
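
    For completeness, a sketch of reading those values straight from the metadata with fastparquet (the .cats attribute is assumed from fastparquet's ParquetFile; it is not something the answer above relies on):

    import fastparquet

    pf = fastparquet.ParquetFile('test_hive.parquet')

    # values of the partition column recovered from the directory structure,
    # without loading any row data
    print(pf.cats)   # e.g. {'name': ['bob', 'jamie', 'jim']}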