I'm using spring-boot-starter-parent
version 3.1.2
and spring-cloud-stream-binder-kafka-streams
version 4.0.3
The vast majority of examples online show creating a GlobalKTable with the @Input
annotation, which has been deprecated. I have yet to find a working example of how to create a GKT as a KeyValueStore with the latest version of Kafka Streams Binder.
The closest I've found is to bypass Binder and manually create a KafkaStreams bean with the GKT:
@Bean
public KafkaStreams kafkaStreams(
@Value("${spring.cloud.stream.kafka.streams.binder.brokers}") String brokers) {
StreamsBuilder builder = new StreamsBuilder();
// Create a GlobalKTable
builder.globalTable(
Topics.GKT_TOPIC,
Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as(
Stores.MY_GKT_STORE_NAME)
.withKeySerde(KeySerde)
.withValueSerde(ValueSerde));
// Setup stream processing logic here...
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
KafkaStreams streams = new KafkaStreams(builder.build(), config);
// Start the streams
streams.start();
return streams;
}
Could anyone point me to Docs or an example of how to do this with the latest version of Kafka Streams Binder?
I finally figured out how to do this in the docs
As mentioned above, the binder does not provide a first class way to register global state stores as a feature. For that, you need to use the customizer. Here is how that can be done.
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
try {
final StreamsBuilder streamsBuilder = fb.getObject();
streamsBuilder.addGlobalStore(...);
}
catch (Exception e) { }
};
}