Tags: dask, parquet, pyarrow, dask-dataframe, fastparquet

Dask ignores knowledge about divisions for parquet dataset


I've got a parquet dataset located in the directory "dataset_path" with an index column date.

The metadata was created by dask, and the relevant part of the schema looks as follows:

date: timestamp[us]
-- schema metadata --
pandas: '{"index_columns": ["date"], ...

I read the dataset into a dask dataframe using pyarrow:

import dask.dataframe as dd
import pandas as pd

ddf = dd.read_parquet("dataset_path", engine="pyarrow", calculate_divisions=True)

Just to be sure that the divisions are known to dask, I run:

print(ddf.divisions)
(Timestamp('2020-01-01 00:00:00'), Timestamp('2020-02-01 00:00:00'), ...
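
Equivalently, ddf.known_divisions should be True here, since the divisions above are concrete timestamps rather than None:

print(ddf.known_divisions)
True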

Nevertheless, dask seems to ignore the divisions in many operations. For example, running the following code

ddf.loc[ddf.index == pd.Timestamp("2020-01-01")].compute()

after enabling debugging with

import logging

logging.basicConfig(format="%(message)s", level=logging.DEBUG)

outputs to the log:

open file: ./dataset_path/part.0.parquet
open file: ./dataset_path/part.1.parquet
open file: ./dataset_path/part.2.parquet
...

So dask processes all files in the dataset, even though the divisions make it clear that only the first file can match.
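
For reference, the partition that can contain a given timestamp can be worked out from the divisions alone, without reading any data. The snippet below is only an illustrative sketch using the ddf from above; the bisect-based lookup is my own helper, not a dask API:

import bisect
import pandas as pd

ts = pd.Timestamp("2020-01-01")
divisions = ddf.divisions  # npartitions + 1 boundaries
# Partition i covers [divisions[i], divisions[i + 1]); the last partition also
# includes its right boundary.
part = min(bisect.bisect_right(divisions, ts) - 1, ddf.npartitions - 1)
print(part)  # 0, i.e. only the first partition (part.0.parquet) should match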

What am I missing here? Any help on how I can tell dask to look only at the relevant files is greatly appreciated.

The same problem arises with other index-based operations. For instance, merging on the index with

pdf = pd.DataFrame({"y": 0}, index=pd.Index([pd.Timestamp("2020-01-01")]))
dd.merge(ddf, pdf, left_index=True, right_index=True).compute()

also opens all files in the dataset.

I also tried switching from pyarrow to fastparquet as the engine, but the problem persists.

Finally, here's some code that produces a dummy version of "dataset_path" as a minimal working example:

from datetime import date
from dateutil.relativedelta import relativedelta
import dask.dataframe as dd
import pandas as pd

def write_parquet_timeseries(path: str, start: date, end: date):
    # Write one part file per generated month, appending to a single dataset
    # so that a shared _metadata file is produced.
    for part, pdf in enumerate(generate_timeseries(start, end)):
        dd.to_parquet(
            dd.from_pandas(pdf, npartitions=1),
            path,
            engine="pyarrow",
            overwrite=part == 0,
            append=part > 0,
            write_metadata_file=True,
        )

def generate_timeseries(start: date, end: date):
    # Yield one DataFrame per calendar month in [start, end).
    start0, end0 = None, start
    while end0 < end:
        start0, end0 = end0, end0 + relativedelta(months=1)
        yield timeseries(start0, end0)

def timeseries(start: date, end: date, num_rows: int = 2**16, num_cols: int = 2**4):
    # One month of data: each daily timestamp is repeated num_rows times,
    # with num_cols integer columns.
    index = pd.Index(pd.date_range(start, end, inclusive="left"), name="date").repeat(num_rows)
    return pd.DataFrame({f"x{i}": range(i, len(index) + i) for i in range(num_cols)}, index=index)

write_parquet_timeseries("dataset_path", date(2020, 1, 1), date(2021, 1, 1))

Solution

  • Dask is only able to "push down" a limited subset of selection operations to the loader layer. For example, if you replace

    ddf.loc[ddf.index == pd.Timestamp("2020-01-01")].compute()
    

    with the syntactically simpler

    ddf.loc[pd.Timestamp("2020-01-01")].compute()
    

    you should get the load optimization you are after. This is because in the first case, dask doesn't know that you want to use ddf.index for the purposes of selection only, and so it considers all of the values in the dataframe. Future higher-level optimizations might be able to spot and use such patterns.
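
  • For the merge example from the question, a similar workaround should help (this is only a sketch of the same idea, not a separate dask feature): restrict ddf to the index range covered by the small frame with a slice-based .loc, which dask can map onto the relevant partitions when divisions are known, and merge afterwards:

    lo, hi = pdf.index.min(), pdf.index.max()
    dd.merge(ddf.loc[lo:hi], pdf, left_index=True, right_index=True).compute()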