I am trying to translate this pandas code into dask.
df2 = df.groupby(
    ['id', 'day']
).agg(
    x=('x', 'nunique'),
    y=('y', 'sum'),
    z1=('z', lambda x: x.isin(['A', 'B']).sum()),
    z=('z', 'nunique')
).reset_index()
In order to test the results, I have created this small example dataset:
import pandas as pd

df = pd.DataFrame({"id": [1, 1, 2, 2, 2, 3, 3, 4],
                   "day": [1, 1, 1, 2, 1, 1, 2, 1],
                   "x": [1, 1, 1, 1, 2, 1, 1, 1],
                   "y": [10, 20, 10, 10, 10, 10, 10, 10],
                   "z": ['A', 'B', 'A', 'C', 'B', 'A', 'B', 'C']})
I am expecting the following result:
+---+---+---+---+---+---+
| id|day|  x|  y| z1|  z|
+---+---+---+---+---+---+
|  1|  1|  1| 30|  2|  2|
|  3|  2|  1| 10|  1|  1|
|  2|  2|  1| 10|  0|  1|
|  4|  1|  1| 10|  0|  1|
|  2|  1|  2| 20|  2|  2|
|  3|  1|  1| 10|  1|  1|
+---+---+---+---+---+---+
The closest I got is to do:
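import dask.dataframe as dd

ddf = dd.from_pandas(df, npartitions=1)

# Custom aggregation that counts distinct values per group
nunique = dd.Aggregation(
    name="nunique",
    # per partition: collect the distinct values of each group into a list
    chunk=lambda s: s.apply(lambda x: list(set(x))),
    # across partitions: concatenate the per-partition lists of each group
    agg=lambda s0: s0.obj.groupby(
        level=list(range(s0.obj.index.nlevels))).sum(),
    # finally: count the distinct values in the combined list
    finalize=lambda s1: s1.apply(lambda final: len(set(final))),
)

ddf2 = (
    ddf.groupby(['id', 'day'])
    .agg({
        'x': nunique,
        'y': 'sum',
        'z': nunique
    })
    .fillna(0)
    .reset_index()
)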
This computes everything correctly except the conditional sum:
z1=('z', lambda x: x.isin(['A', 'B']).sum())
I think I need a custom Dask aggregation for it, but I can't get one to work.
This might not be the answer you are looking for, but why not simplify the problem by first creating a column that is True/False (effectively 1/0) depending on whether the condition is satisfied, and then running a plain sum over it?
ddf['z1'] = ddf['z'].isin(["A", "B"])
ddf2 = ddf.groupby(["id", "day"]).agg({'z1': 'sum'})