I am having trouble connecting to Kafka from localhost. I'm using Kafka in KRaft mode, without ZooKeeper.
# Compose file from the question: one combined KRaft broker/controller
# (kafka_auth) and the Node.js API that consumes from it.
services:
  kafka_auth:
    image: confluentinc/cp-kafka:7.2.1
    environment:
      KAFKA_NODE_ID: '2'
      # Every listener name used below is mapped to the PLAINTEXT protocol.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT'
      # PLAINTEXT (9092) and CONTROLLER (9093) bind to the kafka_auth
      # hostname; EXTERNAL (9094) binds to all interfaces.
      KAFKA_LISTENERS: 'PLAINTEXT://kafka_auth:9092,CONTROLLER://kafka_auth:9093,EXTERNAL://0.0.0.0:9094'
      # EXTERNAL is advertised as 127.0.0.1:9094, matching the port
      # published under `ports` below.
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka_auth:9092,EXTERNAL://127.0.0.1:9094'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      # NOTE(review): kafka_mqtt and kafka_server are not defined in this
      # snippet; presumably they are sibling services that were omitted.
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka_mqtt:9093,2@kafka_auth:9093,3@kafka_server:9093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    volumes:
      - ./run_workaround.sh:/tmp/run_workaround.sh
    # Run the workaround script first, then the image's regular entrypoint.
    command: "bash -c '/tmp/run_workaround.sh && /etc/confluent/docker/run'"
    depends_on:
      - kafka_mqtt
    ports:
      - "9094:9094"

  api:
    image: node:18
    volumes:
      - ./:/app
    command: 'sh -c "yarn install && yarn start:debug"'
    depends_on:
      - kafka_mqtt
When I connect from the api service, it connects without any errors, but when I try to connect from outside Docker using the same code on port 9094, I get an error:
{"level":"ERROR","timestamp":"2023-08-06T01:16:06.815Z","logger":"kafkajs","message":"[BrokerPool] Closed connection","retryCount":27,"retryTime":10000}
run_workaround.sh
# Patch the image's startup scripts in place before the regular entrypoint runs.

# Delete every line that mentions KAFKA_ZOOKEEPER_CONNECT from the configure script.
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
# Replace the `cub zk-ready` call in the ensure script with an echo.
sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure
# Append a `kafka-storage format` command to the end of the ensure script.
# NOTE(review): CLUSTER_ID is written literally here - presumably a placeholder
# for the real cluster id shared by all nodes; confirm before use.
echo "kafka-storage format --ignore-formatted -t CLUSTER_ID -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure
nodejs
import {
Kafka,
Consumer,
} from 'kafkajs';
// Comma-separated list of bootstrap brokers, read from the environment.
// Fail early with a clear message instead of a TypeError on `.split`.
const bootstrapServers = process.env.KAFKA_BOOTSTRAP_SERVERS;
if (!bootstrapServers) {
  throw new Error('KAFKA_BOOTSTRAP_SERVERS is not set');
}

// Shared kafkajs client.
const kafka = new Kafka({
  clientId: 'clientId',
  brokers: bootstrapServers.split(','),
  requestTimeout: 3600000,
  retry: {
    // initialRetryTime and maxRetryTime are both 10000 ms, which matches the
    // "retryTime":10000 seen in the logged error.
    maxRetryTime: 10000,
    initialRetryTime: 10000,
    retries: 999999999,
  },
});

const consumer = kafka.consumer({ groupId: 'groupId' });

// Connect, subscribe to 'topic1', and hand every non-empty message value to
// `method` as a string.
// NOTE(review): there is no rejection handler on this chain; a failure in
// connect/subscribe/run surfaces as an unhandled rejection.
consumer.connect().then(async () => {
  // here I also check all topics and create them if they don't exist
  await consumer.subscribe({ topics: ['topic1'] });
  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      // Skip messages that carry no value.
      if (!message?.value) {
        return;
      }
      switch (topic) {
        case 'topic1':
          method(message.value.toString());
          break;
        default:
          break;
      }
    },
  });
});
I fixed it by adding all Kafka services to the same network and adding an extra host, host.docker.internal. I also had an error in my environment variables.
# Compose file from the answer: the broker joins a named network and gets an
# extra host entry for host.docker.internal.
services:
  kafka_auth:
    image: confluentinc/cp-kafka:7.2.1
    container_name: kafka_auth
    hostname: kafka_auth
    restart: always
    environment:
      # ... (other variables omitted in the original answer)
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,LISTENER_LOCALHOST:PLAINTEXT,LISTENER_DOCKER:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'LISTENER_LOCALHOST://kafka_auth:9192,LISTENER_DOCKER://kafka_auth:9194'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      # Same host:port pairs as the advertised listeners, plus the
      # controller listener on 9093.
      KAFKA_LISTENERS: 'CONTROLLER://kafka_auth:9093,LISTENER_LOCALHOST://kafka_auth:9192,LISTENER_DOCKER://kafka_auth:9194'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_LOCALHOST'
    networks:
      - kafka_net
    extra_hosts:
      - "host.docker.internal:host-gateway"

# Named network that all Kafka services attach to.
networks:
  kafka_net:
    name: kafka_net