docker, apache-kafka, docker-compose, kafkajs

How to connect to docker-compose Kafka from localhost?


I'm having trouble connecting to Kafka from localhost. I'm using Kafka with KRaft, without ZooKeeper.

services:
  kafka_auth:
    image: confluentinc/cp-kafka:7.2.1
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://kafka_auth:9092,CONTROLLER://kafka_auth:9093,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka_auth:9092,EXTERNAL://127.0.0.1:9094
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka_mqtt:9093,2@kafka_auth:9093,3@kafka_server:9093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
    volumes:
      - ./run_workaround.sh:/tmp/run_workaround.sh
    command: "bash -c '/tmp/run_workaround.sh && /etc/confluent/docker/run'"
    depends_on:
      - kafka_mqtt
    ports:
      - "9094:9094"

  api:
    image: node:18
    volumes:
      - ./:/app
    command: sh -c "yarn install && yarn start:debug"
    depends_on:
      - kafka_mqtt

When I connect from the api service it connects without any errors, but when I try to connect from outside Docker using the same code on port 9094, I get this error:

{"level":"ERROR","timestamp":"2023-08-06T01:16:06.815Z","logger":"kafkajs","message":"[BrokerPool] Closed connection","retryCount":27,"retryTime":10000}

run_workaround.sh

# Drop the ZooKeeper requirement from the image's configure step
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
# Skip the "ZooKeeper ready" check
sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure
# Format the KRaft storage directory (CLUSTER_ID is a placeholder)
echo "kafka-storage format --ignore-formatted -t CLUSTER_ID -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure

Node.js

import { Kafka } from 'kafkajs';
const kafka = new Kafka({
    clientId: 'clientId',
    brokers: process.env.KAFKA_BOOTSTRAP_SERVERS.split(','),
    requestTimeout: 3600000,
    retry: {
      maxRetryTime: 10000,
      initialRetryTime: 10000,
      retries: 999999999,
    },
});

const consumer = kafka.consumer({ groupId: 'groupId' });
consumer.connect().then(async () => {
  // here I also check all topics and create them if they don't exist
  await consumer.subscribe({ topics: ['topic1'] });
  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      if (!message?.value) {
        return;
      }
      switch (topic) {
        case 'topic1':
          method(message.value.toString());
          break;
        default:
          break;
      }
    },
  });
});
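The topic check mentioned in the comment isn't shown above; a minimal sketch of how it might look with the kafkajs admin client (ensureTopics and the partition count are illustrative, not the original code):

import { Kafka } from 'kafkajs';

// Illustrative helper: create any expected topics that don't exist yet.
async function ensureTopics(kafka: Kafka, expected: string[]): Promise<void> {
  const admin = kafka.admin();
  await admin.connect();
  try {
    const existing = await admin.listTopics();
    const missing = expected.filter((topic) => !existing.includes(topic));
    if (missing.length > 0) {
      await admin.createTopics({
        topics: missing.map((topic) => ({ topic, numPartitions: 1 })),
      });
    }
  } finally {
    await admin.disconnect();
  }
}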

Solution

  • I fixed it by putting all Kafka services on the same network and adding an extra host, host.docker.internal. I also had an error in my environment variables.

    services:
      kafka_auth:
        image: confluentinc/cp-kafka:7.2.1
        container_name: kafka_auth
        hostname: kafka_auth
        restart: always
        environment:
          # ...
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,LISTENER_LOCALHOST:PLAINTEXT,LISTENER_DOCKER:PLAINTEXT'
          KAFKA_ADVERTISED_LISTENERS: 'LISTENER_LOCALHOST://kafka_auth:9192,LISTENER_DOCKER://kafka_auth:9194'
          KAFKA_PROCESS_ROLES: 'broker,controller'
          KAFKA_LISTENERS: 'CONTROLLER://kafka_auth:9093,LISTENER_LOCALHOST://kafka_auth:9192,LISTENER_DOCKER://kafka_auth:9194'
          KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
          KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_LOCALHOST'
        networks:
          - kafka_net
        extra_hosts:
          - "host.docker.internal:host-gateway"
    networks:
      kafka_net:
        name: kafka_net
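
    To verify the fix from the host, the same kafkajs client can target the advertised listener. A minimal sketch, assuming the LISTENER_LOCALHOST port (9192) is published in the elided part of the compose file and that kafka_auth resolves on the host (for example via an /etc/hosts entry pointing it at 127.0.0.1):

    import { Kafka } from 'kafkajs';

    // Both of these are assumptions about the elided parts of the compose file:
    // "9192:9192" must be published, and kafka_auth must resolve on the host.
    const kafka = new Kafka({ clientId: 'host-check', brokers: ['kafka_auth:9192'] });

    const admin = kafka.admin();
    admin
      .connect()
      .then(() => admin.describeCluster())
      .then((cluster) => console.log('connected, brokers:', cluster.brokers))
      .catch((err) => console.error('still unreachable:', err.message))
      .finally(() => admin.disconnect());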