I am using Kafka Streams to group and reduce a Kafka topic.
I want to produce output for a key only if the key equals the value for every record with that key; otherwise, nothing should be output for that key.
To do this, however, I need access to the key, and the `.reduce` methods only give access to the aggregated/current values.
Is it possible to get the key when calling `.reduce`? Or is there another way this could be done?
You could use the `aggregate` method of the `KGroupedStream` interface. Its Javadoc says:
Aggregate the values of records in this stream by the grouped key. Records with `null` key or value are ignored. Aggregating is a generalization of combining via `reduce(...)` as it, for example, allows the result to have a different type than the input values.
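To see why this solves the key problem, compare the two callback shapes. The sketch below is plain Java with no Kafka dependency; `ReducerLike`, `AggregatorLike`, `fold` and `CallbackShapes` are hypothetical names for local stand-ins that mirror the shapes of Kafka Streams' `Reducer<V>` (`apply(value1, value2)`) and `Aggregator<K, V, VA>` (`apply(key, value, aggregate)`).

```java
import java.util.List;

public class CallbackShapes {
    // Hypothetical stand-in mirroring Reducer<V>: two values of the same type, no key.
    @FunctionalInterface
    interface ReducerLike<V> {
        V apply(V value1, V value2);
    }

    // Hypothetical stand-in mirroring Aggregator<K, V, VA>: the key is passed in,
    // and the aggregate type VA may differ from the value type V.
    @FunctionalInterface
    interface AggregatorLike<K, V, VA> {
        VA apply(K key, V value, VA aggregate);
    }

    // Folds the values of a single key, starting from an initial aggregate.
    static <K, V, VA> VA fold(K key, List<V> values, VA initial, AggregatorLike<K, V, VA> agg) {
        VA result = initial;
        for (V v : values) {
            result = agg.apply(key, v, result);
        }
        return result;
    }

    public static void main(String[] args) {
        // A reducer can only combine values; there is no key to compare against.
        ReducerLike<String> concat = (v1, v2) -> v1 + v2;
        System.out.println(concat.apply("a", "b")); // ab

        // An aggregator sees the key and returns a Boolean from String values.
        AggregatorLike<String, String, Boolean> allEqual =
            (key, value, agg) -> agg && key.equals(value);
        System.out.println(fold("k", List.of("k", "k"), true, allEqual)); // true
        System.out.println(fold("k", List.of("k", "x"), true, allEqual)); // false
    }
}
```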
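Unlike a `Reducer`, which only receives two values, the `Aggregator` passed to `aggregate` also receives the key. If you want to output a result for a key only if all of its values equal the key, you can create a `KTable`, something like: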
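```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

// inputStream is assumed to be a KStream<String, String> with String default serdes
KTable<String, Boolean> resultTable = inputStream
    .groupByKey()
    .aggregate(
        () -> true,                                     // initializer: no mismatch seen yet
        (key, value, agg) -> agg && key.equals(value),  // the Aggregator receives the key
        Materialized.with(Serdes.String(), Serdes.Boolean())
    );
```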
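As a rough mental model of what `resultTable` ends up holding, the following plain-Java sketch (no Kafka involved; `AggregateTableSketch` and `aggregate` are hypothetical names) applies the same initializer and aggregator to an in-memory list of records and keeps the latest aggregate per key. It mimics the documented rule that records with a `null` key or value are skipped.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AggregateTableSketch {
    // Approximates groupByKey().aggregate(() -> true, (k, v, agg) -> agg && k.equals(v)):
    // returns the latest aggregate per key, i.e. the final state of the table.
    static Map<String, Boolean> aggregate(List<Map.Entry<String, String>> records) {
        Map<String, Boolean> table = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : records) {
            String key = record.getKey();
            String value = record.getValue();
            // Records with a null key or value are ignored.
            if (key == null || value == null) {
                continue;
            }
            boolean agg = table.getOrDefault(key, true); // initializer: () -> true
            table.put(key, agg && key.equals(value));    // aggregator
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = List.of(
            Map.entry("a", "a"),
            Map.entry("b", "b"),
            Map.entry("a", "a"),
            Map.entry("b", "x"));
        System.out.println(aggregate(input)); // {a=true, b=false}
    }
}
```

Once a key sees a mismatching value its aggregate becomes `false` and stays `false`, because `false && ...` can never become `true` again.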
You can then filter the values in the table, something like:
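```java
import org.apache.kafka.streams.kstream.KStream;

// keep only the updates where every value seen so far for the key equalled the key
KStream<String, Boolean> finalResult = resultTable
    .toStream()
    .filter((key, allEqual) -> allEqual);
```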
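One thing to keep in mind: `toStream()` forwards table updates, not just final results, so the filter sees intermediate aggregates. A key can pass the filter while its values still match and only stop appearing once a mismatch arrives. With record caching enabled, some intermediate updates may be coalesced, so what actually gets emitted can vary. The plain-Java sketch below (no Kafka; `ChangelogFilterSketch` and `emittedKeys` are hypothetical names) assumes every update is forwarded, i.e. no caching, to show the effect.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogFilterSketch {
    // Approximates resultTable.toStream().filter((key, allEqual) -> allEqual),
    // assuming every table update is forwarded downstream (no record caching).
    static List<String> emittedKeys(List<Map.Entry<String, String>> records) {
        Map<String, Boolean> table = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, String> record : records) {
            String key = record.getKey();
            boolean updated = table.getOrDefault(key, true) && key.equals(record.getValue());
            table.put(key, updated);
            // The filter passes an update only while all values so far equal the key.
            if (updated) {
                emitted.add(key);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = List.of(
            Map.entry("a", "a"),
            Map.entry("a", "x"), // mismatch arrives after "a" was already emitted
            Map.entry("a", "a"),
            Map.entry("b", "b"));
        System.out.println(emittedKeys(input)); // [a, b]
    }
}
```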