I would like to run a groupby and aggregation over a dataframe, where the aggregation joins strings that share the same id.
The df looks like this:
In [1]: df = pd.DataFrame.from_dict({'id':[1,1,2,2,2,3], 'name':['a','b','c','d','e','f']})
In [2]: df
Out[2]:
id name
0 1 a
1 1 b
2 2 c
3 2 d
4 2 e
5 3 f
I have this working in Pandas thus:
def list_aggregator(x):
    return '|'.join(x)

df2 = pd.DataFrame.from_dict({'id': [], 'name': []})
df2['id'] = df['id'].drop_duplicates()
df2['name'] = df['name'].groupby(df['id']).agg(list_aggregator).values
Produces:
In [26]: df2
Out[26]:
id name
0 1 a|b
2 2 c|d|e
5 3 f
For Dask, my understanding (from the docs) is that you have to tell Dask how to aggregate within each chunk, and then how to combine those per-chunk aggregates. In both cases, I want the equivalent of '|'.join(). So:
import dask.dataframe as dd

ddf = dd.from_pandas(df, 2)
ddf2 = dd.from_pandas(pd.DataFrame.from_dict({'id': [], 'name': []}), 2)
ddf2['id'] = ddf['id'].drop_duplicates()

dd_list_aggregation = dd.Aggregation(
    'list_aggregation',
    list_aggregator,  # chunk: each chunk is aggregated into one string per id
    list_aggregator,  # agg: per-chunk strings are aggregated into a single string per id
)
ddf2['name'] = ddf['name'].groupby(ddf['id']).agg(dd_list_aggregation).values
The expected result is as above (or, indeed, nothing, as I haven't called ddf2.compute() yet), but instead I receive this error:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_core.py:446, in Expr.__getattr__(self, key)
445 try:
--> 446 return object.__getattribute__(self, key)
447 except AttributeError as err:
File ~/miniconda3/envs/test/lib/python3.10/functools.py:981, in cached_property.__get__(self, instance, owner)
980 if val is _NOT_FOUND:
--> 981 val = self.func(instance)
982 try:
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_groupby.py:206, in GroupByApplyConcatApply._meta_chunk(self)
205 meta = meta_nonempty(self.frame._meta)
--> 206 return self.chunk(meta, *self._by_meta, **self.chunk_kwargs)
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask/dataframe/groupby.py:1200, in _groupby_apply_funcs(df, *by, **kwargs)
1199 for result_column, func, func_kwargs in funcs:
-> 1200 r = func(grouped, **func_kwargs)
1202 if isinstance(r, tuple):
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask/dataframe/groupby.py:1276, in _apply_func_to_column(df_like, column, func)
1275 if column is None:
-> 1276 return func(df_like)
1278 return func(df_like[column])
Cell In[88], line 2
1 def dd_list_aggregator(x):
----> 2 return '|'.join(x[1])
File ~/miniconda3/envs/test/lib/python3.10/site-packages/pandas/core/base.py:245, in SelectionMixin.__getitem__(self, key)
244 raise KeyError(f"Column not found: {key}")
--> 245 ndim = self.obj[key].ndim
246 return self._gotitem(key, ndim=ndim)
AttributeError: 'str' object has no attribute 'ndim'
During handling of the above exception, another exception occurred:
RuntimeError Traceback (most recent call last)
Cell In[96], line 1
----> 1 ddf2['name'] = ddf['name'].groupby(ddf['id']).agg(dd_list_aggregation).values
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_groupby.py:1907, in GroupBy.agg(self, *args, **kwargs)
1906 def agg(self, *args, **kwargs):
-> 1907 return self.aggregate(*args, **kwargs)
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_groupby.py:1891, in GroupBy.aggregate(self, arg, split_every, split_out, shuffle_method, **kwargs)
1888 if arg == "size":
1889 return self.size()
-> 1891 return new_collection(
1892 GroupbyAggregation(
1893 self.obj.expr,
1894 arg,
1895 self.observed,
1896 self.dropna,
1897 split_every,
1898 split_out,
1899 self.sort,
1900 shuffle_method,
1901 self._slice,
1902 *self.by,
1903 )
1904 )
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_collection.py:4440, in new_collection(expr)
4438 def new_collection(expr):
4439 """Create new collection from an expr"""
-> 4440 meta = expr._meta
4441 expr._name # Ensure backend is imported
4442 return get_collection_type(meta)(expr)
File ~/miniconda3/envs/test/lib/python3.10/functools.py:981, in cached_property.__get__(self, instance, owner)
979 val = cache.get(self.attrname, _NOT_FOUND)
980 if val is _NOT_FOUND:
--> 981 val = self.func(instance)
982 try:
983 cache[self.attrname] = val
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_groupby.py:432, in GroupbyAggregation._meta(self)
430 @functools.cached_property
431 def _meta(self):
--> 432 return self._lower()._meta
File ~/miniconda3/envs/test/lib/python3.10/functools.py:981, in cached_property.__get__(self, instance, owner)
979 val = cache.get(self.attrname, _NOT_FOUND)
980 if val is _NOT_FOUND:
--> 981 val = self.func(instance)
982 try:
983 cache[self.attrname] = val
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_reductions.py:425, in ApplyConcatApply._meta(self)
423 @functools.cached_property
424 def _meta(self):
--> 425 meta = self._meta_chunk
426 aggregate = self.aggregate or (lambda x: x)
427 if self.combine:
File ~/miniconda3/envs/test/lib/python3.10/site-packages/dask_expr/_core.py:451, in Expr.__getattr__(self, key)
447 except AttributeError as err:
448 if key.startswith("_meta"):
449 # Avoid a recursive loop if/when `self._meta*`
450 # produces an `AttributeError`
--> 451 raise RuntimeError(
452 f"Failed to generate metadata for {self}. "
453 "This operation may not be supported by the current backend."
454 )
456 # Allow operands to be accessed as attributes
457 # as long as the keys are not already reserved
458 # by existing methods/properties
459 _parameters = type(self)._parameters
RuntimeError: Failed to generate metadata for DecomposableGroupbyAggregation(frame=df['name'], arg=<dask.dataframe.groupby.Aggregation object at 0x7f052960b850>, observed=False, split_out=1). This operation may not be supported by the current backend.
My thinking is that numeric objects are expected, but the backend is pandas, so string manipulation should be possible, right?
There are several things that probably won't work in your example:
In Dask, collections are immutable: you don't create an empty Dask DataFrame and fill it in later; everything is lazy and distributed.
Custom aggregations are more complex than that. As stated in the documentation, the chunk and agg functions operate on SeriesGroupBy objects, not lists (see the sketch below).
There are typos in your example, and the Dask snippet lacks the initial DataFrame.
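For reference, here is a minimal sketch of what a custom aggregation could look like, assuming (as the docs describe) that both steps receive SeriesGroupBy objects; the name join_strings is purely illustrative, not part of the Dask API:
import pandas as pd
import dask.dataframe as dd

join_strings = dd.Aggregation(
    name='join_strings',
    # chunk: called once per partition with a SeriesGroupBy;
    # joins the strings within each group of that partition
    chunk=lambda s: s.apply('|'.join),
    # agg: called with a SeriesGroupBy over the concatenated
    # per-chunk results; joins the partial strings per id
    agg=lambda s: s.apply('|'.join),
)

df = pd.DataFrame({'id': [1, 1, 2, 2, 2, 3], 'name': ['a', 'b', 'c', 'd', 'e', 'f']})
ddf = dd.from_pandas(df, 2)
ddf.groupby('id')['name'].agg(join_strings).compute()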
Custom Dask aggregations are really hard to get right; in your case, however, I found a simpler solution:
import pandas as pd
import dask.dataframe as dd
df = pd.DataFrame.from_dict({'id':[1,1,2,2,2,4], 'name':['a','b','c','d','e','f']})
ddf = dd.from_pandas(df, 2)
ddf2 = ddf.groupby('id').agg(list)
ddf2["name2"] = ddf2["name"].apply(lambda l: "|".join(l))
ddf2.compute()
Which produces:
        name name2
id
1     [a, b]   a|b
2  [c, d, e] c|d|e
4        [f]     f
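If you want the same shape as your pandas result (id as a column, with the joined strings in name), something along these lines should also work; this is just a sketch, and the meta argument is my addition to make Dask's metadata inference explicit:
result = (
    ddf.groupby('id')['name']
       .agg(list)                               # one list of names per id
       .apply('|'.join, meta=('name', 'object'))  # join each list into a string
       .reset_index()                           # turn the id index back into a column
)
result.compute()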