I'm struggling with the logic behind Kafka Streams aggregations. I have records with string keys of the form `A_B_C` and values that are basically an int, and I want to progressively aggregate on the key: first take the max of the values in each group, and then the distinct values among those maxes. I'll leave an example below that I hope helps in understanding what I need. What I don't really understand is the logic of the aggregator. I understood the concept of the underlying state store (in the case of a KGroupedTable) that allows the final output to be updated, but I don't understand how it behaves when the key is changed (basically truncated) before grouping. I hope the example below clarifies what I mean. What I'd like to know is whether this is feasible, and if anyone has already run into something like this, what the solution was.
In a topic I have records like the ones below; the keys are unique since the topic is compacted, and it is first loaded as a table.
key | value |
---|---|
A_B_C | 1 |
A_B_D | 1 |
A_B_E | 3 |
A_G_F | 2 |
A_L_M | 1 |
I want the first aggregation to get the max value for each subkey
key | value |
---|---|
A_B | 3 |
A_G | 2 |
A_L | 1 |
and then get the distinct values grouped by the final subkey
key | value |
---|---|
A | (1,2,3) |
Moreover, I receive tombstone events, so, continuing the example, if I receive
key | value |
---|---|
A_B_E | null |
the aggregations, considering all the other records as above, should become
key | value |
---|---|
A_B | 2 |
A_G | 2 |
A_L | 1 |
and finally
key | value |
---|---|
A | (1,2) |
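In plain Java terms (outside Kafka Streams, with hypothetical helper names), the two derivations I'm after could be sketched as:

```java
import java.util.*;
import java.util.stream.*;

class AggregationSketch {
    // First step: max value per truncated key ("A_B_C" -> "A_B").
    static Map<String, Integer> maxPerSubkey(Map<String, Integer> records) {
        return records.entrySet().stream()
            .collect(Collectors.toMap(
                e -> e.getKey().substring(0, e.getKey().lastIndexOf('_')),
                Map.Entry::getValue,
                Integer::max)); // merge colliding sub-keys by taking the max
    }

    // Second step: distinct max values per top-level key ("A_B" -> "A").
    static Map<String, Set<Integer>> distinctPerKey(Map<String, Integer> maxes) {
        return maxes.entrySet().stream()
            .collect(Collectors.groupingBy(
                e -> e.getKey().substring(0, e.getKey().lastIndexOf('_')),
                Collectors.mapping(Map.Entry::getValue,
                                   Collectors.toCollection(TreeSet::new))));
    }
}
```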
What you are trying to do is certainly possible; however, you need to consider a few things.

The `max` aggregation you want to compute is not "subtractable", i.e., given the current max, you cannot just compute the new max when a value is removed; you need all the values that go into the `max` function.

An aggregation in Kafka Streams only stores the current result, and updates that result on "add new value" and "remove old value". For, e.g., `sum` this works well: if your current sum is 10 and you add 2, it becomes 12, and if you remove 2, you can compute 8 as the new sum. However, for `max`, if the current max is 10 and 10 is removed, you cannot compute the new max (you would be able to compute a new max if 8 were added, as 10 stays the max, or if 12 were added, as 12 would become the new max).
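A minimal plain-Java sketch (no Kafka Streams APIs, hypothetical names) of this difference:

```java
import java.util.*;

class Subtractability {
    // sum is subtractable: the new result follows from the old result
    // and the removed value alone
    static int sumRemove(int currentSum, int removed) {
        return currentSum - removed;
    }

    // max is not: after a removal we must rescan all remaining values,
    // so the aggregation has to keep the full list of inputs around
    static int maxAfterRemove(List<Integer> values, int removed) {
        List<Integer> remaining = new ArrayList<>(values);
        remaining.remove(Integer.valueOf(removed)); // first occurrence only
        return Collections.max(remaining);
    }
}
```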
Thus, you will need to do a "two step" aggregation: the first step "collects" all values (e.g., in a list; Kafka Streams already provides a list serde, `Serdes.ListSerde`, that you can use), and the second step takes the list and computes the `max`. This way, when a value is removed, you can strip it from the list, and the second step can compute the new max with the updated list as input:
```java
KTable<String, Integer> input = builder.table(...);

KTable<String, List<Integer>> valueList =
    input.groupBy(/* set first sub-key, e.g. "A_B_C" -> "A_B" */)
         .aggregate(
             /* initializer: empty list */,
             /* adder: append the new value to the list */,
             /* subtractor: scan the list and remove the first
                value which matches the removed value */);

KTable<String, Integer> max =
    valueList.mapValues(/* scan the list, find the max, and return it */);
```
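The subtractor's "remove the first matching value" logic can be sketched as a standalone helper (hypothetical name, plain Java):

```java
import java.util.*;

class ListSubtractor {
    // Remove only the FIRST occurrence of oldValue: other records may
    // legitimately carry the same value, so duplicates must survive.
    static List<Integer> subtract(List<Integer> agg, int oldValue) {
        List<Integer> result = new ArrayList<>(agg);
        result.remove(Integer.valueOf(oldValue));
        return result;
    }
}
```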
For the distinct step, you will need to do the same: first collect a list of all values (including duplicates), and use `mapValues()` to remove the duplicates for the final result.

Note: keeping duplicates in the `List` is important in both steps in order to compute the correct result.
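The distinct step can be sketched the same way (plain Java, hypothetical names): the grouped state keeps every per-subkey max, duplicates included, and only the final `mapValues()`-style projection collapses them:

```java
import java.util.*;

class DistinctStep {
    // Aggregation state: every per-subkey max, duplicates included.
    // Replace one subkey's old max with its new max (subtractor + adder).
    static List<Integer> replace(List<Integer> state, int oldMax, int newMax) {
        List<Integer> updated = new ArrayList<>(state);
        updated.remove(Integer.valueOf(oldMax)); // subtractor: first match only
        updated.add(newMax);                     // adder
        return updated;
    }

    // Final projection: duplicates collapse only here.
    static Set<Integer> distinct(List<Integer> state) {
        return new TreeSet<>(state);
    }
}
```

With the question's example, the state for key `A` is `[3, 2, 1]`; when the `A_B_E` tombstone drops `A_B`'s max to 2, the state becomes `[2, 2, 1]` and the distinct result `(1,2)`, which is only correct because the duplicate 2 was kept.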