
Using the key in Reducer logic with KGroupedStream.reduce


I am using Kafka Streams to group and reduce a Kafka topic.

I want to generate an output for a key only if every value received for that key is equal to the key itself; otherwise nothing should be output for that key.

To do this, however, I need access to the key, and the .reduce methods only give access to the aggregated/current values.

Is it possible to get the key when calling .reduce? Or is there another way this could be done?


Solution

  • You could use the aggregate method of the KGroupedStream interface:

    <VR> KTable<K,VR> aggregate(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator)

    Aggregate the values of records in this stream by the grouped key. Records with null key or value are ignored. Aggregating is a generalization of combining via reduce(...) as it, for example, allows the result to have a different type than the input values.

    If you want to output a result for a key only if every value matches the key, you can create a KTable using the overload that also takes a Materialized (to supply the serdes), something like:

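    KTable<String, Boolean> resultTable = inputStream
        .groupByKey()
        .aggregate(
            // Initializer: assume all values match until a mismatch is seen
            () -> true,
            // Aggregator: unlike a Reducer, it receives the key as well
            (key, value, agg) -> agg && key.equals(value),
            // Serdes for the String key and the Boolean aggregate
            Materialized.with(Serdes.String(), Serdes.Boolean())
        );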
     
    

    You can then convert the table to a stream and keep only the entries whose aggregate is still true, something like:

    KStream<String, Boolean> finalResult = resultTable
        .toStream()
        .filter((key, allEqual) -> allEqual);
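
To make the per-key logic easier to follow, here is a minimal plain-Java sketch that simulates the aggregate-then-filter steps over an in-memory list. It does not use the Kafka Streams API at all: the class AllEqualSketch, the Rec type, and both helper methods are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AllEqualSketch {

    // Hypothetical stand-in for a record with a String key and a String value.
    static final class Rec {
        final String key;
        final String value;
        Rec(String key, String value) { this.key = key; this.value = value; }
    }

    // Simulates the aggregation: each key starts at true (the initializer),
    // then every record ANDs in key.equals(value) (the aggregator).
    static Map<String, Boolean> aggregate(List<Rec> records) {
        Map<String, Boolean> table = new LinkedHashMap<>();
        for (Rec r : records) {
            if (r.key == null || r.value == null) {
                continue; // records with a null key or value are ignored
            }
            boolean agg = table.getOrDefault(r.key, true);
            table.put(r.key, agg && r.key.equals(r.value));
        }
        return table;
    }

    // Simulates the filter, applied to the final state of the table.
    static List<String> keysWithAllEqual(Map<String, Boolean> table) {
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, Boolean> e : table.entrySet()) {
            if (e.getValue()) {
                keys.add(e.getKey());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        List<Rec> input = Arrays.asList(
                new Rec("a", "a"), new Rec("b", "b"),
                new Rec("a", "a"), new Rec("b", "x"));
        System.out.println(keysWithAllEqual(aggregate(input))); // prints [a]
    }
}
```

One difference worth keeping in mind: the sketch inspects only the final state, whereas a KTable converted with toStream() forwards updates as they happen. In the example above, key "b" could therefore produce a true downstream before the mismatching record arrives, so the real topology may need an extra step if only the final verdict per key is wanted.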