Tags: python, bigdata, dask, dask-dataframe

Custom Aggregations using Dask DataFrame


I have a large dataset with over 70 million rows and want to apply a custom groupby aggregation to it. The aggregation is the harmonic mean, and that is where the problem begins: since Dask aggregates each chunk and then applies a second aggregation to the merged partial results, I believe the harmonic mean would not be computed correctly, because the harmonic mean of [[1,3,5],[7,9]] taken in two stages is not the same as the harmonic mean of [1,3,5,7,9]. Is there a way to achieve this using Dask?
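
For example, with scipy.stats.hmean (an illustration, not code from the question), the two-stage result differs from the one-shot result:

from scipy.stats import hmean

# harmonic mean of the per-chunk harmonic means
two_stage = hmean([hmean([1, 3, 5]), hmean([7, 9])])
# harmonic mean of the full data in one pass
one_shot = hmean([1, 3, 5, 7, 9])

print(two_stage)  # approx. 3.13
print(one_shot)   # approx. 2.80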

My custom aggregation:

import dask.dataframe as dd
from scipy.stats import hmean

# both the per-chunk step and the combine step apply hmean directly
custom_hmean = dd.Aggregation('custom_hmean', lambda s: s.agg(hmean), lambda s0: s0.agg(hmean))

Solution

  • It's not ideal (a zero value makes the reciprocal sum infinite, as group a in the example below shows), but one option is to compute the group count and the sum of reciprocals separately, then combine these two components into the harmonic mean n / sum(1/x).

    Roughly:

    from dask.dataframe import from_dict
    
    ddf = from_dict({"x": range(10), "y": list("aabbccddee")}, npartitions=2)
    
    # create reciprocals; note that 1/0 yields inf (group "a" contains x=0)
    ddf["inv_x"] = 1 / ddf["x"]
    
    result = (
        ddf.groupby("y")
        .agg(numerator=("x", "size"), denominator=("inv_x", "sum"))
        .assign(hmean=lambda df: df["numerator"] / df["denominator"])
        .compute()
    )
    #    numerator  denominator     hmean
    # y                                  
    # a          2          inf  0.000000
    # b          2     0.833333  2.400000
    # c          2     0.450000  4.444444
    # d          2     0.309524  6.461538
    # e          2     0.236111  8.470588
    
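    As a quick sanity check (my addition, calling scipy directly), the "b" row matches scipy's harmonic mean of the same values:

    from scipy.stats import hmean

    print(hmean([2, 3]))  # 2.4, same as the hmean column for group "b"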

    With some refinement, this approach should be feasible in your circumstances.
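
    As one such refinement (a sketch following the custom-mean pattern from the dask documentation, not tested at scale), the same count / reciprocal-sum decomposition can be packaged as a dd.Aggregation, so it can be reused like the custom_hmean from the question. Note that it is applied to the reciprocal column, not to "x" itself:

    import dask.dataframe as dd

    hmean_agg = dd.Aggregation(
        "hmean",
        # per partition: count of values and sum of reciprocals
        chunk=lambda s: (s.count(), s.sum()),
        # across partitions: both components are plain sums, so merging is exact
        agg=lambda counts, sums: (counts.sum(), sums.sum()),
        # final step: n / sum(1/x)
        finalize=lambda counts, sums: counts / sums,
    )

    result = ddf.groupby("y")["inv_x"].agg(hmean_agg).compute()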