I am trying to create a stream application using the SCDF kafka-source-kafka app as a Kafka supplier. Below are the properties I use to configure the consumer for the external source Kafka cluster.
app.kafka-source-kafka.kafka.supplier.topics=TEST_TOPIC
app.kafka-source-kafka.spring.kafka.consumer.bootstrap-servers=xxxxxxxx
app.kafka-source-kafka.spring.kafka.consumer.enable-auto-commit=true
app.kafka-source-kafka.spring.kafka.consumer.group-id=B2B_EH_PASTHROUGH_FALLBACK
app.kafka-source-kafka.spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxxxx\" password=\"xxxxxx\";
app.kafka-source-kafka.spring.kafka.consumer.properties.sasl.mechanism=PLAIN
app.kafka-source-kafka.spring.kafka.consumer.properties.security.protocol=SASL_SSL
app.kafka-source-kafka.spring.kafka.consumer.properties.ssl.enabled.protocols=TLSv1.2
app.kafka-source-kafka.spring.kafka.consumer.properties.ssl.truststore.location=/etc/secrets/kafka.client.truststore.jks
app.kafka-source-kafka.spring.kafka.consumer.properties.ssl.truststore.password=xxxx
app.kafka-source-kafka.spring.kafka.consumer.properties.ssl.truststore.type=JKS
app.kafka-source-kafka.spring.kafka.listener.async-acks=true
Logs:
2024-01-04T05:45:27.711Z INFO 1 --- [main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.include.jmx.reporter = true
auto.offset.reset = latest
bootstrap.servers = [xxxxx:9090]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-B2B_EH_PASTHROUGH_FALLBACK-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = B2B_EH_PASTHROUGH_FALLBACK
After a few minutes, the o.a.k.clients.consumer.ConsumerConfig values changed, and the consumer started trying to connect to the internal Kafka brokers:
2024-01-04T05:46:45.754Z INFO 1 --- [binder-health-1] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.include.jmx.reporter = true
auto.offset.reset = latest
bootstrap.servers = [10.100.172.208:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
Error: Connection to node -1 (my-release-kafka.springdata.svc.cluster.local/10.100.172.208:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed
I am not sure why the consumer/supplier bootstrap.servers value changed after the consumer had already joined the group on the external source Kafka successfully.
Successfully logged in.
2024-01-04T05:45:28.226Z INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.4.1
2024-01-04T05:45:28.226Z INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 8a516edc2755df89
2024-01-04T05:45:28.226Z INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1704347128225
2024-01-04T05:45:28.318Z INFO 1 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-B2B_EH_PASTHROUGH_FALLBACK-1, groupId=B2B_EH_PASTHROUGH_FALLBACK] Subscribed to topic(s): EIS.TOPIC.PASS.EH.IN.DIT
2024-01-04T05:45:28.409Z INFO 1 --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started bean 'kafkaMessageDrivenChannelAdapterSpec'; defined in: 'class path resource [org/springframework/cloud/fn/supplier/kafka/KafkaSupplierConfiguration.class]'; from source: 'org.springframework.cloud.fn.supplier.kafka.KafkaSupplierConfiguration.kafkaMessageDrivenChannelAdapterSpec(org.springframework.cloud.fn.supplier.kafka.KafkaSupplierProperties,org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory,org.springframework.beans.factory.ObjectProvider,org.springframework.beans.factory.ObjectProvider,org.springframework.beans.factory.ObjectProvider,org.springframework.cloud.fn.common.config.ComponentCustomizer)'
2024-01-04T05:45:28.521Z INFO 1 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2024-01-04T05:45:28.715Z INFO 1 --- [ main] .c.s.a.k.s.k.KafkaSourceKafkaApplication : Started KafkaSourceKafkaApplication in 31.902 seconds (process running for 34.939)
The binder-health prefix in the thread name means the thread of the KafkaBinderHealthIndicator. We see an o.a.k.clients.consumer.ConsumerConfig log entry every time consumerFactory.createConsumer() is called, and there is indeed one such call for the metadata request in the KafkaBinderHealthIndicator.
So the configuration has not changed; what you see is a new KafkaConsumer instance.
It comes from the consumer factory built in the kafkaBinderHealthIndicator bean definition:
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            configurationProperties.getKafkaConnectionString());
}
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(
        kafkaMessageChannelBinder, consumerFactory);
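To make the guard in that fragment easier to reason about, here is a minimal plain-Java sketch of the same containsKey fallback, with no Spring or Kafka dependencies. It is not the binder's actual code: the class name, the healthCheckProps helper and both broker addresses are hypothetical, and it assumes props already holds whatever configuration was collected before the guard runs.

```java
import java.util.HashMap;
import java.util.Map;

class HealthIndicatorBootstrapSketch {

    // Same string value as ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG in the Kafka client.
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    /**
     * Hypothetical helper mirroring the guard above: the binder's connection
     * string is applied only when no bootstrap servers entry exists yet.
     */
    static Map<String, Object> healthCheckProps(Map<String, Object> collectedConfig,
            String binderConnectionString) {
        Map<String, Object> props = new HashMap<>(collectedConfig);
        if (!props.containsKey(BOOTSTRAP_SERVERS)) {
            props.put(BOOTSTRAP_SERVERS, binderConnectionString);
        }
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical addresses, for illustration only.
        String binderBrokers = "internal-kafka:9092";

        // Case 1: no entry yet, so the binder's brokers are used.
        System.out.println(healthCheckProps(new HashMap<>(), binderBrokers).get(BOOTSTRAP_SERVERS));

        // Case 2: an existing entry is left untouched.
        Map<String, Object> preset = new HashMap<>();
        preset.put(BOOTSTRAP_SERVERS, "external-kafka:9093");
        System.out.println(healthCheckProps(preset, binderBrokers).get(BOOTSTRAP_SERVERS));
    }
}
```

Running main prints internal-kafka:9092 and then external-kafka:9093. In other words, which brokers the health check consumer contacts depends on what is already in props when the guard is reached, so that is the place to look when the logged bootstrap.servers is not the expected one.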
I'm not sure why the brokers config for the health indicator still ends up different after all of that logic. If you have a simple Spring Cloud Stream application that reproduces it, feel free to raise a GitHub issue against Spring Cloud Stream so we can investigate further on our side.
This has nothing to do with Spring Cloud Data Flow, though.