kubernetes apache-kafka

Error connecting to kafka broker as a k8s pod on local machine


I am running an Apache Kafka broker from the confluentinc/cp-kafka:7.3.0 image in my local k8s cluster (Docker Desktop Kubernetes). After creating the broker pod, I see the following errors when I run kubectl logs <pod_name>:

[2024-09-17 12:44:09,830] INFO [Controller id=1, targetBrokerId=1] Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)
[2024-09-17 12:44:09,830] WARN [Controller id=1, targetBrokerId=1] Connection to node 1 (kafka-broker/10.105.234.216:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2024-09-17 12:44:09,830] WARN [RequestSendThread controllerId=1] Controller 1's connection to broker kafka-broker:9092 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to kafka-broker:9092 (id: 1 rack: null) failed.
    at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:292)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:246)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2024-09-17 12:44:09,830] INFO [Controller id=1, targetBrokerId=1] Client requested connection close from node 1 (org.apache.kafka.clients.NetworkClient)

These are my k8s config files. The one that is failing is kafka-broker, not kafka-zookeeper.

kafka-broker.yml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-broker
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      containers:
        - name: kafka-broker
          image: confluentinc/cp-kafka:7.3.0
          env:
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: "zookeeper:2181"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
            - name: KAFKA_ADVERTISED_LISTENERS
              value: PLAINTEXT://kafka-broker:9092,PLAINTEXT_INTERNAL://kafka-broker:29092
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "1"
          readinessProbe:
            exec:
              command:
                - kafka-topics
                - --bootstrap-server
                - kafka-broker:9092
                - --list
            initialDelaySeconds: 60
            periodSeconds: 30
            timeoutSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-broker
spec:
  selector:
    app: kafka-broker
  ports:
    - name: kafka
      port: 9092
  type: ClusterIP

kafka-zookeeper.yml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: zookeeper
spec:
  replicas: 1
  selector:
    matchLabels:
      ims: zookeeper
  template:
    metadata:
      labels:
        ims: zookeeper
    spec:
      containers:
        - name: zookeeper
          image: confluentinc/cp-zookeeper:7.3.0
          ports:
            - containerPort: 2181
          env:
            - name: ZOOKEEPER_CLIENT_PORT
              value: "2181"
            - name: ZOOKEEPER_TICK_TIME
              value: "2000"
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper
spec:
  ports:
    - port: 2181
  selector:
    ims: zookeeper

My Pods:

NAME                            READY   STATUS    RESTARTS   AGE
kafka-broker-676b889556-6xqc7   0/1     Running   0          5m19s
zookeeper-644f45dfc4-4ln6c      1/1     Running   0          5m19s

My docker-compose.yml works well when creating the containers:

## docker-compose.yml
version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    healthcheck:
      test:
        ["CMD", "kafka-topics", "--bootstrap-server", "broker:9092", "--list"]
      interval: 30s
      timeout: 10s
      retries: 10
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

I just ran kubectl apply -f ./k8s/kafka-broker.yml -f ./k8s/kafka-zookeeper.yml and it fails to create the pod kafka-broker. Is anyone facing the same issue?


Solution

  • Your readiness probe is failing, so the Pod never enters "ready" status. A failing readiness probe does not restart the container (only a failing liveness probe does), which is why the Pod stays at 0/1 with zero restarts.

    A couple of other setups suggest either just checking to see if there is a listener on the Kafka TCP port

    readinessProbe:
      tcpSocket:
        port: 9092
    

    or using the kafka-broker-api-versions script, which won't necessarily attempt to connect to the entire cluster

    readinessProbe:
      exec:
        command:
          - kafka-broker-api-versions
          - --bootstrap-server
          - localhost:9092  # <-- note `localhost` here
    

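    The setup as you have it now has a chicken-and-egg problem. The Service kafka-broker only routes traffic to Pods that are ready. In your existing readiness probe, you're trying to connect to kafka-broker:9092, which goes through that Service. Since this Pod isn't ready yet, the Service would have to send the connection to some other ready Pod; there aren't any, so the probe fails and the Pod never becomes ready. That likely also explains the log output: the broker advertises itself as kafka-broker:9092, so the controller's connection to its own broker goes through the same Service and fails for the same reason.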

    Using localhost in exec probes is common: the command runs inside the container, so localhost refers to "the current container", which is what you want here.

    For Kafka more specifically, connections often run in two phases. A client connects to the named bootstrap server(s), which provide a list of all brokers in the cluster, and the actual request is then sent to one of those brokers. That's complicated for a readiness probe. I suspect kafka-broker-api-versions will only talk to the local broker, whereas kafka-topics might try to reach out to the entire broker cluster.
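
    Putting the first suggestion into the context of the Deployment from the question, a minimal sketch of the broker container might look like the fragment below. The timing values are carried over from the original probe. The containerPort entry is an assumption added for clarity (the original manifest does not declare it), and only the changed part of the Deployment is shown.

    ```yaml
    # Fragment of kafka-broker.yml, under spec.template.spec
    containers:
      - name: kafka-broker
        image: confluentinc/cp-kafka:7.3.0
        ports:
          - containerPort: 9092   # assumption: informational, not in the original manifest
        # env: unchanged from the question
        readinessProbe:
          tcpSocket:
            port: 9092            # checks only that something is listening on the Pod's port
          initialDelaySeconds: 60
          periodSeconds: 30
          timeoutSeconds: 10
    ```

    This probe never goes through the kafka-broker Service, so it should not depend on the Pod already being ready. Once it passes, the Pod is added to the Service's endpoints and kafka-broker:9092 should start resolving to it.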