In my project, I need to connect to two different Kafka brokers.
My application.yaml
looks somewhat like this:
spring:
cloud:
function:
definition: orderCreatedListener;orderProcessedListener
stream:
bindings:
orderCreatedProducer-out-0:
destination: order-created
binder: kafka-one
orderCreatedListener-in-0:
destination: order-created
group: spot
binder: kafka-one
orderCreatedListener-out-0:
destination: order-processed
binder: kafka-two
orderProcessedListener-in-0:
destination: order-processed
group: spot
binder: kafka-two
kafka:
binder:
auto-create-topics: true
bindings:
orderCreatedListener-in-0:
consumer:
enableDlq: true
dlqName: order-created-dlq
autoCommitOnError: true
autoCommitOffset: true
orderProcessedListener-in-0:
consumer:
enableDlq: true
dlqName: order-processed-dlq
autoCommitOnError: true
autoCommitOffset: true
binders:
kafka-one:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
kafka-two:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9093
But it didn't work when I ran the application, this caused the following error:
2024-03-05T23:35:48.473-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 31 due to node 1001 being disconnected (elapsed time since creation: 4ms, elapsed time since send: 4ms, request timeout: 3600000ms)
2024-03-05T23:35:49.595-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Node 1001 disconnected.
2024-03-05T23:35:49.595-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 32 due to node 1001 being disconnected (elapsed time since creation: 5ms, elapsed time since send: 5ms, request timeout: 3600000ms)
2024-03-05T23:35:50.727-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Node 1001 disconnected.
2024-03-05T23:35:50.728-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 33 due to node 1001 being disconnected (elapsed time since creation: 4ms, elapsed time since send: 4ms, request timeout: 3600000ms)
2024-03-05T23:35:51.086-03:00 INFO 25569 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
I want to separate the Kafka topics in two clusters:
order-created
and order-created-dlq
order-processed
and order-processed-dlq
I using:
I have the two Kafka clusters running fine on my development environment with docker containers, one exposed on the 9092 port and the other exposed on the 9093 port.
How adjust this?
The principal related problem has been caused by incorrect configurations on the docker containers.
Internally, in the docker network, all containers of Kafka were running on the 9092 port and it caused a don't expect behavior!
Old docker-compose.yaml
:
version: "3.9"
services:
spot.zookeeper.one:
image: docker.io/bitnami/zookeeper:3.7
container_name: spot.zookeeper.one
restart: "no"
hostname: spot.zookeeper.one
ports:
- "2282:2181"
environment:
ALLOW_ANONYMOUS_LOGIN: "yes"
networks:
- spot-network-one
volumes:
- zookeeper_data_one:/bitnami
spot.broker.one:
image: docker.io/bitnami/kafka:3
container_name: spot.broker.one
hostname: spot.broker.one
restart: "no"
ports:
- "9092:9092"
environment:
KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19092,LISTENER_DOCKER_EXTERNAL://:9092"
KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.one:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.one:2181"
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_CFG_PORT: "9092"
KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
ALLOW_PLAINTEXT_LISTENER: "yes"
depends_on:
- spot.zookeeper.one
networks:
- spot-network-one
volumes:
- kafka_data_one:/bitnami
spot.zookeeper.two:
image: docker.io/bitnami/zookeeper:3.7
container_name: spot.zookeeper.two
restart: "no"
hostname: spot.zookeeper.two
ports:
- "2283:2181"
environment:
ALLOW_ANONYMOUS_LOGIN: "yes"
networks:
- spot-network-two
volumes:
- zookeeper_data_two:/bitnami
spot.broker.two:
image: docker.io/bitnami/kafka:3
container_name: spot.broker.two
hostname: spot.broker.two
restart: "no"
ports:
- "9093:9092"
environment:
KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19092,LISTENER_DOCKER_EXTERNAL://:9092"
KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.two:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.two:2181"
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_CFG_PORT: "9092"
KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
ALLOW_PLAINTEXT_LISTENER: "yes"
depends_on:
- spot.zookeeper.two
networks:
- spot-network-two
volumes:
- kafka_data_two:/bitnami
spot.kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: spot.kafka-ui
restart: "no"
environment:
KAFKA_CLUSTERS_0_NAME: spot-one
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: spot.broker.one:19092
KAFKA_CLUSTERS_0_ZOOKEEPER: spot.zookeeper.one:2181
KAFKA_CLUSTERS_1_NAME: spot-two
KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: spot.broker.two:19092
KAFKA_CLUSTERS_1_ZOOKEEPER: spot.zookeeper.two:2181
ports:
- "8580:8080"
depends_on:
- spot.zookeeper.one
- spot.broker.one
- spot.zookeeper.two
- spot.broker.two
networks:
- spot-network-one
- spot-network-two
networks:
spot-network-one:
driver: bridge
spot-network-two:
driver: bridge
volumes:
zookeeper_data_one:
driver: local
kafka_data_one:
driver: local
zookeeper_data_two:
driver: local
kafka_data_two:
driver: local
The adjusted docker-compose.yaml
:
version: "3.9"
services:
spot.zookeeper.one:
image: docker.io/bitnami/zookeeper:3.7
container_name: spot.zookeeper.one
restart: "no"
hostname: spot.zookeeper.one
ports:
- "2282:2181"
environment:
ALLOW_ANONYMOUS_LOGIN: "yes"
networks:
- spot-network-one
volumes:
- zookeeper_data_one:/bitnami
spot.broker.one:
image: docker.io/bitnami/kafka:3
container_name: spot.broker.one
hostname: spot.broker.one
restart: "no"
ports:
- "9092:9092"
environment:
KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19092,LISTENER_DOCKER_EXTERNAL://:9092"
KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.one:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.one:2181"
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_CFG_PORT: "9092"
KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
ALLOW_PLAINTEXT_LISTENER: "yes"
depends_on:
- spot.zookeeper.one
networks:
- spot-network-one
volumes:
- kafka_data_one:/bitnami
spot.zookeeper.two:
image: docker.io/bitnami/zookeeper:3.7
container_name: spot.zookeeper.two
restart: "no"
hostname: spot.zookeeper.two
ports:
- "2283:2181"
environment:
ALLOW_ANONYMOUS_LOGIN: "yes"
networks:
- spot-network-two
volumes:
- zookeeper_data_two:/bitnami
spot.broker.two:
image: docker.io/bitnami/kafka:3
container_name: spot.broker.two
hostname: spot.broker.two
restart: "no"
ports:
- "9093:9093" # modified here
environment:
KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19093,LISTENER_DOCKER_EXTERNAL://:9093" # modified here
KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.two:19093,LISTENER_DOCKER_EXTERNAL://localhost:9093" # modified here
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.two:2181"
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_CFG_PORT: "9093" # modified here
KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
ALLOW_PLAINTEXT_LISTENER: "yes"
depends_on:
- spot.zookeeper.two
networks:
- spot-network-two
volumes:
- kafka_data_two:/bitnami
spot.kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: spot.kafka-ui
restart: "no"
environment:
KAFKA_CLUSTERS_0_NAME: spot-one
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: spot.broker.one:19092
KAFKA_CLUSTERS_0_ZOOKEEPER: spot.zookeeper.one:2181
KAFKA_CLUSTERS_1_NAME: spot-two
KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: spot.broker.two:19093 # modified here
KAFKA_CLUSTERS_1_ZOOKEEPER: spot.zookeeper.two:2181
ports:
- "8580:8080"
depends_on:
- spot.zookeeper.one
- spot.broker.one
- spot.zookeeper.two
- spot.broker.two
networks:
- spot-network-one
- spot-network-two
networks:
spot-network-one:
driver: bridge
spot-network-two:
driver: bridge
volumes:
zookeeper_data_one:
driver: local
kafka_data_one:
driver: local
zookeeper_data_two:
driver: local
kafka_data_two:
driver: local
All adjusted points have been marked with a # modified here
comment on the line of the file.