Tags: python, pandas, dataframe, dask, dask-dataframe

Dask custom aggregation - conditional sum inside multiple aggregation


I am trying to translate this pandas code into dask.

df2 = df.groupby(['id', 'day']).agg(
    x=('x', 'nunique'),
    y=('y', 'sum'),
    z1=('z', lambda x: x.isin(['A', 'B']).sum()),
    z=('z', 'nunique'),
).reset_index()

In order to test the results, I have created this small example dataset:

df = pd.DataFrame({"id": [1, 1, 2, 2, 2, 3, 3, 4],
                   "day": [1, 1, 1, 2, 1, 1, 2, 1],
                   "x": [1, 1, 1, 1, 2, 1, 1, 1],
                   "y": [10, 20, 10, 10, 10, 10, 10, 10],
                   "z": ['A', 'B', 'A', 'C', 'B', 'A', 'B', 'C']})

I am expecting the following result:

+---+---+---+---+---+---+
| id|day|  x|  y| z1|  z|
+---+---+---+---+---+---+
|  1|  1|  1| 30|  2|  2|
|  3|  2|  1| 10|  1|  1|
|  2|  2|  1| 10|  0|  1|
|  4|  1|  1| 10|  0|  1|
|  2|  1|  2| 20|  2|  2|
|  3|  1|  1| 10|  1|  1|
+---+---+---+---+---+---+

The closest I got is to do:

ddf = dd.from_pandas(df, npartitions=1)

nunique = dd.Aggregation(
    name="nunique",
    chunk=lambda s: s.apply(lambda x: list(set(x))),
    agg=lambda s0: s0.obj.groupby(
        level=list(range(s0.obj.index.nlevels))).sum(),
    finalize=lambda s1: s1.apply(lambda final: len(set(final))),
)

ddf2 = (
    ddf.groupby(['id', 'day'])
    .agg({
        'x': nunique,
        'y': 'sum',
        'z': nunique
    })
    .fillna(0)
    .reset_index()
)

This handles everything correctly except the conditional sum, z1=('z', lambda x: x.isin(['A', 'B']).sum()).

I think I should do a custom dask aggregation, but I can't make it work.


Solution

  • This might not be the answer you are looking for, but why not simplify the problem? Create a boolean column that is True when the condition is satisfied (True sums as 1), then run a plain 'sum' over it:

    ddf['z1'] = ddf['z'].isin(["A", "B"])  # boolean flag; summing it counts the matches
    ddf2 = ddf.groupby(["id", "day"]).agg({'z1': 'sum'})