
Dask - custom aggregation


I learned about Dask just yesterday and am now porting my work from pandas, but I'm stuck trying to translate simple custom aggregations.

I don't fully (or probably even at all) understand how the Series are represented inside those internal lambda functions. Normally I would step in with breakpoint() to inspect them, but that isn't an option here. When I try to get an element of x by index, I get a "Meta" error.

Any help/pointers would be appreciated.

import dask.dataframe as dd
import pandas as pd

#toy df
df = dd.from_pandas(pd.DataFrame(dict(a = [1, 1, 2, 2], \
      b = [100, 100, 200, 250])), npartitions=2)
df.compute()

    a   b
0   1   100
1   1   100
2   2   200
3   2   250

# PART 1 - for conceptual understanding - replicating trivial list

# intended result
df.groupby('a').agg(list).compute()

    b
a   
1   [100, 100]
2   [200, 250]

# replicate manually with custom aggregation
list_manual = dd.Aggregation('list_manual', lambda x: list(x), \
      lambda x1: list(x1))
res = df.groupby('a').agg(list_manual).compute()
res


    b
0   (0, [(1, 0 100\n1 100\nName: b, dtype: i...

res.b[0]

(0,
 0    (1, [100, 100])
 0    (2, [200, 250])
 Name: list_manual-b-6785578d38a71d6dbe0d1ac6515538f7, dtype: object)

# looks like the grouping tuple wasn't even unpacked (1, 2 groups are there)
# ... instead it all got collapsed into one thing


# PART 2 - custom function

# with pandas - intended behavior
collect_uniq = lambda x: list(set(x))
dfp = df.compute()
dfp.groupby('a').agg(collect_uniq)

    b
a   
1   [100]
2   [200, 250]

#now trying the same in Dask
collect_uniq_dask = dd.Aggregation('collect_uniq_dask', \
      lambda x: list(set(x)), lambda x1: list(x1))
res = df.groupby('a').agg(collect_uniq_dask).compute()

# gives TypeError("unhashable type: 'Series'")


Solution

  • I realize the original post is almost 2 years old at this point, but I'm posting a reply here in case anyone else comes across this after struggling with something similar as I did.

    My understanding (after discovering and learning about Dask in just the last few days) is that the input to the chunk step of a Dask custom aggregation is essentially a pandas groupby object representing the grouped data in one Dask partition. (The Dask documentation describes it as the grouped column of each partition, so chunk is called once per aggregated column.) Therefore, the chunk function can be defined the same way you would define the aggregation in pandas for one partition of your data. Instead of chunk = lambda x: list(x), you need chunk = lambda x: x.aggregate(lambda y: list(y)) or chunk = lambda x: x.apply(lambda y: list(y)) if your goal is to collect all elements of the same group in each column into a list. The input seen by the outer chunk lambda (the function of x) is the grouped pandas data from one Dask partition, and the input seen by the inner lambda (the function of y) is a Series holding one group's values for one column.

    The input to the agg function is similar to the input to the chunk function, except that the individual elements are now the outputs of chunk. Just as for chunk, we can use the .aggregate or .apply method to define the agg function, but we need to think more carefully about the inner lambda function that we pass to .aggregate or .apply.

    If we use the same function for agg as we used for chunk, i.e. agg = lambda x0: x0.aggregate(lambda y: list(y)), the inner lambda function (of y) receives as its input a series of N lists, which were the outputs of chunk on one column for N Dask partitions, and consequently outputs a list of N lists. This is probably not the behaviour you intended. In order to construct a flattened list of all elements as the final output of the aggregation for each column for each group, the function used for agg needs to unpack the lists output by chunk. Something like this would do the job: agg = lambda x0: x0.apply(lambda y: [z for y1 in y for z in y1]).
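
The difference between the two agg functions can be reproduced by hand with plain pandas. The sketch below only imitates the chunk and agg steps on two hand-made "partitions" of the toy frame from the question; it is not how Dask is implemented internally, and the names parts, agg_naive and agg_flat are made up for this illustration.

```python
import pandas as pd

# Toy data from the question.
pdf = pd.DataFrame({"a": [1, 1, 2, 2], "b": [100, 100, 200, 250]})

# Two hand-made "partitions" (an assumption for this sketch);
# each group is deliberately split across both of them.
parts = [pdf.iloc[[0, 2]], pdf.iloc[[1, 3]]]

# chunk: runs on the grouped column of ONE partition.
chunk = lambda grouped: grouped.apply(list)

# Two candidate agg functions; both run on the regrouped chunk outputs.
agg_naive = lambda grouped: grouped.apply(list)
agg_flat = lambda grouped: grouped.apply(
    lambda lists: [z for lst in lists for z in lst]
)

# Step 1: apply chunk to each partition, then stack the partial results.
partials = pd.concat([chunk(p.groupby("a")["b"]) for p in parts])

# Step 2: regroup the partial results by group key and apply agg.
nested = agg_naive(partials.groupby(level=0))
flat = agg_flat(partials.groupby(level=0))

print(nested.to_dict())  # one inner list per partition
print(flat.to_dict())    # inner lists unpacked into a single list
```

Reusing the chunk function as agg keeps one inner list per partition, while the flattening version unpacks them into a single list per group.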

    Here are some minimal working examples:

    import pandas as pd
    import dask.dataframe as dd
    from IPython.display import display  # already available in Jupyter; needed in a plain script
    
    ## Create Pandas dataframe
    pdf = pd.DataFrame(
        data={
            'Group': ['G1', 'G2', 'G1', 'G1', 'G3', 
                      'G3', 'G2', 'G1', 'G3', 'G4'],
            'Col1': [{1,2,3}, {1,4,5}, {2,3}, {3,4,5}, {2}, 
                     {1,3,5}, {1,2,3,4,5}, {4,5}, {13}, {14}],
            'Col2': [{'a','b','c'}, {'a','d','e'}, {'b','c'}, {'c','d','e'}, {'b'}, 
                     {'a','c','e'}, {'a','b','c','d','e'}, {'d','e'}, {'m'}, {'n'}]
        }
    )
    display(pdf)
    
    ## Create Dask dataframe from Pandas dataframe
    ddf = dd.from_pandas(pdf, npartitions=2)
    display(ddf)
    
    ## Group Dask dataframe
    ddf_ = ddf.groupby(by='Group', sort=False)
    
    ## ===== WORKING EXAMPLES ===== ##
    
    ## Example 1: aggregating elements into lists.
    print("\nWorking Example 1: Aggregating elements into lists.")
    list_func = lambda y: list(y)
    agg_by_func = dd.Aggregation(
        name = 'agg_by_func', 
        chunk = lambda x: x.aggregate(list_func), 
        agg = lambda x0: x0.aggregate(lambda y: [z for y1 in y for z in y1])
    )
    display(ddf_.agg(agg_by_func).compute())
    
    ## Example 2: equivalent to Example 1.
    print("\nWorking Example 2: Equivalent to Example 1 but using `apply` "
          "instead of `aggregate` inside the `Aggregation` class.")
    list_func = lambda y: list(y)
    agg_by_func = dd.Aggregation(
        name = 'agg_by_func', 
        chunk = lambda x: x.apply(list_func), 
        agg = lambda x0: x0.apply(lambda y: [z for y1 in y for z in y1])
    )
    display(ddf_.agg(agg_by_func).compute())
    
    ## Example 3: aggregating unique elements (converted to hashable tuples) into sets.
    print("\nWorking Example 3: Aggregating unique elements (converted to hashable tuples) into sets.")
    set_func = lambda y: set(tuple(x) for x in y)
    agg_by_func = dd.Aggregation(
        name = 'agg_by_func', 
        chunk = lambda x: x.aggregate(set_func), 
        agg = lambda x0: x0.aggregate(lambda y: set.union(*y))
    )
    display(ddf_.agg(agg_by_func).compute())
    
    ## ============================ ##
    

    In my specific case, I wanted to perform aggregation by taking unions of sets (the elements to be aggregated in my dataframe were sets of strings, hence my use of sets in my example code above). This turned out to be a headache that took me 2 days to figure out, even though it was very easy to do with Pandas. Here is how I achieved that for the above dataframe:

    union_func = lambda y: set(z for y0 in y for z in y0)
    agg_by_func = dd.Aggregation(
        name = 'agg_by_func', 
        chunk = lambda x: x.aggregate(union_func), 
        agg = lambda x0: x0.aggregate(union_func)
    )
    display(ddf_.agg(agg_by_func).compute())
    

    Curiously, and I don't know why, the code above works as intended with union_func = lambda y: set(z for y0 in y for z in y0) but does not work at all with union_func = lambda y: set.union(*y), which is an equivalent function when the elements of its input are sets. The second version raises errors, including one about failure to infer output metadata, which I have not been able to decipher or solve; neither the Aggregation class nor the agg method accepts the meta parameter that is common to many Dask methods. Both of these union_func functions work perfectly fine in a pandas aggregation:

    ## Group then aggregate Pandas dataframe.
    print('Result of Pandas groupby and aggregate:')
    pdf_ = pdf.groupby(by='Group', sort=False)
    pdf_ = pdf_.aggregate(union_func)
    display(pdf_)
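
One possible explanation for the difference, offered as an assumption rather than something verified against the Dask source: to infer the output metadata, Dask may call the aggregation functions on a small placeholder frame whose object columns hold dummy strings (or nothing at all) instead of real sets. The two union functions are only equivalent when every element is a set, as this pure-Python sketch shows. The names union_gen, union_star and raises_type_error are made up for the illustration.

```python
# The two candidate union functions from the answer.
union_gen = lambda y: set(z for y0 in y for z in y0)
union_star = lambda y: set.union(*y)

def raises_type_error(func, arg):
    """Return True if func(arg) raises TypeError, else False."""
    try:
        func(arg)
    except TypeError:
        return True
    return False

# On real data (an iterable of sets) both functions agree.
real = [{"a", "b"}, {"b", "c"}]
assert union_gen(real) == union_star(real) == {"a", "b", "c"}

# On placeholder strings (assumed stand-in for metadata inference),
# the generator version iterates over characters and succeeds ...
dummy = ["foo", "foo"]
print(union_gen(dummy))
# ... while set.union requires its first argument to be a set.
print(raises_type_error(union_star, dummy))

# On empty input, set.union(*[]) has no argument to bind to at all.
print(union_gen([]))
print(raises_type_error(union_star, []))
```

If this is indeed the cause, a union function that tolerates non-set or empty input (such as the generator version) sidesteps the problem without needing a meta argument.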