spring-cloud-stream, spring-cloud-dataflow

Spring Cloud Data Flow kafka-source-kafka throwing an error


I am trying to create a stream application using the SCDF kafka-source-kafka app, where the Kafka supplier consumes from an external Kafka cluster. Below are the properties that configure the external source Kafka as the consumer:

app.kafka-source-kafka.kafka.supplier.topics=TEST_TOPIC
app.kafka-source-kafka.spring.kafka.consumer.bootstrap-servers=xxxxxxxx
app.kafka-source-kafka.spring.kafka.consumer.enable-auto-commit=true
app.kafka-source-kafka.spring.kafka.consumer.group-id=B2B_EH_PASTHROUGH_FALLBACK
app.kafka-source-kafka.spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxxxx\" password=\"xxxxxx\";
app.kafka-source-kafka.spring.kafka.consumer.properties.sasl.mechanism=PLAIN
app.kafka-source-kafka.spring.kafka.consumer.properties.security.protocol=SASL_SSL
app.kafka-source-kafka.spring.kafka.consumer.properties.ssl.enabled.protocols=TLSv1.2
app.kafka-source-kafka.spring.kafka.consumer.properties.ssl.truststore.location=/etc/secrets/kafka.client.truststore.jks
app.kafka-source-kafka.spring.kafka.consumer.properties.ssl.truststore.password=xxxx
app.kafka-source-kafka.spring.kafka.consumer.properties.ssl.truststore.type=JKS
app.kafka-source-kafka.spring.kafka.listener.async-acks=true


Logs:
2024-01-04T05:45:27.711Z  INFO 1 --- [main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.include.jmx.reporter = true
    auto.offset.reset = latest
    bootstrap.servers = [xxxxx:9090]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-B2B_EH_PASTHROUGH_FALLBACK-1
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = B2B_EH_PASTHROUGH_FALLBACK

After a few minutes:

The o.a.k.clients.consumer.ConsumerConfig values changed, and the consumer tried to connect to the internal Kafka brokers:
2024-01-04T05:46:45.754Z  INFO 1 --- [binder-health-1] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.include.jmx.reporter = true
    auto.offset.reset = latest
    bootstrap.servers = [10.100.172.208:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips

Error: Connection to node -1 (my-release-kafka.springdata.svc.cluster.local/10.100.172.208:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed

I am not sure why the consumer/supplier bootstrap-servers value changed after the consumer had successfully joined the external source Kafka consumer group.

It had successfully logged in earlier:

2024-01-04T05:45:28.226Z  INFO 1 --- [main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.4.1
2024-01-04T05:45:28.226Z  INFO 1 --- [main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 8a516edc2755df89
2024-01-04T05:45:28.226Z  INFO 1 --- [main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1704347128225
2024-01-04T05:45:28.318Z  INFO 1 --- [main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-B2B_EH_PASTHROUGH_FALLBACK-1, groupId=B2B_EH_PASTHROUGH_FALLBACK] Subscribed to topic(s): EIS.TOPIC.PASS.EH.IN.DIT
2024-01-04T05:45:28.409Z  INFO 1 --- [main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started bean 'kafkaMessageDrivenChannelAdapterSpec'; defined in: 'class path resource [org/springframework/cloud/fn/supplier/kafka/KafkaSupplierConfiguration.class]'; from source: 'org.springframework.cloud.fn.supplier.kafka.KafkaSupplierConfiguration.kafkaMessageDrivenChannelAdapterSpec(org.springframework.cloud.fn.supplier.kafka.KafkaSupplierProperties,org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory,org.springframework.beans.factory.ObjectProvider,org.springframework.beans.factory.ObjectProvider,org.springframework.beans.factory.ObjectProvider,org.springframework.cloud.fn.common.config.ComponentCustomizer)'
2024-01-04T05:45:28.521Z  INFO 1 --- [main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2024-01-04T05:45:28.715Z  INFO 1 --- [main] .c.s.a.k.s.k.KafkaSourceKafkaApplication : Started KafkaSourceKafkaApplication in 31.902 seconds (process running for 34.939)


Solution

  • The binder-health in the thread name refers to the KafkaBinderHealthIndicator. Those o.a.k.clients.consumer.ConsumerConfig entries are logged every time consumerFactory.createConsumer() is called, and there is indeed one such call for the metadata request in the KafkaBinderHealthIndicator.

    So the configuration is not changed; rather, this is a new KafkaConsumer instance.

    It is created by the kafkaBinderHealthIndicator bean definition:

        if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    configurationProperties.getKafkaConnectionString());
        }
        ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
        KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(
                kafkaMessageChannelBinder, consumerFactory);
    

    I'm not sure why the brokers config for the health indicator is different after all of that logic, but if you have a simple Spring Cloud Stream application that reproduces this, feel free to raise a GitHub issue against Spring Cloud Stream for further investigation on our side.

    This has nothing to do with Spring Cloud Data Flow, though.
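The fallback in the quoted bean definition can be illustrated with a small standalone sketch. The class and method below are hypothetical, written only to mirror the shape of the snippet above; they are not part of the binder's API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the fallback in the quoted
// kafkaBinderHealthIndicator bean definition.
public class HealthIndicatorBrokers {

    static String resolveBrokers(Map<String, Object> consumerProps, String binderConnectionString) {
        // Same shape as the quoted snippet: the binder's own connection
        // string is used only when bootstrap.servers is absent from the
        // consumer properties handed to the health indicator's factory.
        if (!consumerProps.containsKey("bootstrap.servers")) {
            consumerProps.put("bootstrap.servers", binderConnectionString);
        }
        return String.valueOf(consumerProps.get("bootstrap.servers"));
    }

    public static void main(String[] args) {
        // If the external bootstrap servers never make it into these props
        // (which would explain the logs above), the health indicator's new
        // consumer targets the binder's cluster instead:
        System.out.println(resolveBrokers(new HashMap<>(), "10.100.172.208:9092"));
        // → 10.100.172.208:9092
    }
}
```

This would be consistent with what the logs show: the supplier's consumer (built from the spring.kafka.consumer.* properties) joins the external cluster, while the health indicator's separate consumer falls back to the binder connection string pointing at the internal cluster.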