python, dask, dask-distributed, dask-dataframe, dask-delayed

Create new column in Dask DataFrame with specific value for each partition


I have two Dask DataFrames with the same number of partitions.

The first one has a few columns and a few rows in each partition (each partition being a Pandas DataFrame); the number of rows can differ between partitions, but the columns are the same. The second Dask DataFrame has exactly the same number of partitions, but each partition holds only 1 value. I'm trying to combine these 2 Dask DataFrames partition by partition, as shown below.

Dask DataFrame N°1 (toy example):

[image not available]

Dask DataFrame N°2 (with only 1 value per partition and so only 1 column):

[image not available]

My aim is to obtain a Dask DataFrame like this:

[image not available]

I tried Dask.assign() with Dask.map_partitions() but without success.

Do you have an idea how to do this efficiently?

Regards.

rmarion37


Solution

  • Thanks for your answer,

    To fill all the rows of each partition, the key point in the delayed function is to force the value to an integer (in my example). It looks like a Pandas quirk, not very clear in the documentation, but real!

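    import dask
    from dask.dataframe import from_delayed

    @dask.delayed
    def process_partitions(partition1, partition2):
        # partition2 holds a single value; casting it to int gives a plain
        # scalar that is written into every row of partition1
        partition1.loc[:, 'id_cycle'] = int(partition2)
        return partition1

    # Pair the partitions of both DataFrames one to one
    delayeds = []
    for partition1, partition2 in zip(ddf1.partitions, ddf2.partitions):
        delayeds.append(process_partitions(partition1, partition2))

    # Rebuild a Dask DataFrame from the delayed partitions, then materialize it
    ddf3 = from_delayed(delayeds)
    ddf = ddf3.compute()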
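
A plausible explanation for why the integer cast matters, offered as an assumption rather than something the post confirms: when the value is still a one-element Pandas object, Pandas aligns it on the index during assignment, so only the row with a matching index label is filled. A plain scalar is broadcast to every row instead. The minimal Pandas-only sketch below illustrates this; the names `part1`, `part2`, `aligned` and `broadcast` are hypothetical stand-ins for one partition of each DataFrame.

```python
import pandas as pd

# Hypothetical stand-ins for one partition of each Dask DataFrame
part1 = pd.DataFrame({"a": [10, 20, 30]})  # three rows, index 0..2
part2 = pd.Series([7])                     # a single value, index 0

# Assigning the Series aligns on the index: only row 0 receives a value,
# the other rows are left as NaN
aligned = part1.copy()
aligned["id_cycle"] = part2

# Assigning a plain Python int broadcasts it to every row
broadcast = part1.copy()
broadcast["id_cycle"] = int(part2.iloc[0])

print(aligned)
print(broadcast)
```

If this reading is right, any conversion that yields a scalar (here `int(part2.iloc[0])`) should have the same effect as the `int(partition2)` call in the solution.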