I am trying to extract the "mode" of a series under a groupby aggregation in a dask dataframe. I could find the documentation for mode, but not how to use it under a groupby.
# Minimal reproduction for the question: build a small pandas frame, wrap it
# in a dask DataFrame, and attempt a groupby aggregation named 'mode'.
import pandas as pd
import numpy as np
# Nine sample rows; the 'partner' column contains missing values (np.nan).
data = pd.DataFrame({
'status' : ['pending', 'pending','pending', 'canceled','canceled','canceled', 'confirmed', 'confirmed','confirmed'],
'clientId' : ['A', 'B', 'C', 'A', 'D', 'C', 'A', 'B','C'],
'partner' : ['A', np.nan,'C', 'A',np.nan,'C', 'A', np.nan,'C'],
'product' : ['afiliates', 'pre-paid', 'giftcard','afiliates', 'pre-paid', 'giftcard','afiliates', 'pre-paid', 'giftcard'],
'brand' : ['brand_1', 'brand_2', 'brand_3','brand_1', 'brand_2', 'brand_3','brand_1', 'brand_3', 'brand_3'],
'gmv' : [100,100,100,100,100,100,100,100,100]})
# Convert the partner, status, product and brand columns to the category dtype.
data = data.astype({'partner':'category','status':'category','product':'category', 'brand':'category'})
import dask.dataframe as dd
# Wrap the pandas frame in a dask DataFrame with a single partition.
df = dd.from_pandas(data,npartitions=1)
# The aggregation this question is about. The result of this expression is
# not bound to any name, so it is discarded.
# NOTE(review): presumably dask does not accept 'mode' as a string
# aggregation here, which is what prompts the question -- confirm.
df.groupby(['clientId', 'product'], observed=True).aggregate({'brand':'mode'})
# Computes the original, unaggregated `df`, not the groupby result above.
df.compute()
Thanks!
This answer is based on the code provided here, with some modifications:
from pandas import DataFrame, Series, NA
from dask.dataframe import from_pandas, Aggregation
# Sample orders: nine rows, three per status. The repeated three-row patterns
# are spelled with list repetition; "brand" is irregular and listed in full.
data = DataFrame(
    {
        "status": ["pending"] * 3 + ["canceled"] * 3 + ["confirmed"] * 3,
        "clientId": ["A", "B", "C", "A", "D", "C", "A", "B", "C"],
        "partner": ["A", NA, "C"] * 3,
        "product": ["afiliates", "pre-paid", "giftcard"] * 3,
        "brand": [
            "brand_4", "brand_2", "brand_3",
            "brand_1", "brand_2", "brand_3",
            "brand_1", "brand_3", "brand_3",
        ],
        "gmv": [100] * 9,
    }
)

# Cast the four text columns listed here to the category dtype.
data = data.astype(
    {
        column: "category"
        for column in ("partner", "status", "product", "brand")
    }
)
# Reference result from plain pandas: the mode of "brand" for every
# (clientId, product) pair.
mode_pandas = (
    data
    .groupby(["clientId", "product"], observed=True)
    .agg({"brand": Series.mode})
)

# The same frame as a dask DataFrame held in a single partition.
df = from_pandas(data, npartitions=1)
def chunk(s):
    """Chunk step of the custom ``mode`` aggregation.

    Counts how often each value occurs inside every group.

    Args:
        s: A grouped series (anything exposing ``value_counts()``); it is
            used as the ``chunk`` callable of the ``Aggregation`` built
            later in this file.

    Returns:
        Whatever ``s.value_counts()`` returns; for a pandas grouped series
        this is a series of counts indexed by (group keys..., value).
    """
    return s.value_counts()
def agg(s0):
    """Aggregate step of the custom ``mode`` aggregation.

    Adds up the per-chunk counts that share the same full index key and
    drops every key whose total is zero.

    Args:
        s0: A grouped series of counts. Only its underlying series is used;
            the grouping it carries is ignored and replaced by a grouping
            over all index levels.

    Returns:
        A series of strictly positive totals, indexed like the underlying
        series but with one row per distinct index key.
    """
    # NOTE: ``_selected_obj`` is a private pandas attribute; it is read once
    # here to get at the series behind the groupby object.
    counts = s0._selected_obj

    # Group by level position rather than by ``index.names`` so that unnamed
    # (None) index levels are handled as well.
    every_level = list(range(counts.index.nlevels))
    totals = counts.groupby(level=every_level).sum()

    # Zero totals are removed (presumably they stem from unobserved
    # categories of a categorical column -- confirm).
    return totals[totals > 0]
def finalize(s):
    """Finalize step of the custom ``mode`` aggregation.

    Within every group, keeps the entries whose count equals the group's
    maximum count. Ties are all kept, so a group may contribute several rows.

    Args:
        s: A series of counts with a multi-level index whose last level is
            the counted value and whose leading levels are the group keys.

    Returns:
        The rows of ``s`` holding each group's maximum count. The values are
        the counts themselves; the modal value sits in the last index level.
    """
    # Every index level except the last one identifies the group.
    group_levels = list(range(s.index.nlevels - 1))
    grouped = s.groupby(level=group_levels, group_keys=False)
    # ``group_keys=False`` keeps the original index instead of prepending the
    # group keys a second time.
    return grouped.apply(lambda group: group[group == group.max()])
# Custom dask aggregation wired from the chunk / agg / finalize callables.
mode = Aggregation("mode", chunk, agg, finalize)

# Apply it to "brand" for every (clientId, product) pair and materialise the
# result.
mode_dask = (
    df.groupby(["clientId", "product"], observed=True, dropna=True)
    .aggregate({"brand": mode})
    .compute()
)

# Print the pandas reference first, then the dask result.
print(mode_pandas, mode_dask, sep="\n")
Note that the dask version doesn't yield exactly the same output as pandas, but reconciling the two is an interesting exercise left to the reader.