python, pandas, group-by, dask, dask-dataframe

Dask DataFrame mode on groupby?


I am trying to extract the "mode" of a series under a groupby aggregation in a dask dataframe. I could find the documentation of mode, but not how to use it under a groupby.

# Reproduction script: build a small orders frame, cast the descriptive
# columns to category, then try a per-group "mode" aggregation under dask.
import pandas as pd
import numpy as np
data = pd.DataFrame({
    'status' :  ['pending', 'pending','pending', 'canceled','canceled','canceled', 'confirmed', 'confirmed','confirmed'],
    'clientId' : ['A', 'B', 'C', 'A', 'D', 'C', 'A', 'B','C'],
    'partner' :  ['A', np.nan,'C', 'A',np.nan,'C', 'A', np.nan,'C'],
    'product' : ['afiliates', 'pre-paid', 'giftcard','afiliates', 'pre-paid', 'giftcard','afiliates', 'pre-paid', 'giftcard'],
    'brand' : ['brand_1', 'brand_2', 'brand_3','brand_1', 'brand_2', 'brand_3','brand_1', 'brand_3', 'brand_3'],
    'gmv' : [100,100,100,100,100,100,100,100,100]})

# Cast the descriptive columns to categoricals ('clientId' stays as object).
data = data.astype({'partner':'category','status':'category','product':'category', 'brand':'category'})

import dask.dataframe as dd
df = dd.from_pandas(data,npartitions=1)
# NOTE(review): dask exposes no built-in 'mode' groupby aggregation, so the
# string spec below fails — presumably the error that prompted the question;
# a custom dask.dataframe.Aggregation is needed (see the answer below).
df.groupby(['clientId', 'product'], observed=True).aggregate({'brand':'mode'})
# NOTE(review): the aggregate result above is discarded; this compute() only
# materializes the original (unaggregated) frame.
df.compute()

Thanks!


Solution

  • This answer is based on code provided here, with some modifications:

    from pandas import DataFrame, Series, NA
    from dask.dataframe import from_pandas, Aggregation
    
    # Rebuild the question's example frame: nine orders, three per status.
    # Repetitive columns are expressed as repeats of their three-value cycle;
    # the resulting frame is value-for-value identical to the question's.
    statuses = [s for s in ("pending", "canceled", "confirmed") for _ in range(3)]
    data = DataFrame(
        {
            "status": statuses,
            "clientId": ["A", "B", "C", "A", "D", "C", "A", "B", "C"],
            "partner": ["A", NA, "C"] * 3,
            "product": ["afiliates", "pre-paid", "giftcard"] * 3,
            "brand": [
                "brand_4", "brand_2", "brand_3",
                "brand_1", "brand_2", "brand_3",
                "brand_1", "brand_3", "brand_3",
            ],
            "gmv": [100] * 9,
        }
    )

    # Cast the descriptive columns to pandas categoricals, as in the question.
    categorical_cols = ["partner", "status", "product", "brand"]
    data = data.astype({col: "category" for col in categorical_cols})

    # Reference result computed with plain pandas.  Series.mode returns every
    # value tied for the highest count, so multimodal groups hold arrays.
    mode_pandas = data.groupby(["clientId", "product"], observed=True).agg(
        {"brand": Series.mode}
    )
    
    # Wrap the pandas frame in a single-partition dask dataframe.
    # NOTE(review): with npartitions=1 each step below sees all the data at
    # once; using several partitions would exercise the agg step for real.
    df = from_pandas(data, npartitions=1)
    
    
    def chunk(s):
        """Per-partition step: count occurrences of each value in each group.

        ``s`` is the grouped series of one partition; the value counts are
        the intermediate state that ``agg`` later sums across partitions.
        """
        counts = s.value_counts()
        return counts
    
    
    def agg(s0):
        """Cross-partition step: sum the per-partition value counts.

        ``s0`` is a groupby over the concatenated ``chunk`` outputs, whose
        index is (group keys..., value).  Summing over the full index
        collapses duplicate (group, value) rows coming from different
        partitions; zero-count rows are then dropped.
        """
        # Private pandas attribute: the underlying series being grouped.
        counts = s0._selected_obj
        totals = counts.groupby(level=counts.index.names).sum()
        return totals[totals > 0]
    
    
    def finalize(s):
        """Keep, per group, only the value(s) whose count ties the maximum.

        The last index level holds the candidate values; grouping by all of
        the preceding levels and filtering on each group's max reproduces the
        multimodal behaviour of ``Series.mode``.
        """
        group_levels = list(range(s.index.nlevels - 1))

        def keep_modes(counts):
            return counts[counts == counts.max()]

        return s.groupby(level=group_levels, group_keys=False).apply(keep_modes)
    
    
    # Package the three steps as a reusable custom dask aggregation.
    mode = Aggregation(name="mode", chunk=chunk, agg=agg, finalize=finalize)

    # Apply it under the same groupby as the pandas reference above.
    grouped = df.groupby(["clientId", "product"], observed=True, dropna=True)
    mode_dask = grouped.aggregate({"brand": mode}).compute()

    print(mode_pandas)
    print(mode_dask)
    

    Note that the dask version doesn't yield exactly the same output as pandas, but that's an interesting exercise left to the reader.