I have a large sharded dataset stored in a custom format that would benefit greatly from dask.dataframe.from_delayed. However, I'm seeing odd behavior when trying to persist the resulting dataframe:
import pandas as pd
import dask.dataframe as dd
from dask import delayed

def load(file):
    # Just an example... actual loading code is more complex.
    return pd.read_parquet(file)

filenames = ['my_file']
df = dd.from_delayed([delayed(load)(f) for f in filenames])
df = df.persist()
print(df.count().compute())
This results in two separate 'load' tasks, each fetching the data from the network from scratch: one when calling .persist() and another when running computations on the persisted dataframe. I would expect only one 'load' task, with subsequent computations operating on the persisted dataframe.
I verified that
df = dd.read_parquet('my_file')
df = df.persist()
print(df.count().compute())
correctly schedules only one read_parquet task, so the data is loaded from the network just once.
Is there a workaround for this issue, to ensure that data isn't re-downloaded from the network after calling .persist()?
When you create a dask dataframe, dask needs to know the columns and dtypes so that it can infer further lazy operations you might do on it. In from_delayed, you have an opportunity to provide this information with the meta= argument. If you don't provide it, dask will load the first partition just to infer it, and then discard that data - which can be costly, as you say.
So, the answer is: provide meta=. Either you know the schema up front, or else you find a way to load only a small portion of the first partition to infer it at runtime.
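For example, here is a minimal sketch of the first option. The column names and dtypes (a as int64, b as object) are made up for illustration; substitute the real schema of your shards:

import pandas as pd
import dask.dataframe as dd
from dask import delayed

def load(file):
    return pd.read_parquet(file)

# An empty dataframe describing the expected columns and dtypes
# (hypothetical schema, replace with your own).
meta = pd.DataFrame({'a': pd.Series(dtype='int64'),
                     'b': pd.Series(dtype='object')})

filenames = ['my_file']
df = dd.from_delayed([delayed(load)(f) for f in filenames], meta=meta)
df = df.persist()
print(df.count().compute())  # now only one 'load' task per partition

If your shards really are parquet underneath, one way to get meta cheaply at runtime is to read just the file schema with pyarrow and build an empty dataframe from it:

import pyarrow.parquet as pq
meta = pq.read_schema('my_file').empty_table().to_pandas()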