I am experiencing an issue with Kafka producer configuration in my Spring Boot application after upgrading to newer versions of Spring Boot and Spring Cloud dependencies.
In Spring Boot 2.7.8 with Spring Cloud Dependencies 2021.0.8, the kafkaProducerFactory method of org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration created a DefaultKafkaProducerFactory bean that was correctly populated with properties from the Spring Config Server, including the bootstrap.servers property.
@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public DefaultKafkaProducerFactory<?, ?> kafkaProducerFactory(
        ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
    DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
            this.properties.buildProducerProperties());
    String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
    if (transactionIdPrefix != null) {
        factory.setTransactionIdPrefix(transactionIdPrefix);
    }
    customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
    return factory;
}
After upgrading to Spring Boot 3.2.6 and Spring Cloud Dependencies 2023.0.2, I noticed that the logic in kafkaProducerFactory that creates the DefaultKafkaProducerFactory bean has changed. Specifically, it now calls this.applyKafkaConnectionDetailsForProducer(properties, connectionDetails); which overwrites the bootstrap.servers value supplied by the Spring Config Server with localhost:9092.
@Bean
@ConditionalOnMissingBean({ProducerFactory.class})
public DefaultKafkaProducerFactory<?, ?> kafkaProducerFactory(KafkaConnectionDetails connectionDetails, ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers, ObjectProvider<SslBundles> sslBundles) {
    Map<String, Object> properties = this.properties.buildProducerProperties((SslBundles)sslBundles.getIfAvailable());
    this.applyKafkaConnectionDetailsForProducer(properties, connectionDetails);
    DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory(properties);
    String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
    if (transactionIdPrefix != null) {
        factory.setTransactionIdPrefix(transactionIdPrefix);
    }
    customizers.orderedStream().forEach((customizer) -> {
        customizer.customize(factory);
    });
    return factory;
}
The KAFKA_BOOTSTRAP_SERVERS environment variable is set correctly, and this configuration is supposed to be fetched from the Spring Config Server. I can see that Map<String, Object> properties = this.properties.buildProducerProperties((SslBundles)sslBundles.getIfAvailable()); correctly populates the properties with bootstrap.servers -> my.eastus2.azure.confluent.cloud:9092. However, the next step, applyKafkaConnectionDetailsForProducer, overwrites that value with localhost:9092.
Here is a snippet of my configuration that works in the older versions but fails in the new versions:
# application.yml
spring:
# To overwrite properties loaded from App Config, include below line in application.yml
config:
import: optional:configserver:https://my.config.server.com/v1/config/
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
### Auth ###
sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId='${my_client_id}' \
clientSecret='${my_client_secret}' \
scope='' \
extension_logicalCluster='${my_kafka_cluster_id}' \
extension_identityPoolId='${my_identity_pool}';"
### Schema Registry ###
bearer.auth.client.id: ${my_client_id}
bearer.auth.client.secret: ${my_client_secret}
bearer.auth.identity.pool.id: ${my_identity_pool:pool-xyz}
kekcipherkms.provider.identity.client.id: ${my_client_id}
kekcipherkms.provider.identity.client.secret: ${my_client_secret}
kekcipherkms.provider.identity.scope: my.scope.w
### Producer ###
producer:
properties:
topic: ${topic}
In the older versions, DefaultKafkaProducerFactory correctly uses the bootstrap.servers value from the Config Server.
In the newer versions, the bootstrap.servers value is overwritten to localhost:9092 by the applyKafkaConnectionDetailsForProducer method.
Expected behavior: the DefaultKafkaProducerFactory should use the bootstrap.servers property from the Spring Config Server, as it did in the previous versions.
How can I configure my Spring Boot application to correctly use the bootstrap.servers value from the Spring Config Server in the newer versions of Spring Boot and Spring Cloud? Is there something I am missing to ensure that the bootstrap servers in KafkaConnectionDetails are set to my Spring Config Server's value and not the default localhost:9092?
If anyone has encountered a similar issue or has insights on the changes introduced in the newer versions that might be affecting this behavior, your help would be greatly appreciated.
I found my answer with a little more searching on Stack Overflow and some help from these similar posts:
"why is springboot not picking up apache kafka bootstrap-servers in application.yml?" (answer by Gary Russell)
"Kafka client connection issue for spring boot 3.1.X and above" (answer by Artem Bilan)
The existing Spring Config Server provides the value under spring.kafka.properties.bootstrap.servers, but since Spring Boot 3.1.0 it needs to be under spring.kafka.bootstrap-servers.
So my hack of a solution, until I can submit a ticket to have the company update the Spring Config Server, was to do the following:
# application.yml
spring:
  kafka:
    bootstrap-servers: ${spring.kafka.properties.bootstrap.servers}
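To illustrate why the alias above helps, here is a minimal plain-Java sketch of the behavior as I understand it. It is a simplified model, not the actual Spring Boot source: the class name, the buildProducerConfig method, and its parameters are hypothetical. It assumes that the connection details fall back to localhost:9092 when spring.kafka.bootstrap-servers is not set, and that this value is then put into the producer map, replacing any bootstrap.servers entry that came from spring.kafka.properties.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the producer config assembly (hypothetical names, not Spring Boot code).
class BootstrapServersOverwriteSketch {

    // Assumed default for spring.kafka.bootstrap-servers when the key is absent.
    static final List<String> DEFAULT_BOOTSTRAP_SERVERS = List.of("localhost:9092");

    // bootstrapServers models spring.kafka.bootstrap-servers (null when not set);
    // genericProperties models the entries under spring.kafka.properties.*.
    static Map<String, Object> buildProducerConfig(List<String> bootstrapServers,
                                                   Map<String, String> genericProperties) {
        // Step 1: start from the generic properties, as buildProducerProperties appears to do.
        Map<String, Object> config = new HashMap<>(genericProperties);

        // Step 2: the connection details value is put last, so it always wins.
        List<String> fromConnectionDetails =
                (bootstrapServers != null) ? bootstrapServers : DEFAULT_BOOTSTRAP_SERVERS;
        config.put("bootstrap.servers", fromConnectionDetails);
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> fromConfigServer =
                Map.of("bootstrap.servers", "my.eastus2.azure.confluent.cloud:9092");

        // Only spring.kafka.properties.bootstrap.servers is set: the default overwrites it.
        System.out.println(buildProducerConfig(null, fromConfigServer).get("bootstrap.servers"));
        // prints [localhost:9092]

        // spring.kafka.bootstrap-servers is also set (the alias in application.yml).
        System.out.println(buildProducerConfig(
                List.of("my.eastus2.azure.confluent.cloud:9092"), fromConfigServer)
                .get("bootstrap.servers"));
        // prints [my.eastus2.azure.confluent.cloud:9092]
    }
}
```

In this model the placeholder in application.yml simply copies the Config Server value into the key that the connection details read from, so the final put writes the real broker address instead of the default.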