I've got a parquet dataset located in the directory "dataset_path" with an index column date.
The metadata was created by dask, and the relevant schema metadata looks as follows:
date: timestamp[us]
-- schema metadata --
pandas: '{"index_columns": ["date"], ...
I read the dataset into a dask dataframe using pyarrow:
import dask.dataframe as dd
ddf = dd.read_parquet("dataset_path", engine="pyarrow", calculate_divisions=True)
Just to be sure that the divisions are known to dask, I run:
print(ddf.divisions)
(Timestamp('2020-01-01 00:00:00'), Timestamp('2020-02-01 00:00:00'), ...
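For reference, these divisions make it easy to locate by hand the single partition that can contain a given timestamp; here's a rough sketch (the bisect-based lookup is my own illustration, using only ddf.divisions and ddf.get_partition):
import bisect
import pandas as pd
ts = pd.Timestamp("2020-01-01")
# Partition i roughly covers divisions[i] <= date < divisions[i + 1]
# (the last division is the overall maximum), so a right-bisect of the
# divisions identifies the only candidate partition for ts.
i = min(max(bisect.bisect_right(ddf.divisions, ts) - 1, 0), ddf.npartitions - 1)
ddf.get_partition(i)  # one-partition dask dataframe that can contain ts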
Nevertheless, dask seems to ignore the divisions in many operations. For example, running the following code
ddf.loc[ddf.index == pd.Timestamp("2020-01-01")].compute()
after enabling debugging with
import logging
logging.basicConfig(format="%(message)s", level=logging.DEBUG)
outputs to the log:
open file: ./dataset_path/part.0.parquet
open file: ./dataset_path/part.1.parquet
open file: ./dataset_path/part.2.parquet
...
So dask opens every file in the dataset, even though the divisions make it clear that only the first file can contain matching rows.
What am I missing here? Any help on how I can tell dask to look only at the relevant files is greatly appreciated.
The same problem arises with other index-based operations. For instance, merging on the index with
pdf = pd.DataFrame({"y": 0}, index=pd.Index([pd.Timestamp("2020-01-01")]))
dd.merge(ddf, pdf, left_index=True, right_index=True).compute()
also opens all files in the dataset.
I also tried switching the engine from pyarrow to fastparquet, but the problem persists.
Finally, here's some code that produces a dummy version of "dataset_path" as a minimal working example:
from datetime import date
from dateutil.relativedelta import relativedelta
import dask.dataframe as dd
import pandas as pd

def write_parquet_timeseries(path: str, start: date, end: date):
    # Write one monthly frame at a time: the first part overwrites any existing
    # dataset, the remaining parts are appended to it.
    for part, pdf in enumerate(generate_timeseries(start, end)):
        dd.to_parquet(
            dd.from_pandas(pdf, npartitions=1),
            path,
            engine="pyarrow",
            overwrite=part == 0,
            append=part > 0,
            write_metadata_file=True,
        )

def generate_timeseries(start: date, end: date):
    # Yield one pandas dataframe per month in [start, end).
    start0, end0 = None, start
    while end0 < end:
        start0, end0 = end0, end0 + relativedelta(months=1)
        yield timeseries(start0, end0)

def timeseries(start: date, end: date, num_rows: int = 2**16, num_cols: int = 2**4):
    # num_rows rows per day, indexed by the (repeated) date.
    index = pd.Index(pd.date_range(start, end, inclusive="left"), name="date").repeat(num_rows)
    return pd.DataFrame({f"x{i}": range(i, len(index) + i) for i in range(num_cols)}, index=index)

write_parquet_timeseries("dataset_path", date(2020, 1, 1), date(2021, 1, 1))
Dask is only able to "push down" a limited subset of selection operations to the loader layer. For example, if you replace
ddf.loc[ddf.index == pd.Timestamp("2020-01-01")].compute()
with the syntactically simpler
ddf.loc[pd.Timestamp("2020-01-01")].compute()
you should get the load optimization you are after. This is because, in the first case, dask doesn't know that you are using ddf.index purely for selection, so it evaluates the comparison against every value in the dataframe. Future higher-level optimizations might be able to spot and exploit such patterns.
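For completeness, here is a minimal sketch of the index selections that should be pruned against the divisions at graph-construction time (the exact set of optimized patterns can vary between dask versions):
import pandas as pd
import dask.dataframe as dd

ddf = dd.read_parquet("dataset_path", engine="pyarrow", calculate_divisions=True)

# Scalar lookup on the index: the timestamp is mapped onto the known divisions,
# so read tasks are only created for the partition(s) that can contain it.
ddf.loc[pd.Timestamp("2020-01-01")].compute()

# Slicing the sorted index should be pruned the same way: only partitions whose
# division range overlaps the slice are read.
ddf.loc[pd.Timestamp("2020-01-01"):pd.Timestamp("2020-01-15")].compute()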