java, apache-kafka, aggregate, apache-kafka-streams

Kafka Streams aggregations with progressively restricting keys


I'm struggling with the logic behind Kafka Streams aggregations. I have records with string keys of the form A_B_C and values that are basically an int, and I want to progressively aggregate on the key: first getting the max of the values in a group, and then the distinct values among those maxima. I'll leave an example below that I hope helps in understanding what I need.

What I don't really understand is the logic of the aggregator. I understand the concept of the underlying state store (in the case of a KGroupedTable) that allows the final output to be updated, but I don't understand how it behaves when the key is changed (basically truncated) before grouping. I hope the example below clarifies what I mean. What I'd like to know is whether this is feasible, and, if anyone has already run into something like this, what the solutions were.

In a topic I have records like the following. The keys are unique since the topic is compacted, and the topic is first loaded as a table.

key    value
A_B_C  1
A_B_D  1
A_B_E  3
A_G_F  2
A_L_M  1

I want the first aggregation to get the max value for each sub-key:

key  value
A_B  3
A_G  2
A_L  1

and then get the distinct values grouped by the final sub-key:

key  value
A    (1,2,3)

Moreover, I receive tombstone events. Taking the example above, if I receive

key    value
A_B_E  null

then the aggregations, with all the other records unchanged, should become

key  value
A_B  2
A_G  2
A_L  1

and finally

key  value
A    (1,2)



Solution

  • What you are trying to do is certainly possible; however, you need to consider a few things.

    The max aggregation you want to compute is not "subtractable", i.e., given the current max, you cannot compute the new max when a value is removed; you need all the values that go into the max function.

    An aggregation in Kafka Streams only stores the current result, and updates that result on "add new value" and "remove old value" events. For sum, for example, this works well: if your current sum is 10 and you add 2, it becomes 12, and if you instead remove 2, you can compute 8 as the new sum. For max, however, if the current max is 10 and 10 is removed, you cannot compute the new max (you would be able to compute a new max if 8 were added, as 10 stays the max, or if 12 were added, as 12 would become the new max).
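
    This "add new value" / "remove old value" contract is what the adder and subtractor of KGroupedTable#aggregate() implement. When the key is truncated before grouping, an update to an original key such as A_B_C sends its old value to the subtractor and its new value to the adder of the grouped key A_B. A minimal sketch for a subtractable aggregation like sum (the serdes are assumptions, and input stands for the source KTable):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;

    // "input" is the compacted topic read as a KTable<String, Integer>;
    // re-key "A_B_C" -> "A_B" and sum the values per sub-key
    KTable<String, Integer> sums = input
        .groupBy((key, value) ->
                KeyValue.pair(key.substring(0, key.lastIndexOf('_')), value),
            Grouped.with(Serdes.String(), Serdes.Integer()))
        .aggregate(
            () -> 0,                                 // initializer
            (key, newValue, sum) -> sum + newValue,  // adder: "add new value"
            (key, oldValue, sum) -> sum - oldValue,  // subtractor: "remove old value"
            Materialized.with(Serdes.String(), Serdes.Integer()));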

    Thus, you will need to do a "two step" aggregation: the first step "collects" all values (e.g., in a list -- Kafka Streams already provides a ListSerde you can use), and the second step takes the list and computes the max. This way, when a value is removed, you can strip it from the list, and the second step can compute the new max with the updated list as input:

    KTable<String, Integer> input = builder.table(...);
    KTable<String, List<Integer>> valueList =
        input.groupBy(/* set first sub-key */)
             .aggregate(/* maintain a list of values */);
    // for the remove step, just scan the list,
    // and remove the first value which matches the removed value
    
    KTable<String, Integer> max =
        valueList.mapValues(/* scan the list, find the max, and return it */);
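
    A more complete, runnable sketch of this first step might look as follows (the topic name is an assumption, and Serdes.ListSerde() requires a reasonably recent Kafka version):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;

    StreamsBuilder builder = new StreamsBuilder();

    // assumed topic name; the compacted topic is read as a table
    KTable<String, Integer> input = builder.table("input-topic",
        Consumed.with(Serdes.String(), Serdes.Integer()));

    Serde<List<Integer>> listSerde =
        Serdes.ListSerde(ArrayList.class, Serdes.Integer());

    // re-key "A_B_C" -> "A_B" and collect all values, duplicates included
    KTable<String, List<Integer>> valueList = input
        .groupBy((key, value) ->
                KeyValue.pair(key.substring(0, key.lastIndexOf('_')), value),
            Grouped.with(Serdes.String(), Serdes.Integer()))
        .aggregate(
            ArrayList::new,
            (key, newValue, list) -> { list.add(newValue); return list; },
            // List#remove(Object) strips only the first matching element
            (key, oldValue, list) -> { list.remove(oldValue); return list; },
            Materialized.with(Serdes.String(), listSerde));

    // recompute the max from the full list on every update;
    // an empty list maps to null, i.e., a tombstone for the sub-key
    KTable<String, Integer> max = valueList.mapValues(
        list -> list.stream().max(Integer::compare).orElse(null));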
    

    For the distinct step, you will need to do the same: first collect a list of all values (including duplicates), and then use mapValues() to remove the duplicates for the final result.
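
    The second step could then be sketched like this (reusing max and listSerde from the sketch above, and assuming the remaining key contains a single underscore, e.g. "A_B" -> "A"):

    import java.util.Set;
    import java.util.TreeSet;

    // re-key "A_B" -> "A" and again collect all max values, duplicates included
    KTable<String, List<Integer>> maxList = max
        .groupBy((key, value) ->
                KeyValue.pair(key.substring(0, key.indexOf('_')), value),
            Grouped.with(Serdes.String(), Serdes.Integer()))
        .aggregate(
            ArrayList::new,
            (key, newValue, list) -> { list.add(newValue); return list; },
            (key, oldValue, list) -> { list.remove(oldValue); return list; },
            Materialized.with(Serdes.String(), listSerde));

    // drop duplicates only in the final, read-optimized view
    KTable<String, Set<Integer>> distinct = maxList.mapValues(
        list -> new TreeSet<>(list));

    With the example data, a tombstone for A_B_E shrinks the list for A_B to [1, 1], so its max becomes 2 and the distinct set for A becomes (1,2), as desired.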

    Note: keeping duplicates in the list is important in both steps to compute the correct result. For example, if A_B and A_G both contribute a max of 2 and A_B is later removed, the distinct set for A must still contain 2, which only works if the list kept both copies.