Tags: python, dask, distributed, dask-distributed, dask-dataframe

Dask map_partitions does not use all workers on client


I have a very CPU-heavy process and would like to use as many workers as possible in Dask.

When I read the CSV file using Dask's read_csv and then process the dataframe with map_partitions, only one worker is used. If I instead use pandas' read_csv and then convert the dataframe to a Dask dataframe, all my workers are used. See the code below.

Could someone explain the difference in behavior?

Ideally, I would like to use read_csv from Dask so that I don't need a conversion step. Could anyone help me with that?

import dask.dataframe as dd
import pandas as pd

def fWrapper(x):
    # doSomething and param are defined elsewhere
    p = doSomething(x.ADDRESS, param)
    return pd.DataFrame(p, columns=["ADDRESS", "DATA", "TOKEN", "CLASS"])

# only uses 1 worker instead of the available 8
dask_df = dd.read_csv(r'path\to\file')
dask_df.set_index('UID', npartitions=8, drop=False)
ddf2 = dask_df.map_partitions(fWrapper, meta={"ADDRESS": object, "DATA": object, "TOKEN": object, "CLASS": object}).compute()

# uses all 8 workers
df = pd.read_csv(r'path\to\file')
df.set_index('UID', drop=False)
dask_df2 = dd.from_pandas(df, npartitions=dask_params['df_npartitions'], sort=True)
ddf3 = dask_df2.map_partitions(fWrapper, meta={"ADDRESS": object, "DATA": object, "TOKEN": object, "CLASS": object}).compute()

Solution

  • The DataFrame.set_index method in both dask.dataframe and pandas returns the updated dataframe, so the result must be assigned back to a variable. pandas does offer a convenience kwarg inplace=True, but that's not available in dask. This means that in your snippet, the first approach should look like this:

    dask_df = dask_df.set_index('UID', npartitions=8, drop=False)
    

    This makes sure that the newly indexed Dask dataframe has 8 partitions, so downstream work should be distributed across multiple workers.
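
    Putting it together, here is a minimal sketch of the corrected first approach. It assumes doSomething, param, and the placeholder CSV path from your question are defined as in your environment:

        import dask.dataframe as dd
        import pandas as pd

        def fWrapper(x):
            # doSomething and param are assumed to be defined elsewhere, as in the question
            p = doSomething(x.ADDRESS, param)
            return pd.DataFrame(p, columns=["ADDRESS", "DATA", "TOKEN", "CLASS"])

        dask_df = dd.read_csv(r'path\to\file')
        # assign the result: set_index returns a new dataframe rather than modifying in place
        dask_df = dask_df.set_index('UID', npartitions=8, drop=False)

        ddf2 = dask_df.map_partitions(
            fWrapper,
            meta={"ADDRESS": object, "DATA": object, "TOKEN": object, "CLASS": object},
        ).compute()

    As a quick sanity check before calling compute, dask_df.npartitions should report 8; if it reports 1, the downstream map_partitions work will land on a single worker.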