Tags: java, spring-boot, spring-kafka, spring-cloud-config

Issue with `bootstrap.servers` in Kafka Producer Configuration with Spring Boot 3.2.6 and Spring Cloud 2023.0.2


I am experiencing an issue with Kafka producer configuration in my Spring Boot application after upgrading to newer versions of Spring Boot and Spring Cloud dependencies.

Environment

  • Spring Boot 3.2.6 (upgraded from 2.7.8)
  • Spring Cloud Dependencies 2023.0.2 (upgraded from 2021.0.8)
  • Configuration served by a Spring Cloud Config Server
  • Kafka hosted on Confluent Cloud

Problem Description

In Spring Boot 2.7.8 with Spring Cloud Dependencies 2021.0.8, the DefaultKafkaProducerFactory bean created by the kafkaProducerFactory method of org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration was correctly populated with properties from the Spring Config Server, including the bootstrap.servers property.

    @Bean
    @ConditionalOnMissingBean(ProducerFactory.class)
    public DefaultKafkaProducerFactory<?, ?> kafkaProducerFactory(
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        // Properties come straight from spring.kafka.*, including the
        // bootstrap.servers value resolved from the Spring Config Server
        DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
                this.properties.buildProducerProperties());
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }

After upgrading to Spring Boot 3.2.6 and Spring Cloud Dependencies 2023.0.2, I noticed that the configuration logic of the kafkaProducerFactory method that creates the DefaultKafkaProducerFactory bean has changed. Specifically, it now calls this.applyKafkaConnectionDetailsForProducer(properties, connectionDetails), which overwrites the bootstrap.servers property supplied by the Spring Config Server with localhost:9092.

    @Bean
    @ConditionalOnMissingBean(ProducerFactory.class)
    public DefaultKafkaProducerFactory<?, ?> kafkaProducerFactory(KafkaConnectionDetails connectionDetails,
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers, ObjectProvider<SslBundles> sslBundles) {
        // Populated correctly here, including bootstrap.servers from the Config Server
        Map<String, Object> properties = this.properties.buildProducerProperties(sslBundles.getIfAvailable());
        // New in 3.x: overwrites bootstrap.servers with the KafkaConnectionDetails value
        this.applyKafkaConnectionDetailsForProducer(properties, connectionDetails);
        DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(properties);
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }
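
For reference, the overwriting step in the 3.2.x sources looks roughly like this (paraphrased, not verified line for line; PropertiesKafkaConnectionDetails is the auto-configured, KafkaProperties-backed implementation):

    private void applyKafkaConnectionDetailsForProducer(Map<String, Object> properties,
            KafkaConnectionDetails connectionDetails) {
        // Unconditionally replaces whatever buildProducerProperties() put in the map
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, connectionDetails.getProducerBootstrapServers());
        if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) {
            properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
        }
    }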

The KAFKA_BOOTSTRAP_SERVERS environment variable is set correctly, and this configuration is supposed to be fetched from the Spring Config Server. Debugging confirms that Map<String, Object> properties = this.properties.buildProducerProperties(sslBundles.getIfAvailable()) correctly populates the map with bootstrap.servers -> my.eastus2.azure.confluent.cloud:9092. However, the very next step overwrites that value with localhost:9092.
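
As far as I can tell, that localhost:9092 does not come from my configuration at all; it is the built-in default in KafkaProperties (quoted roughly from the 3.2.x sources), which the auto-configured KafkaConnectionDetails falls back to when spring.kafka.bootstrap-servers is never bound:

    // org.springframework.boot.autoconfigure.kafka.KafkaProperties
    private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));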

Configuration Snippet

Here is a snippet of my configuration that works in the older versions but fails in the new versions:

# application.yml
spring:
  # To overwrite properties loaded from App Config, include below line in application.yml
  config:
    import: optional:configserver:https://my.config.server.com/v1/config/
  kafka:
    ### Producer ###
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        topic: ${topic}
    properties:
      ### Auth ###
      sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        clientId='${my_client_id}' \
        clientSecret='${my_client_secret}' \
        scope='' \
        extension_logicalCluster='${my_kafka_cluster_id}' \
        extension_identityPoolId='${my_identity_pool}';"

      ### Schema Registry ###
      bearer.auth.client.id: ${my_client_id}
      bearer.auth.client.secret: ${my_client_secret}
      bearer.auth.identity.pool.id: ${my_identity_pool:pool-xyz}

      kekcipherkms.provider.identity.client.id: ${my_client_id}
      kekcipherkms.provider.identity.client.secret: ${my_client_secret}
      kekcipherkms.provider.identity.scope: my.scope.w

Observed Behavior

On startup the producer tries to connect to localhost:9092 instead of the Confluent Cloud broker: bootstrap.servers is first populated correctly from the Config Server by buildProducerProperties(...), then overwritten by applyKafkaConnectionDetailsForProducer(...).

Expected Behavior

The DefaultKafkaProducerFactory should use the bootstrap.servers property from the Spring Config Server as it did in the previous versions.

Question

How can I configure my Spring Boot application to correctly use the bootstrap.servers value from the Spring Config Server with the newer versions of Spring Boot and Spring Cloud? Is there something I am missing to ensure that KafkaConnectionDetails resolves bootstrap.servers to my Spring Config Server's value rather than the default localhost:9092?

Additional Context

If anyone has encountered a similar issue or has insights on the changes introduced in the newer versions that might be affecting this behavior, your help would be greatly appreciated.


Solution

  • Found my answer with a little more searching on Stack Overflow and some help from these similar posts:

    why is springboot not picking up apache kafka bootstrap-servers in application.yml? (answer by Gary Russell)

    Kafka client connection issue for spring boot 3.1.X and above (answer by Artem Bilan)

    The existing Spring Config Server provides a spring.kafka.properties.bootstrap.servers value, but since Spring Boot 3.1.0 it needs to be spring.kafka.bootstrap-servers, since that is the property the new KafkaConnectionDetails mechanism binds to.

    So my hack of a solution, until I can submit a ticket to have the company update the Spring Config Server, was the following:

    # application.yml
    spring:
      kafka:
        bootstrap-servers: ${spring.kafka.properties.bootstrap.servers}
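
    An alternative those answers point at is registering your own KafkaConnectionDetails bean so the auto-configuration copies your value instead of the default. A minimal sketch, assuming the Spring Boot 3.2 interface shape (a single abstract getBootstrapServers() method) and the property key my Config Server happens to publish:

    import java.util.List;

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class KafkaConnectionDetailsConfig {

        // The property key is the one my Config Server serves; adjust to match yours.
        @Bean
        public KafkaConnectionDetails kafkaConnectionDetails(
                @Value("${spring.kafka.properties.bootstrap.servers}") String bootstrapServers) {
            List<String> servers = List.of(bootstrapServers.split(","));
            return new KafkaConnectionDetails() {
                @Override
                public List<String> getBootstrapServers() {
                    return servers;
                }
            };
        }
    }

    Be aware, though, that the 3.2.x overwriting method quoted in the question also forces security.protocol to PLAINTEXT for any KafkaConnectionDetails other than the properties-backed one, which would clash with a SASL/OAuth setup like mine, so the property remapping above stays the simpler fix here.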