My overall goal is to read several csv files, do some computation, and save them as a parquet dataset using the partition_on option of the to_parquet function.
I cannot reindex and repartition before saving because of limited memory. When saving, each input file ends up as a different partition, and thus a different parquet file. I cannot use the default file name part.0.parquet because I might need to add files to the same directory in the future, and they might also be named part.0.parquet.
I thus want to assign to each parquet file the name of the original csv file it comes from.
To do this, when I first read each csv file I add a column with the file name (so all rows in each partition have the same file name). Then I read the first row of each partition (specifically, the column with the original csv file name) and build a list of file names. Finally I use the name_function option of the to_parquet function.
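One way to attach the file name at read time is dask's include_path_column option of dd.read_csv (a minimal sketch; the data/*.csv glob and the column name are placeholders, not my actual paths):

import dask.dataframe as dd

# each row gets the path of the csv file it came from,
# so the value is constant within a partition
ddf = dd.read_csv("data/*.csv", include_path_column="orig_file_name")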
I achieve what I wanted, but this way I have to call .compute(), and that takes very long.
Do you have any idea how I can limit the computation to the first row of each partition?
This is my current code:
def get_first_element(partition):
    return partition['orig_file_name'].iloc[0]

first_elements = ddf.map_partitions(get_first_element).compute()

def name_function(part_idx):
    return f"{first_elements[part_idx]}.parquet"

ddf.to_parquet(path=target_directory,
               engine='pyarrow',
               partition_on=['date', 'hour'],
               name_function=name_function,
               write_index=True)
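For what it's worth, one thing I tried to narrow what gets materialised is selecting only the file-name column before taking the first rows. A sketch (whether this actually avoids the upstream reads depends on how much of the preceding processing dask can optimise away):

# compute only the 'orig_file_name' column, one row per partition;
# the task graph still runs, but the other columns are not materialised
first_rows = ddf['orig_file_name'].map_partitions(lambda s: s.iloc[:1]).compute()
first_elements = first_rows.tolist()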
Thank you very much in advance for any suggestion!
Edit: This code replicates my problem, following @mdurant's suggestion:
import dask
import pandas as pd

@dask.delayed
def process(file_path):
    df = pd.DataFrame({'col1': [0, 1, 2, 3], 'col2': [4, 5, 6, 7], 'col3': [88, 88, 99, 99]})  # this is read_csv in my code
    file_name = 'aaa'
    df.to_parquet(f'{file_name}.parquet',
                  partition_cols=['col3'])

dask.compute(*[process(f) for f in [1]])
If each input partition is being written out separately (which is what I guess from your description), then you are not really using dask.dataframe's API - you could just use delayed instead.
@dask.delayed
def process(filename):
    df = pd.read_csv(filename, ...)
    # wrap the single pandas frame as a one-partition dask dataframe
    ddf = dd.from_pandas(df, npartitions=1)
    # processing

    def name(part_idx):
        return f"{filename}.parquet"

    # append into one output dataset; each call supplies its own file name
    out = ddf.to_parquet("out.parquet", partition_on=["date", "hour"],
                         name_function=name, append=True, compute=False)
    # run synchronously inside the task to avoid nested parallelism
    dask.compute(out, scheduler="sync")

dask.compute(*[process(f) for f in filenames])
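Note that in the sketch above, filename still carries its directory and .csv extension. A hypothetical helper (not part of the answer) to derive a cleaner name before passing it to name_function:

import os

def parquet_name(csv_path):
    # strip the directory and the .csv extension, keep only the file stem
    stem = os.path.splitext(os.path.basename(csv_path))[0]
    return f"{stem}.parquet"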