Spring Cloud Stream Multiple Kafka Clusters Configuration


In my project, I need to connect to two different Kafka brokers.

My application.yaml looks somewhat like this:

spring:
  cloud:
    function:
      definition: orderCreatedListener;orderProcessedListener
    stream:
      bindings:
        orderCreatedProducer-out-0:
          destination: order-created
          binder: kafka-one
        orderCreatedListener-in-0:
          destination: order-created
          group: spot
          binder: kafka-one
        orderCreatedListener-out-0:
          destination: order-processed
          binder: kafka-two
        orderProcessedListener-in-0:
          destination: order-processed
          group: spot
          binder: kafka-two
      kafka:
        binder:
          auto-create-topics: true
        bindings:
          orderCreatedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-created-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
      binders:
        kafka-one:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
        kafka-two:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093

But when I ran the application it did not work and produced the following error:

2024-03-05T23:35:48.473-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 31 due to node 1001 being disconnected (elapsed time since creation: 4ms, elapsed time since send: 4ms, request timeout: 3600000ms)
2024-03-05T23:35:49.595-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node 1001 disconnected.
2024-03-05T23:35:49.595-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 32 due to node 1001 being disconnected (elapsed time since creation: 5ms, elapsed time since send: 5ms, request timeout: 3600000ms)
2024-03-05T23:35:50.727-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node 1001 disconnected.
2024-03-05T23:35:50.728-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 33 due to node 1001 being disconnected (elapsed time since creation: 4ms, elapsed time since send: 4ms, request timeout: 3600000ms)
2024-03-05T23:35:51.086-03:00  INFO 25569 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager         : [AdminClient clientId=adminclient-1] Metadata update failed

org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata

I want to split the Kafka topics across two clusters: order-created on kafka-one and order-processed on kafka-two.

I am using:

Both Kafka clusters run fine in my development environment as Docker containers, one exposed on port 9092 and the other on port 9093.

How can I fix this?


Solution

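  • The main problem was caused by an incorrect configuration of the Docker containers.

    Inside the Docker network, both Kafka containers were listening on port 9092, and broker two also advertised localhost:9092 to external clients even though it was published on host port 9093. A client that bootstrapped through localhost:9093 was therefore told to reconnect to localhost:9092, which belongs to the other cluster, and this caused the unexpected behavior.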

    Old docker-compose.yaml:

    version: "3.9"
    
    services:
    
      spot.zookeeper.one:
        image: docker.io/bitnami/zookeeper:3.7
        container_name: spot.zookeeper.one
        restart: "no"
        hostname: spot.zookeeper.one
        ports:
          - "2282:2181"
        environment:
          ALLOW_ANONYMOUS_LOGIN: "yes"
        networks:
          - spot-network-one
        volumes:
          - zookeeper_data_one:/bitnami
    
      spot.broker.one:
        image: docker.io/bitnami/kafka:3
        container_name: spot.broker.one
        hostname: spot.broker.one
        restart: "no"
        ports:
          - "9092:9092"
        environment:
          KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19092,LISTENER_DOCKER_EXTERNAL://:9092"
          KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.one:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092"
          KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
          KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.one:2181"
          KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
          KAFKA_CFG_PORT: "9092"
          KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
          ALLOW_PLAINTEXT_LISTENER: "yes"
        depends_on:
          - spot.zookeeper.one
        networks:
          - spot-network-one
        volumes:
          - kafka_data_one:/bitnami
    
      spot.zookeeper.two:
        image: docker.io/bitnami/zookeeper:3.7
        container_name: spot.zookeeper.two
        restart: "no"
        hostname: spot.zookeeper.two
        ports:
          - "2283:2181"
        environment:
          ALLOW_ANONYMOUS_LOGIN: "yes"
        networks:
          - spot-network-two
        volumes:
          - zookeeper_data_two:/bitnami
    
      spot.broker.two:
        image: docker.io/bitnami/kafka:3
        container_name: spot.broker.two
        hostname: spot.broker.two
        restart: "no"
        ports:
          - "9093:9092"
        environment:
          KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19092,LISTENER_DOCKER_EXTERNAL://:9092"
          KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.two:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092"
          KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
          KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.two:2181"
          KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
          KAFKA_CFG_PORT: "9092"
          KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
          ALLOW_PLAINTEXT_LISTENER: "yes"
        depends_on:
          - spot.zookeeper.two
        networks:
          - spot-network-two
        volumes:
          - kafka_data_two:/bitnami
    
      spot.kafka-ui:
        image: provectuslabs/kafka-ui:latest
        container_name: spot.kafka-ui
        restart: "no"
        environment:
          KAFKA_CLUSTERS_0_NAME: spot-one
          KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: spot.broker.one:19092
          KAFKA_CLUSTERS_0_ZOOKEEPER: spot.zookeeper.one:2181
          KAFKA_CLUSTERS_1_NAME: spot-two
          KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: spot.broker.two:19092
          KAFKA_CLUSTERS_1_ZOOKEEPER: spot.zookeeper.two:2181
        ports:
          - "8580:8080"
        depends_on:
          - spot.zookeeper.one
          - spot.broker.one
          - spot.zookeeper.two
          - spot.broker.two
        networks:
          - spot-network-one
          - spot-network-two
    
    networks:
      spot-network-one:
        driver: bridge
      spot-network-two:
        driver: bridge
    
    volumes:
      zookeeper_data_one:
        driver: local
      kafka_data_one:
        driver: local
      zookeeper_data_two:
        driver: local
      kafka_data_two:
        driver: local
    

    The adjusted docker-compose.yaml:

    version: "3.9"
    
    services:
    
      spot.zookeeper.one:
        image: docker.io/bitnami/zookeeper:3.7
        container_name: spot.zookeeper.one
        restart: "no"
        hostname: spot.zookeeper.one
        ports:
          - "2282:2181"
        environment:
          ALLOW_ANONYMOUS_LOGIN: "yes"
        networks:
          - spot-network-one
        volumes:
          - zookeeper_data_one:/bitnami
    
      spot.broker.one:
        image: docker.io/bitnami/kafka:3
        container_name: spot.broker.one
        hostname: spot.broker.one
        restart: "no"
        ports:
          - "9092:9092"
        environment:
          KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19092,LISTENER_DOCKER_EXTERNAL://:9092"
          KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.one:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092"
          KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
          KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.one:2181"
          KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
          KAFKA_CFG_PORT: "9092"
          KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
          ALLOW_PLAINTEXT_LISTENER: "yes"
        depends_on:
          - spot.zookeeper.one
        networks:
          - spot-network-one
        volumes:
          - kafka_data_one:/bitnami
    
      spot.zookeeper.two:
        image: docker.io/bitnami/zookeeper:3.7
        container_name: spot.zookeeper.two
        restart: "no"
        hostname: spot.zookeeper.two
        ports:
          - "2283:2181"
        environment:
          ALLOW_ANONYMOUS_LOGIN: "yes"
        networks:
          - spot-network-two
        volumes:
          - zookeeper_data_two:/bitnami
    
      spot.broker.two:
        image: docker.io/bitnami/kafka:3
        container_name: spot.broker.two
        hostname: spot.broker.two
        restart: "no"
        ports:
          - "9093:9093" # modified here
        environment:
          KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19093,LISTENER_DOCKER_EXTERNAL://:9093" # modified here
          KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.two:19093,LISTENER_DOCKER_EXTERNAL://localhost:9093" # modified here
          KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
          KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.two:2181"
          KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
          KAFKA_CFG_PORT: "9093" # modified here
          KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
          ALLOW_PLAINTEXT_LISTENER: "yes"
        depends_on:
          - spot.zookeeper.two
        networks:
          - spot-network-two
        volumes:
          - kafka_data_two:/bitnami
    
      spot.kafka-ui:
        image: provectuslabs/kafka-ui:latest
        container_name: spot.kafka-ui
        restart: "no"
        environment:
          KAFKA_CLUSTERS_0_NAME: spot-one
          KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: spot.broker.one:19092
          KAFKA_CLUSTERS_0_ZOOKEEPER: spot.zookeeper.one:2181
          KAFKA_CLUSTERS_1_NAME: spot-two
          KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: spot.broker.two:19093 # modified here
          KAFKA_CLUSTERS_1_ZOOKEEPER: spot.zookeeper.two:2181
        ports:
          - "8580:8080"
        depends_on:
          - spot.zookeeper.one
          - spot.broker.one
          - spot.zookeeper.two
          - spot.broker.two
        networks:
          - spot-network-one
          - spot-network-two
    
    networks:
      spot-network-one:
        driver: bridge
      spot-network-two:
        driver: bridge
    
    volumes:
      zookeeper_data_one:
        driver: local
      kafka_data_one:
        driver: local
      zookeeper_data_two:
        driver: local
      kafka_data_two:
        driver: local
    

    Every changed line in the file is marked with a "# modified here" comment.
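
The port mismatch behind this fix can be checked mechanically. The following is only a minimal sketch, not part of the original setup: the function names are hypothetical, and it assumes the simple "HOST:CONTAINER" form of a docker-compose port mapping. It compares the port that a broker advertises on its external listener with the port published on the host, using the values from the two compose files above.

```python
def advertised_port(advertised_listeners: str, listener_name: str) -> int:
    """Return the port advertised for one named listener.

    `advertised_listeners` is the comma-separated value of
    KAFKA_CFG_ADVERTISED_LISTENERS, e.g. "NAME://host:port,OTHER://host:port".
    """
    for entry in advertised_listeners.split(","):
        name, _, address = entry.strip().partition("://")
        if name == listener_name:
            # The port is whatever follows the last colon of "host:port".
            _, _, port = address.rpartition(":")
            return int(port)
    raise KeyError(f"listener {listener_name!r} not found")


def host_port(port_mapping: str) -> int:
    """Return the host side of a "HOST:CONTAINER" port mapping (assumed form)."""
    return int(port_mapping.split(":")[0])


def external_listener_matches(
    advertised_listeners: str,
    port_mapping: str,
    listener_name: str = "LISTENER_DOCKER_EXTERNAL",
) -> bool:
    """True when the advertised external port equals the published host port."""
    return advertised_port(advertised_listeners, listener_name) == host_port(port_mapping)


# Values for spot.broker.two, copied from the old and the adjusted compose files.
old = "LISTENER_DOCKER_INTERNAL://spot.broker.two:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092"
new = "LISTENER_DOCKER_INTERNAL://spot.broker.two:19093,LISTENER_DOCKER_EXTERNAL://localhost:9093"

print(external_listener_matches(old, "9093:9092"))  # False
print(external_listener_matches(new, "9093:9093"))  # True
```

In the old file the check fails for broker two because it advertises localhost:9092 while being published on host port 9093. In the adjusted file both ports are 9093, so a client on the host is sent back to the same broker it bootstrapped from.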