node.js kubernetes apache-kafka kafkajs

Kafka connection issue when broker is in different Kubernetes namespace


I have a Node.js microservice connecting to a Kafka service. If the Kafka broker is in the default namespace with the other microservices, there are no issues.

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  // brokers: ['kafka-broker.kafka.svc.cluster.local:9092'],
  brokers: ['kafka-broker-srv:9092'],
})

// connect the admin client, then list the topics
async function test() {
  const admin = kafka.admin()
  await admin.connect()
  console.log('admin connected')
  console.log('TOPICS', await admin.listTopics())
}

Output:

[auth-pod] admin connected
[auth-pod] TOPICS [ 'test-topic' ]

Here is the Kubernetes configuration:

# create namespace
apiVersion: v1
kind: Namespace
metadata:
  name: "kafka"
  labels:
    name: "kafka"
---
# create zookeeper service
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
  # namespace: kafka
spec:
  # type: NodePort
  ports:
    - name: zookeeper-port
      port: 2181
      # nodePort: 30181
      targetPort: 2181
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
  # namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - image: wurstmeister/zookeeper
          imagePullPolicy: IfNotPresent
          name: zookeeper
          ports:
            - containerPort: 2181
---
# deploy kafka broker
apiVersion: v1
kind: Service
metadata:
  # labels:
  #   app: kafka-broker
  name: kafka-broker-srv
  # namespace: kafka
spec:
  # headless service
  clusterIP: "None"
  ports:
  - name: foo
    port: 9092
  selector:
    app: kafka-broker
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  # namespace: kafka
spec:
  # replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      hostname: kafka-broker
      containers:
      - env:
        - name: KAFKA_BROKER_ID
          value: "1"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-service:2181
        - name: KAFKA_LISTENERS
          value: PLAINTEXT://:9092
        - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://kafka-broker:9092
          # automatically create topics
        - name: KAFKA_CREATE_TOPICS
          value: 'test-topic:1:1'
        - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
          value: 'true'
        - name: KAFKA_DELETE_TOPIC_ENABLE
          value: 'true'
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        name: kafka-broker
        ports:
        - containerPort: 9092

However, if the broker is in a separate namespace (kafka), the connect call to the service 'kafka-broker-srv.kafka.svc.cluster.local:9092' succeeds, but listTopics then fails with a KafkaJSBrokerNotFound error.

[auth-pod] admin connected
[auth-pod] KafkaJSNonRetriableError
[auth-pod]   Caused by: KafkaJSBrokerNotFound: No brokers in the broker pool
[auth-pod]     at BrokerPool.withBroker (/app/node_modules/kafkajs/src/cluster/brokerPool.js:270:13)
[auth-pod]     ... 3 lines matching cause stack trace ...
[auth-pod]     at async test (/app/src/kakfka/connect.js:18:27) {
[auth-pod]   name: 'KafkaJSNumberOfRetriesExceeded',
[auth-pod]   retriable: false,
[auth-pod]   helpUrl: undefined,
[auth-pod]   retryCount: 5,
[auth-pod]   retryTime: 12820,
[auth-pod]   [cause]: KafkaJSBrokerNotFound: No brokers in the broker pool
[auth-pod]       at BrokerPool.withBroker (/app/node_modules/kafkajs/src/cluster/brokerPool.js:270:13)
[auth-pod]       at /app/node_modules/kafkajs/src/cluster/index.js:191:32
[auth-pod]       at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
[auth-pod]       at async Object.listTopics (/app/node_modules/kafkajs/src/admin/index.js:103:31)
[auth-pod]       at async test (/app/src/kakfka/connect.js:18:27) {
[auth-pod]     retriable: true,
[auth-pod]     helpUrl: undefined,
[auth-pod]     [cause]: undefined
[auth-pod]   }
[auth-pod] }

I don't see what the issue could be: it worked before, and the only change that should be needed is adding the namespace suffix (.kafka.svc.cluster.local) to the service address.

If I use the same name for the service and the broker and connect to 'kafka-broker.kafka.svc.cluster.local:9092' (I thought I wasn't supposed to use different names), I get a connection timeout.

[auth-pod] admin connected
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:11:57.662Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:11:59.139Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:00.351Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:02.154Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:02.958Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo EAI_AGAIN kafka-broker","broker":"kafka-broker:9092","clientId":"my-app","stack":"Error: getaddrinfo EAI_AGAIN kafka-broker\n    at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:107:26)"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:03.230Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:05.359Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:05.461Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo EAI_AGAIN kafka-broker","broker":"kafka-broker:9092","clientId":"my-app","stack":"Error: getaddrinfo EAI_AGAIN kafka-broker\n    at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:107:26)"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:06.533Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:09.139Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo EAI_AGAIN kafka-broker","broker":"kafka-broker:9092","clientId":"my-app","stack":"Error: getaddrinfo EAI_AGAIN kafka-broker\n    at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:107:26)"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:09.640Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:10.646Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:11.536Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo EAI_AGAIN kafka-broker","broker":"kafka-broker:9092","clientId":"my-app","stack":"Error: getaddrinfo EAI_AGAIN kafka-broker\n    at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:107:26)"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:15.262Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo EAI_AGAIN kafka-broker","broker":"kafka-broker:9092","clientId":"my-app","stack":"Error: getaddrinfo EAI_AGAIN kafka-broker\n    at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:107:26)"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:16.151Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:17.160Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] KafkaJSNonRetriableError
[auth-pod]   Caused by: KafkaJSConnectionError: Connection timeout
[auth-pod]     at Timeout.onTimeout [as _onTimeout] (/app/node_modules/kafkajs/src/network/connection.js:223:23)
[auth-pod]     at listOnTimeout (node:internal/timers:569:17)
[auth-pod]     at process.processTimers (node:internal/timers:512:7) {
[auth-pod]   name: 'KafkaJSNumberOfRetriesExceeded',
[auth-pod]   retriable: false,
[auth-pod]   helpUrl: undefined,
[auth-pod]   retryCount: 5,
[auth-pod]   retryTime: 10402,
[auth-pod]   [cause]: KafkaJSConnectionError: Connection timeout
[auth-pod]       at Timeout.onTimeout [as _onTimeout] (/app/node_modules/kafkajs/src/network/connection.js:223:23)
[auth-pod]       at listOnTimeout (node:internal/timers:569:17)
[auth-pod]       at process.processTimers (node:internal/timers:512:7) {
[auth-pod]     retriable: true,
[auth-pod]     helpUrl: undefined,
[auth-pod]     broker: 'kafka-broker:9092',
[auth-pod]     code: undefined,
[auth-pod]     [cause]: undefined
[auth-pod]   }
[auth-pod] }
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:17.548Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo EAI_AGAIN kafka-broker","broker":"kafka-broker:9092","clientId":"my-app","stack":"Error: getaddrinfo EAI_AGAIN kafka-broker\n    at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:107:26)"}

The same thing happens with producers and consumers: the connect call works, but the subsequent method calls fail.

// producer
const producer = kafka.producer()
await producer.connect()
console.log('producer connected', producer)
// send
await producer.send({
  topic: 'test-topic',
  messages: [{ value: 'Hello KafkaJS user!' }],
})
// await producer.disconnect()

// consumer
const consumer = kafka.consumer({ groupId: 'test-group' })
await consumer.connect()
console.log('consumer connected', consumer)
// receive
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log('CONSUME', {
      value: message.value.toString(),
    })
  },
})

I am not sure why the naming of the service would make a difference.

Given that I can connect to the service and that DNS resolution is the same within any namespace (so I doubt I would need to change any names in the Kafka configuration), what could cause these errors?


Interestingly, if I deploy to both namespaces and connect to the service in the kafka namespace, it does work.

  brokers: ['kafka-broker-srv.kafka.svc.cluster.local:9092'],
[auth-pod] admin connected
[auth-pod] TOPICS [ 'test-topic' ]

Is kafkajs connecting directly to the broker in the default namespace? If so, what is the point of going through the Kafka service (in the kafka namespace)?

Edit: Since I am using a headless service, the phrasing "going through the kafka service" probably doesn't make sense. The service name is resolved directly to the IP of the pod. If connect works, does that mean the connection to the pod succeeded? And if subsequent method calls hit a connection error, is kafkajs connecting to something other than the pod?


Solution

  • You've set this:

    - name: KAFKA_ADVERTISED_LISTENERS
      value: PLAINTEXT://kafka-broker:9092
    

    This means that after a client bootstraps against the FQDN you gave it, the broker returns this advertised address in its metadata, and the client uses that address for all subsequent requests instead of the headless service address from your app code.
    The advertised name carries no namespace, so the client resolves it relative to its own namespace and will try to look up kafka-broker.default.svc.cluster.local:9092 (assuming your app is in the default namespace).

    For more details, refer to "Connect to Kafka running in Docker".

    As mentioned before, you need to advertise the FQDN / address of that specific Kafka pod, so that clients can connect directly to individual brokers rather than to a Service address that is round-robin / load-balanced. For example, the Strimzi or Bitnami Helm Charts will advertise values like kafka-0 and kafka-1 for a two-node cluster, while still maintaining a kafka-svc Headless Service to use for bootstrapping.


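    "if I deploy to both namespaces and connect to the service in the kafka namespace, it does work"

    Because that is how DNS lookups are supposed to work: the advertised name kafka-broker has no namespace, so the client looks it up in its own namespace (default), where a broker is then also running.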
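
To make the lookup behaviour described above concrete, here is a minimal sketch of how a pod's resolver expands a short hostname. It assumes the Kubernetes defaults (cluster domain cluster.local, ndots:5 and the three standard search domains) and roughly follows glibc ordering; other resolvers, such as musl, differ in detail. The function candidateNames is hypothetical: it only builds the list of names that would be tried and performs no real DNS queries.

```javascript
// Sketch of how a pod's resolver expands a hostname using the search
// domains Kubernetes writes into /etc/resolv.conf by default.
// ASSUMPTIONS: cluster domain "cluster.local" and ndots:5 (the defaults);
// a cluster or pod may be configured differently.
function candidateNames(host, podNamespace, clusterDomain = 'cluster.local', ndots = 5) {
  // A trailing dot marks the name as absolute: no search domains apply.
  if (host.endsWith('.')) return [host.slice(0, -1)]

  const searchDomains = [
    `${podNamespace}.svc.${clusterDomain}`,
    `svc.${clusterDomain}`,
    clusterDomain,
  ]
  const expanded = searchDomains.map((domain) => `${host}.${domain}`)
  const dots = host.split('.').length - 1

  // With fewer than `ndots` dots the search list is tried first and the
  // literal name last; otherwise the literal name is tried first.
  return dots < ndots ? [...expanded, host] : [host, ...expanded]
}

// The client pod runs in "default"; the broker advertises "kafka-broker".
console.log(candidateNames('kafka-broker', 'default'))

// A namespace-qualified name matches on the second search domain.
console.log(candidateNames('kafka-broker-srv.kafka', 'default'))
```

For a client pod in the default namespace, the advertised name kafka-broker is tried first as kafka-broker.default.svc.cluster.local, and none of the candidates point into the kafka namespace. Advertising a namespace-qualified name (for instance the broker's FQDN under kafka.svc.cluster.local) should therefore resolve the same way from any namespace, although the exact pod FQDN depends on how the StatefulSet and headless service are configured.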