I want to transform one categorical column of my dask.DataFrame.
Note: some categories may exist without being present in my dataframe, and it is important for me to keep them.
Here is what I am doing now:
import pandas as pd
import dask.dataframe as dd

# toy example
df = pd.DataFrame({'file': ['A.csv', 'B.csv', 'C.csv']})
df['file'] = df.file.astype('category').cat.add_categories(['D.csv'])
ddf = dd.from_pandas(df, npartitions=2)
# get new category
ddf["id"] = ddf["file"].cat.rename_categories(lambda x : x.split('.')[0])
When I inspect ddf["id"].cat.categories, the result is as expected:
Index(['A', 'B', 'C'], dtype='object')
However, ddf.compute() raises the following error:
TypeError: <lambda>() missing 1 required positional argument: 'x'
It works using apply:
# get new category
ddf["id"] = ddf["file"].apply(lambda x : x.split('.')[0])
Try this code:
import pandas as pd
import dask.dataframe as dd
df = pd.DataFrame({'file': ['A.csv', 'B.csv', 'C.csv']})
df['file'] = df['file'].astype('category')
ddf = dd.from_pandas(df, npartitions=2)
# Define named function for transformation
def extract_id_from_file(df):
    # Work on a copy so the input partition is not modified in place
    df = df.copy()
    df['id'] = df['file'].str.split('.').str[0]
    return df
# Apply transformation using map_partitions
ddf = ddf.map_partitions(extract_id_from_file)
# Inspect the distinct id values (a plain string column, not a categorical)
print(ddf['id'].compute().unique())
# Compute dask DF
result = ddf.compute()
print(result)
The lambda itself is probably not the root cause: Dask serializes functions with cloudpickle, which handles lambdas. The error appears to come from passing a callable through Dask's cat accessor to rename_categories.
Moving the transformation into a named function and applying it with Dask's map_partitions avoids this, because the function then runs on each partition as a regular pandas DataFrame.