Tags: apache-kafka, spring-kafka, apache-kafka-streams, spring-cloud-stream, spring-cloud-stream-binder-kafka

GlobalKTable as QueryableStore with Kafka Streams Binder


I'm using spring-boot-starter-parent version 3.1.2 and spring-cloud-stream-binder-kafka-streams version 4.0.3.

Most examples online create a GlobalKTable with the @Input annotation, which has been deprecated. I have yet to find a working example that creates a GlobalKTable (GKT) backed by a queryable KeyValueStore with the latest version of the Kafka Streams binder.

The closest I've come is to bypass the binder and create a KafkaStreams bean manually, with the GKT defined on it:

@Bean
public KafkaStreams kafkaStreams(
    @Value("${spring.cloud.stream.kafka.streams.binder.brokers}") String brokers) {
  StreamsBuilder builder = new StreamsBuilder();

  // Create a GlobalKTable
  builder.globalTable(
      Topics.GKT_TOPIC,
      Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as(
              Stores.MY_GKT_STORE_NAME)
          .withKeySerde(KeySerde)
          .withValueSerde(ValueSerde));

  // Setup stream processing logic here...

  Properties config = new Properties();
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id");
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
  KafkaStreams streams = new KafkaStreams(builder.build(), config);

  // Start the streams
  streams.start();

  return streams;
}

Could anyone point me to docs or an example of how to do this with the latest version of the Kafka Streams binder?


Solution

  • I finally found how to do this in the binder docs, which say:

    "As mentioned above, the binder does not provide a first class way to register global state stores as a feature. For that, you need to use the customizer. Here is how that can be done."

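    @Bean
    public StreamsBuilderFactoryBeanCustomizer customizer() {
        return fb -> {
            try {
                // Obtain the StreamsBuilder managed by this factory bean
                final StreamsBuilder streamsBuilder = fb.getObject();
                // Register the global state store (arguments are elided in the docs)
                streamsBuilder.addGlobalStore(...);
            }
            catch (Exception e) {
                // Fail fast rather than silently swallowing the error
                throw new IllegalStateException("Could not register the global state store", e);
            }
        };
    }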
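
The docs leave the arguments to addGlobalStore elided, so here is a minimal sketch of what one such call might look like with the plain Kafka Streams API. It is not the binder's own recipe. The store name, topic name, String serdes, and the GlobalStoreSketch and StoreUpdater classes are all hypothetical and chosen for illustration. It assumes kafka-streams 2.7 or later on the classpath, for the addGlobalStore overload that takes the newer processor API.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public final class GlobalStoreSketch {

    // Hypothetical names, chosen for illustration only.
    static final String STORE_NAME = "my-global-store";
    static final String TOPIC = "my-global-topic";

    /** Registers a global key-value store that mirrors TOPIC on the given builder. */
    public static StreamsBuilder register(StreamsBuilder builder) {
        // Global stores are restored from their source topic, so changelog
        // logging has to be disabled on the store builder.
        StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                Stores.keyValueStoreBuilder(
                                Stores.inMemoryKeyValueStore(STORE_NAME),
                                Serdes.String(),
                                Serdes.String())
                        .withLoggingDisabled();

        return builder.addGlobalStore(
                storeBuilder,
                TOPIC,
                Consumed.with(Serdes.String(), Serdes.String()),
                StoreUpdater::new);
    }

    /** Copies each record from the topic into the store; a null value deletes the key. */
    static final class StoreUpdater implements Processor<String, String, Void, Void> {
        private KeyValueStore<String, String> store;

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            // Look up the store that was registered together with this processor.
            store = context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(Record<String, String> record) {
            if (record.value() == null) {
                store.delete(record.key());
            } else {
                store.put(record.key(), record.value());
            }
        }
    }
}
```

Under these assumptions, the customizer body could call GlobalStoreSketch.register(fb.getObject()) where the docs write addGlobalStore(...). Once the application is running, the store should be readable by name, for example through the binder's InteractiveQueryService.getQueryableStore(STORE_NAME, QueryableStoreTypes.keyValueStore()). The update processor copies records unchanged on purpose, so the store contents match the topic.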