java apache-kafka apache-kafka-streams

Filtering and forwarding Kafka messages based on key alone with Kafka Streams


I'd like to understand, and possibly improve, some Kafka Streams code I'm working on. In particular, I'm not sure I fully understand the lifecycle of deserialization and serialization.

The core idea of the code is rather simple: a Kafka topic receives a large number of messages, and at ingress each message is "tagged" with some information in its key. For the sake of this question, let's say that the key starts with the "group" of the message, e.g. "groupA" or "groupXX".

The application dynamically creates Kafka Streams instances that filter certain groups into new topics.

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

// ...

var filteredGroup = "groupYYZ"; // hard-coded here, would come in from a parameter IRL
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> inputStream = builder.stream("main-topic");

inputStream.filter((key, value) -> key != null && key.startsWith(filteredGroup))
           .to(filteredGroup+"-topic", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Do I understand it correctly that both key and value are deserialized (using the serde classes from the Properties), and then serialized again when the new messages are sent to the output topic?

If so, as I'm only using the key to filter the messages, is there a way to avoid this and just copy the value part, or would I need to do this with a DIY consumer/producer setup instead of the high-level Streams API?


Solution

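  • Do I understand it correctly that both key and value are deserialized (using the serde classes from the Properties), and then serialized again when the new messages are sent to the output topic?

    Yes. Each record's key and value are deserialized with the configured default serdes when read from the input topic, and serialized again with the serdes given in Produced.with(...) when written to the output topic.

  • If so, as I'm only using the key to filter the messages, is there a way to avoid this and just copy the value part, or would I need to do this with a DIY consumer/producer setup instead of the high-level Streams API?

    Yes, you can avoid it without leaving the Streams API. Use ByteArraySerde (Serdes.ByteArray()) for the value: it is a no-op serde that passes the raw bytes through unchanged, so only the key is converted to a String for the filter.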
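The ByteArraySerde suggestion could be applied to the code from the question roughly as follows. This is a minimal sketch, not a definitive implementation: the class name KeyFilterTopology and the helper belongsToGroup are hypothetical names introduced for illustration, and the topic names are taken from the question. The key is still read as a String so that startsWith works, while the value is typed as byte[] and never decoded.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical wrapper class, introduced only for this sketch.
class KeyFilterTopology {

    // Hypothetical helper: true when the key is present and starts with the group name.
    static boolean belongsToGroup(String key, String group) {
        return key != null && key.startsWith(group);
    }

    // Builds a topology that copies matching records from "main-topic"
    // to "<filteredGroup>-topic" (topic names as in the question).
    static Topology build(String filteredGroup) {
        StreamsBuilder builder = new StreamsBuilder();
        builder
            // The key is decoded as a String for the filter; the value stays a raw byte[].
            .stream("main-topic", Consumed.with(Serdes.String(), Serdes.ByteArray()))
            .filter((key, value) -> belongsToGroup(key, filteredGroup))
            // The byte-array serde hands the value bytes to the producer unchanged.
            .to(filteredGroup + "-topic", Produced.with(Serdes.String(), Serdes.ByteArray()));
        return builder.build();
    }
}
```

The resulting topology can be passed to the KafkaStreams constructor exactly as in the question, e.g. new KafkaStreams(KeyFilterTopology.build(filteredGroup), props). Because the serdes are given explicitly through Consumed and Produced, the default value serde in the Properties no longer affects this stream.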