Tags: pandas, dataframe, dask, aggregation

Problems refactoring pandas.DataFrame.groupby.aggregate to dask.dataframe.groupby.aggregate with custom aggregation


I would like to run groupby and aggregation over a dataframe where the aggregation joins strings with the same id. The df looks like this:

In [1]: df = pd.DataFrame.from_dict({'id':[1,1,2,2,2,3], 'name':['a','b','c','d','e','f']})
In [2]: df
Out[2]:
   id name
0   1    a
1   1    b
2   2    c
3   2    d
4   2    e
5   3    f

I have this working in Pandas thus:

def list_aggregator(x):
    """Concatenate the string values in *x* into one '|'-separated string."""
    separator = '|'
    return separator.join(x)

# Build df2: one row per unique id, with that id's names joined by '|'.
# (Fixed: the dict literal passed to from_dict needs braces — without them
# this line is a SyntaxError.)
df2 = pd.DataFrame.from_dict({'id': [], 'name': []})
df2['id'] = df['id'].drop_duplicates()
# Group the name column by id and join each group's strings; .values drops
# the group index so the joined strings align positionally with df2['id'].
df2['name'] = df['name'].groupby(df['id']).agg(list_aggregator).values

Produces:

In [26]: df2
Out[26]:
   id   name
0   1    a|b
2   2  c|d|e
5   3      f

For Dask, my understanding (from the docs) is you have to tell Dask what to do to aggregate within chunks, and then what to do with those aggregated chunks. In both cases, I want to do the equivalent of '|'.join(). So:

ddf = dd.from_pandas(df, 2)
# NOTE(review): this from_pandas call passes no partition count, unlike the
# one above — many dask versions raise here; presumably the original run
# supplied npartitions too — confirm.
ddf2 = dd.from_pandas(pd.DataFrame.from_dict({'id':[],'name':[]}))
ddf2['id'] = ddf['id'].drop_duplicates()

# NOTE(review): the traceback below (_apply_func_to_column) shows dask hands
# the chunk callable a grouped object, not a plain Series, so a Series-style
# joiner as the chunk function appears to be what triggers the error —
# verify against the dask.dataframe Aggregation docs.
dd_list_aggregation = dd.Aggregation(
    'list_aggregation',
    list_aggregator,  # chunks are aggregated into strings with 1 string per chunk
    list_aggregator,  # per-chunk strings are aggregated into a single string per id
)

ddf2['name'] = ddf['name'].groupby(ddf['id']).agg(dd_list_aggregation).values

Expected result is as above (or, indeed, nothing as I haven't called ddf2.compute() yet), but I receive this error:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_core.py:446, in Expr.__getattr__(self, key)
    445 try:
--> 446     return object.__getattribute__(self, key)
    447 except AttributeError as err:

File ~/miniconda3/envs/test/lib/python3.10/functools.py:981, in cached_property.__get__(self, instance, owner)
    980 if val is _NOT_FOUND:
--> 981     val = self.func(instance)
    982     try:

File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_groupby.py:206, in GroupByApplyConcatApply._meta_chunk(self)
    205 meta = meta_nonempty(self.frame._meta)
--> 206 return self.chunk(meta, *self._by_meta, **self.chunk_kwargs)

File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask/dataframe/groupby.py:1200, in _groupby_apply_funcs(df, *by, **kwargs)
   1199 for result_column, func, func_kwargs in funcs:
-> 1200     r = func(grouped, **func_kwargs)
   1202     if isinstance(r, tuple):

File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask/dataframe/groupby.py:1276, in _apply_func_to_column(df_like, column, func)
   1275 if column is None:
-> 1276     return func(df_like)
   1278 return func(df_like[column])

Cell In[88], line 2
      1 def dd_list_aggregator(x):
----> 2     return '|'.join(x[1])

File ~/miniconda3/envs/test/lib/python3.10/site-packages/pandas/core/base.py:245, in SelectionMixin.__getitem__(self, key)
    244     raise KeyError(f"Column not found: {key}")
--> 245 ndim = self.obj[key].ndim
    246 return self._gotitem(key, ndim=ndim)

AttributeError: 'str' object has no attribute 'ndim'

During handling of the above exception, another exception occurred:

RuntimeError                              Traceback (most recent call last)
Cell In[96], line 1
----> 1 ddf2['name'] = ddf['name'].groupby(ddf['id']).agg(dd_list_aggregation).values

File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_groupby.py:1907, in GroupBy.agg(self, *args, **kwargs)
   1906 def agg(self, *args, **kwargs):
-> 1907     return self.aggregate(*args, **kwargs)

File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_groupby.py:1891, in GroupBy.aggregate(self, arg, split_every, split_out, shuffle_method, **kwargs)
   1888 if arg == "size":
   1889     return self.size()
-> 1891 return new_collection(
   1892     GroupbyAggregation(
   1893         self.obj.expr,
   1894         arg,
   1895         self.observed,
   1896         self.dropna,
   1897         split_every,
   1898         split_out,
   1899         self.sort,
   1900         shuffle_method,
   1901         self._slice,
   1902         *self.by,
   1903     )
   1904 )

File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_collection.py:4440, in new_collection(expr)
   4438 def new_collection(expr):
   4439     """Create new collection from an expr"""
-> 4440     meta = expr._meta
   4441     expr._name  # Ensure backend is imported
   4442     return get_collection_type(meta)(expr)

File ~/miniconda3/envs/test/lib/python3.10/functools.py:981, in cached_property.__get__(self, instance, owner)
    979 val = cache.get(self.attrname, _NOT_FOUND)
    980 if val is _NOT_FOUND:
--> 981     val = self.func(instance)
    982     try:
    983         cache[self.attrname] = val

File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_groupby.py:432, in GroupbyAggregation._meta(self)
    430 @functools.cached_property
    431 def _meta(self):
--> 432     return self._lower()._meta

File ~/miniconda3/envs/test/lib/python3.10/functools.py:981, in cached_property.__get__(self, instance, owner)
    979 val = cache.get(self.attrname, _NOT_FOUND)
    980 if val is _NOT_FOUND:
--> 981     val = self.func(instance)
    982     try:
    983         cache[self.attrname] = val

File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_reductions.py:425, in ApplyConcatApply._meta(self)
    423 @functools.cached_property
    424 def _meta(self):
--> 425     meta = self._meta_chunk
    426     aggregate = self.aggregate or (lambda x: x)
    427     if self.combine:

File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_core.py:451, in Expr.__getattr__(self, key)
    447 except AttributeError as err:
    448     if key.startswith("_meta"):
    449         # Avoid a recursive loop if/when `self._meta*`
    450         # produces an `AttributeError`
--> 451         raise RuntimeError(
    452             f"Failed to generate metadata for {self}. "
    453             "This operation may not be supported by the current backend."
    454         )
    456     # Allow operands to be accessed as attributes
    457     # as long as the keys are not already reserved
    458     # by existing methods/properties
    459     _parameters = type(self)._parameters

RuntimeError: Failed to generate metadata for DecomposableGroupbyAggregation(frame=df['name'], arg=<dask.dataframe.groupby.Aggregation object at 0x7f052960b850>, observed=False, split_out=1). This operation may not be supported by the current backend.

My thinking is that numerical objects are expected — but the backend is pandas, so string manipulations should be possible, right?


Solution

  • There are several things that probably won't work in your example:

    Custom Dask aggregations are really hard to get right, however, in your case, I found a simpler solution:

    # Aggregate without a custom dd.Aggregation: agg(list) collects each
    # group's names into a Python list, then an element-wise .apply joins
    # each list with '|'.
    import pandas as pd
    import dask.dataframe as dd
    df = pd.DataFrame.from_dict({'id':[1,1,2,2,2,4], 'name':['a','b','c','d','e','f']})
    ddf = dd.from_pandas(df, 2)
    ddf2 = ddf.groupby('id').agg(list)
    ddf2["name2"] = ddf2["name"].apply(lambda l: "|".join(l))
    ddf2.compute()
    

    Which produces:

        name    name2
    id      
    1   [a, b]  a|b
    2   [c, d, e]   c|d|e
    4   [f] f