
dask error when trying to modify categorical data


I want to transform one categorical column of my dask.DataFrame.

To clarify: some categories may be defined on the column but not actually present in my dataframe. It is important for me to keep them.

Here is what I am doing now:

import pandas as pd
import dask.dataframe as dd

# toy example
df = pd.DataFrame({'file': ['A.csv', 'B.csv', 'C.csv']})
df['file'] = df.file.astype('category').cat.add_categories(['D.csv'])
ddf = dd.from_pandas(df, npartitions=2)

# get new category
ddf["id"] = ddf["file"].cat.rename_categories(lambda x : x.split('.')[0])

When I inspect ddf["id"].cat.categories, the result is as expected:

Index(['A', 'B', 'C'], dtype='object')

However, ddf.compute() raises the following error:

TypeError: <lambda>() missing 1 required positional argument: 'x'

(edit) Solution:

It works using apply:

# get new category
ddf["id"] = ddf["file"].cat.rename_categories(lambda x : x.split('.')[0])

Solution

  • Try this code:

    import pandas as pd
    import dask.dataframe as dd
    
    df = pd.DataFrame({'file': ['A.csv', 'B.csv', 'C.csv']})
    df['file'] = df['file'].astype('category')
    ddf = dd.from_pandas(df, npartitions=2)
    
    # Define named function for transformation
    def extract_id_from_file(df):
        df = df.copy()  # avoid mutating the input partition in place
        df['id'] = df['file'].str.split('.').str[0]
        return df
    
    # Apply transformation using map_partitions
    ddf = ddf.map_partitions(extract_id_from_file)
    
    # Inspect the new id values
    print(ddf['id'].compute().unique())
    
    # Compute dask DF
    result = ddf.compute()
    print(result)
    

    The thing is, Dask cannot always serialize lambda functions reliably in a distributed setup, so a named function should be used instead of your lambda.

    It is also important to transform the categorical column in a way that Dask can handle, by applying the transformation per partition with Dask's map_partitions.
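
    If keeping the categorical dtype and the unused 'D.csv' category matters (as in your original setup), here is a minimal sketch along the same lines. It relies on pandas' rename_categories accepting a callable; strip_extension and rename_ids are helper names I introduce here, and the explicit meta is passed so Dask does not have to infer it:

    import pandas as pd
    import dask.dataframe as dd

    df = pd.DataFrame({'file': ['A.csv', 'B.csv', 'C.csv']})
    df['file'] = df['file'].astype('category').cat.add_categories(['D.csv'])
    ddf = dd.from_pandas(df, npartitions=2)

    # Named helper so it serializes cleanly across workers
    def strip_extension(name):
        return name.split('.')[0]

    # rename_categories keeps the categorical dtype and also maps
    # the unused category 'D.csv' to 'D'
    def rename_ids(s):
        return s.cat.rename_categories(strip_extension)

    # Explicit meta: an empty categorical Series with the expected categories
    meta = pd.Series(pd.Categorical([], categories=['A', 'B', 'C', 'D']), name='id')

    ddf['id'] = ddf['file'].map_partitions(rename_ids, meta=meta)

    print(ddf['id'].compute().cat.categories)
    # Index(['A', 'B', 'C', 'D'], dtype='object')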