I have a Node.js microservice connecting to a Kafka service. If the Kafka broker is in the default namespace with the other microservices, there are no issues.
const kafka = new Kafka({
  clientId: 'my-app',
  // brokers: ['kafka-broker.kafka.svc.cluster.local:9092'],
  brokers: ['kafka-broker-srv:9092'],
})

const admin = kafka.admin()
await admin.connect()
console.log('admin connected')
console.log('TOPICS', await admin.listTopics())
[auth-pod] admin connected
[auth-pod] TOPICS [ 'test-topic' ]
Here is the Kubernetes configuration:
# create namespace
apiVersion: v1
kind: Namespace
metadata:
  name: "kafka"
  labels:
    name: "kafka"
---
# create zookeeper service
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
  # namespace: kafka
spec:
  # type: NodePort
  ports:
    - name: zookeeper-port
      port: 2181
      # nodePort: 30181
      targetPort: 2181
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
  # namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - image: wurstmeister/zookeeper
          imagePullPolicy: IfNotPresent
          name: zookeeper
          ports:
            - containerPort: 2181
---
# deploy kafka broker
apiVersion: v1
kind: Service
metadata:
  # labels:
  #   app: kafka-broker
  name: kafka-broker-srv
  # namespace: kafka
spec:
  # headless service
  clusterIP: "None"
  ports:
    - name: foo
      port: 9092
  selector:
    app: kafka-broker
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  # namespace: kafka
spec:
  # replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      hostname: kafka-broker
      containers:
        - env:
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zookeeper-service:2181
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://:9092
            - name: KAFKA_ADVERTISED_LISTENERS
              value: PLAINTEXT://kafka-broker:9092
            # automatically create topics
            - name: KAFKA_CREATE_TOPICS
              value: 'test-topic:1:1'
            - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
              value: 'true'
            - name: KAFKA_DELETE_TOPIC_ENABLE
              value: 'true'
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          name: kafka-broker
          ports:
            - containerPort: 9092
However, if it is in a separate namespace, I can connect to the service 'kafka-broker-srv.kafka.svc.cluster.local:9092' that is in the kafka namespace, but I get a KafkaJSBrokerNotFound error.
[auth-pod] admin connected
[auth-pod] KafkaJSNonRetriableError
[auth-pod] Caused by: KafkaJSBrokerNotFound: No brokers in the broker pool
[auth-pod] at BrokerPool.withBroker (/app/node_modules/kafkajs/src/cluster/brokerPool.js:270:13)
[auth-pod] ... 3 lines matching cause stack trace ...
[auth-pod] at async test (/app/src/kakfka/connect.js:18:27) {
[auth-pod] name: 'KafkaJSNumberOfRetriesExceeded',
[auth-pod] retriable: false,
[auth-pod] helpUrl: undefined,
[auth-pod] retryCount: 5,
[auth-pod] retryTime: 12820,
[auth-pod] [cause]: KafkaJSBrokerNotFound: No brokers in the broker pool
[auth-pod] at BrokerPool.withBroker (/app/node_modules/kafkajs/src/cluster/brokerPool.js:270:13)
[auth-pod] at /app/node_modules/kafkajs/src/cluster/index.js:191:32
[auth-pod] at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
[auth-pod] at async Object.listTopics (/app/node_modules/kafkajs/src/admin/index.js:103:31)
[auth-pod] at async test (/app/src/kakfka/connect.js:18:27) {
[auth-pod] retriable: true,
[auth-pod] helpUrl: undefined,
[auth-pod] [cause]: undefined
[auth-pod] }
[auth-pod] }
I don't see what the issue could be, since it worked before and the only thing that should need to change is appending the namespace suffix .kafka.svc.cluster.local to reach the service. If I use the same name for the service and the broker, 'kafka-broker.kafka.svc.cluster.local:9092' (I thought I wasn't supposed to use different names), I get a connection timeout.
[auth-pod] admin connected
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:11:57.662Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:11:59.139Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:00.351Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:02.154Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:02.958Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo EAI_AGAIN kafka-broker","broker":"kafka-broker:9092","clientId":"my-app","stack":"Error: getaddrinfo EAI_AGAIN kafka-broker\n at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:107:26)"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:03.230Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:05.359Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:05.461Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo EAI_AGAIN kafka-broker","broker":"kafka-broker:9092","clientId":"my-app","stack":"Error: getaddrinfo EAI_AGAIN kafka-broker\n at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:107:26)"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:06.533Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:09.139Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo EAI_AGAIN kafka-broker","broker":"kafka-broker:9092","clientId":"my-app","stack":"Error: getaddrinfo EAI_AGAIN kafka-broker\n at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:107:26)"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:09.640Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:10.646Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:11.536Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo EAI_AGAIN kafka-broker","broker":"kafka-broker:9092","clientId":"my-app","stack":"Error: getaddrinfo EAI_AGAIN kafka-broker\n at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:107:26)"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:15.262Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo EAI_AGAIN kafka-broker","broker":"kafka-broker:9092","clientId":"my-app","stack":"Error: getaddrinfo EAI_AGAIN kafka-broker\n at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:107:26)"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:16.151Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:17.160Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka-broker:9092","clientId":"my-app"}
[auth-pod] KafkaJSNonRetriableError
[auth-pod] Caused by: KafkaJSConnectionError: Connection timeout
[auth-pod] at Timeout.onTimeout [as _onTimeout] (/app/node_modules/kafkajs/src/network/connection.js:223:23)
[auth-pod] at listOnTimeout (node:internal/timers:569:17)
[auth-pod] at process.processTimers (node:internal/timers:512:7) {
[auth-pod] name: 'KafkaJSNumberOfRetriesExceeded',
[auth-pod] retriable: false,
[auth-pod] helpUrl: undefined,
[auth-pod] retryCount: 5,
[auth-pod] retryTime: 10402,
[auth-pod] [cause]: KafkaJSConnectionError: Connection timeout
[auth-pod] at Timeout.onTimeout [as _onTimeout] (/app/node_modules/kafkajs/src/network/connection.js:223:23)
[auth-pod] at listOnTimeout (node:internal/timers:569:17)
[auth-pod] at process.processTimers (node:internal/timers:512:7) {
[auth-pod] retriable: true,
[auth-pod] helpUrl: undefined,
[auth-pod] broker: 'kafka-broker:9092',
[auth-pod] code: undefined,
[auth-pod] [cause]: undefined
[auth-pod] }
[auth-pod] }
[auth-pod] {"level":"ERROR","timestamp":"2023-03-28T04:12:17.548Z","logger":"kafkajs","message":"[Connection] Connection error: getaddrinfo EAI_AGAIN kafka-broker","broker":"kafka-broker:9092","clientId":"my-app","stack":"Error: getaddrinfo EAI_AGAIN kafka-broker\n at GetAddrInfoReqWrap.onlookup [as oncomplete] (node:dns:107:26)"}
The same thing happens with producers and consumers: the connect call works, but the subsequent methods do not.
// producer
const producer = kafka.producer()
await producer.connect()
console.log('producer connected', producer)

// send
await producer.send({
  topic: 'test-topic',
  messages: [{ value: 'Hello KafkaJS user!' }],
})
// await producer.disconnect()

// consumer
const consumer = kafka.consumer({ groupId: 'test-group' })
await consumer.connect()
console.log('consumer connected', consumer)

// receive
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log('CONSUME', {
      value: message.value.toString(),
    })
  },
})
I am not sure why the naming of the service would make a difference.
Given that I can connect to the service, and that DNS resolution works the same from any namespace (so I doubt I would need to change any names in the Kafka configuration), what could cause these errors?
Interestingly, if I deploy to both namespaces and connect to the service in the kafka namespace, it does work.
brokers: ['kafka-broker-srv.kafka.svc.cluster.local:9092'],
[auth-pod] admin connected
[auth-pod] TOPICS [ 'test-topic' ]
Is kafkajs connecting to the broker in the default namespace directly; if so, what is the point of going through the kafka service (in the kafka namespace)?
Edit:
Since I am using a headless service, the phrasing "going through the kafka service" probably doesn't make sense: the service name resolves directly to the IP of the pod. If connect works, does that mean the connection to the pod succeeded? And if there is a connection error for subsequent method calls, is kafkajs connecting to something other than the pod?
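That is essentially what happens. The sketch below (plain Node.js, no kafkajs; the metadata shape is illustrative, not kafkajs's internal format) models the key behavior: the client uses your bootstrap address only for the first connection, then switches to the addresses the broker advertises in its metadata for everything else.

```javascript
// Illustrative model of a Kafka client's two-phase addressing.

// The seed you pass in the client config (resolves fine cross-namespace).
const seed = 'kafka-broker-srv.kafka.svc.cluster.local:9092'

// What the broker reports back, derived from KAFKA_ADVERTISED_LISTENERS.
// This hostname has no namespace suffix, so it only resolves from
// inside the kafka namespace itself.
const metadata = { brokers: [{ host: 'kafka-broker', port: 9092 }] }

function bootstrapAddress() {
  return seed // connect() talks to this -> succeeds
}

function requestAddresses(meta) {
  // listTopics / send / run talk to these -> fail from other namespaces
  return meta.brokers.map((b) => `${b.host}:${b.port}`)
}

console.log(bootstrapAddress())         // kafka-broker-srv.kafka.svc.cluster.local:9092
console.log(requestAddresses(metadata)) // [ 'kafka-broker:9092' ]
```

So connect succeeding only proves the seed address was reachable; the later calls go to whatever the broker advertised.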
You've set this:

- name: KAFKA_ADVERTISED_LISTENERS
  value: PLAINTEXT://kafka-broker:9092
This means that after a client connects via the bootstrap protocol (using the FQDN you provided), it is handed this address from the broker metadata, and every subsequent request goes there — not to your headless service, and not to the value in your app code. The advertised name carries no namespace, so the client will try to look up tcp://kafka-broker.default.svc.cluster.local:9092 (assuming your app is in the default namespace).
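The EAI_AGAIN errors on the bare name kafka-broker follow from how the pod's resolver expands short names via its search list. A small sketch (the search domains here are the typical defaults from a pod's /etc/resolv.conf in the default namespace; yours may differ):

```javascript
// Sketch: how a short hostname is expanded using the resolver's
// search list before being tried as-is (ndots behavior simplified).
const searchDomains = [
  'default.svc.cluster.local', // the pod's own namespace comes first
  'svc.cluster.local',
  'cluster.local',
]

function candidateLookups(name) {
  // A bare name like 'kafka-broker' gets each search domain appended.
  return searchDomains.map((d) => `${name}.${d}`).concat([name])
}

console.log(candidateLookups('kafka-broker'))
// First candidate: 'kafka-broker.default.svc.cluster.local'
// None of these records exist when the broker lives in the kafka
// namespace, so resolution fails from an app pod in default.
```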
For more details, refer to Connect to Kafka running in Docker.
As mentioned before, you need to advertise the FQDN / address of each specific Kafka pod, so that clients can connect directly to exact brokers, and not to a Service address that is round-robin / load-balanced. For example, the Strimzi or Bitnami Helm charts advertise values like kafka-0 and kafka-1 for a two-node cluster, while still maintaining a kafka-svc headless Service to use for bootstrapping.
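A minimal sketch of that fix for the single-broker StatefulSet above. This assumes the pod gets a stable per-pod DNS record via the headless service (the <pod>.<headless-service>.<namespace>.svc.cluster.local pattern), which requires serviceName to be set on the StatefulSet; the pod name kafka-broker-0 is what the StatefulSet would generate, not something from the original config:

```yaml
# Sketch only — abbreviated StatefulSet fragment, not a complete manifest.
spec:
  serviceName: kafka-broker-srv   # gives each pod a stable DNS record
  template:
    spec:
      containers:
        - env:
            - name: KAFKA_ADVERTISED_LISTENERS
              # namespace-qualified, per-pod name: resolvable from any namespace
              value: PLAINTEXT://kafka-broker-0.kafka-broker-srv.kafka.svc.cluster.local:9092
```

With this, clients in any namespace that bootstrap via kafka-broker-srv.kafka.svc.cluster.local:9092 receive an advertised address they can actually resolve.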
if I deploy to both namespaces and connect to the service in the kafka namespace, it does work
Because that is how DNS lookups are supposed to work.