python · pandas · dask · dask-distributed

How to persist a dask dataframe loaded with dask.dataframe.from_delayed


I have a large sharded dataset stored in a custom format that would benefit greatly from dask.dataframe.from_delayed.

However, I'm seeing odd behavior when trying to persist the resulting dataframe:

import pandas as pd
import dask.dataframe as dd
from dask import delayed

def load(file):
  # Just an example... the actual loading code is more complex.
  return pd.read_parquet(file)

filenames = ['my_file']
df = dd.from_delayed([delayed(load)(f) for f in filenames])
df = df.persist()
print(df.count().compute())

This results in two consecutive 'load' tasks, each loading the data from the network from scratch: once when calling .persist() and once when running computations on the persisted dataframe.

I would expect only one 'load' task, and then the computations would work on the persisted dataframe.

I verified that

df = dd.read_parquet('my_file')
df = df.persist()
print(df.count().compute())

correctly only schedules one read_parquet task so data is only loaded from the network once.

Is there a workaround for this issue to ensure that after calling .persist, data isn't re-downloaded from the network?


Solution

  • When you create a dask dataframe, dask needs to know the column names and dtypes so that it can build further lazy operations on it. In from_delayed, you have the opportunity to provide this information with the meta= argument. If you don't provide it, dask will load the first partition to infer it and then discard that data, which can be costly, as you observed.

    So the answer is: provide meta=. Either you already know the schema, or you can find a way to load only a small portion of the first partition at runtime to infer it.