I have a very CPU-heavy process and would like to use as many workers as possible in Dask. When I read the csv file using read_csv from dask and then process the dataframe using map_partitions, only one worker is used. If I use read_csv from pandas and then convert the result to a Dask dataframe, all my workers are used. See the code below. Could someone explain the difference in behavior? Ideally, I would like to use read_csv from Dask so that I don't need a conversion step. Could anyone help me with that?
import dask.dataframe as dd
import pandas as pd

def fWrapper(x):
    # doSomething and param are defined elsewhere in my code
    p = doSomething(x.ADDRESS, param)
    return pd.DataFrame(p, columns=["ADDRESS", "DATA", "TOKEN", "CLASS"])

# only uses 1 worker instead of the available 8
dask_df = dd.read_csv(r'path\to\file')
dask_df.set_index('UID', npartitions=8, drop=False)
ddf2 = dask_df.map_partitions(fWrapper, meta={"ADDRESS": object, "DATA": object, "TOKEN": object, "CLASS": object}).compute()
# uses all 8 workers
df = pd.read_csv(r'path\to\file')
df.set_index('UID', drop=False)
dask_df2 = dd.from_pandas(df, npartitions=dask_params['df_npartitions'], sort=True)
ddf3 = dask_df2.map_partitions(fWrapper, meta={"ADDRESS": object, "DATA": object, "TOKEN": object, "CLASS": object}).compute()
The DataFrame.set_index method in both dask.dataframe and pandas returns the updated dataframe, so it must be assigned to a label. pandas does have a convenience kwarg inplace, but that's not available in dask. This means that in your snippet, the first approach should look like this:
dask_df = dask_df.set_index('UID', npartitions=8, drop=False)
This will make sure that the newly indexed dask dataframe has 8 partitions, so downstream work should be allocated across multiple workers.
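Putting it together, here is a minimal sketch of the corrected first approach, assuming fWrapper, doSomething, and param are defined exactly as in your snippet:

import dask.dataframe as dd

dask_df = dd.read_csv(r'path\to\file')
# set_index returns a new dataframe, so assign it back
dask_df = dask_df.set_index('UID', npartitions=8, drop=False)
ddf2 = dask_df.map_partitions(fWrapper, meta={"ADDRESS": object, "DATA": object, "TOKEN": object, "CLASS": object}).compute()

With the assignment in place, the dataframe read with Dask's read_csv has 8 partitions before map_partitions runs, so the separate pandas read and from_pandas conversion step should no longer be necessary.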