I configured a Kafka cluster with 3 brokers, each running alongside its own ZooKeeper node. The figure below presents a graphical representation of my cluster.
A producer and consumer test from host 192.168.0.10, on the same network, worked perfectly with the kafka-console-producer and kafka-console-consumer commands.
However, when I try to produce data through the Internet with
kafka-console-producer.sh --broker-list DYNAMIC_DNS_ADDR:30192,DYNAMIC_DNS_ADDR:30292,DYNAMIC_DNS_ADDR:30392 --topic twitter_tweets
I get the following error:
[2018-12-10 09:59:20,772] ERROR Error when sending message to topic twitter_tweets with key: null, value: 16 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for twitter_tweets-1: 1505 ms has passed since batch creation plus linger time
[2018-12-10 09:59:22,273] WARN [Producer clientId=console-producer] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Broker listeners are configured with the following properties:
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9443
advertised.listeners=PLAINTEXT://192.168.0.241:9092,SSL://192.168.0.241:9443
Of course, the IP address in advertised.listeners is different on each broker. I am using CentOS 6.10 and Kafka 2.0.1 for this setup. A telnet test succeeded, and a separate port forward to a Kafka REST Proxy works over the Internet and lists all topics.
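A timeout after a successful telnet test usually means the bootstrap connection works but the brokers then hand back addresses the client cannot reach. As a rough diagnostic, the hypothetical helper below (not a Kafka tool) parses the listener properties shown above and flags advertised listeners whose host is a private IP address, which a client on the Internet cannot connect to. It assumes the simple NAME://host:port form and literal IP hosts; hostnames are skipped because classifying them would need DNS.

```python
from __future__ import annotations

import ipaddress


def parse_listeners(value: str) -> dict[str, tuple[str, int]]:
    """Parse 'NAME://host:port,...' into {NAME: (host, port)}."""
    result = {}
    for entry in value.split(","):
        name, _, hostport = entry.strip().partition("://")
        host, _, port = hostport.rpartition(":")
        result[name] = (host, int(port))
    return result


def private_advertised(properties: str) -> list[str]:
    """Return advertised listener names whose host is a private IP address."""
    props = {}
    for line in properties.splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, _, val = line.partition("=")
            props[key.strip()] = val.strip()

    flagged = []
    for name, (host, _port) in parse_listeners(props["advertised.listeners"]).items():
        try:
            if ipaddress.ip_address(host).is_private:
                flagged.append(name)
        except ValueError:
            pass  # a hostname, not a literal IP: cannot classify without DNS
    return flagged


# The broker properties from the question.
broker_config = """
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9443
advertised.listeners=PLAINTEXT://192.168.0.241:9092,SSL://192.168.0.241:9443
"""

print(private_advertised(broker_config))  # ['PLAINTEXT', 'SSL']
```

Both listeners advertise 192.168.0.241, so an Internet client that bootstraps successfully is then told to connect to an address it cannot route to.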
See https://rmoff.net/2018/08/02/kafka-listeners-explained/
You need two listeners: one that responds on and advertises the internal address, and one for the external address.
The key point is that the listener your client connects to returns the advertised host and port of that listener, and the client then uses that address for every subsequent connection to the broker.
At the moment your external port is forwarded to your internal listener, so external clients are handed back the internal 192.168.0.x address, which they cannot reach.
You need something like this (varying the IP/hostname of the aws_internal_listener as required per broker):
KAFKA_LISTENERS: aws_internal_listener://192.168.0.241:9092,external_listener://192.168.0.241:29092
KAFKA_ADVERTISED_LISTENERS: aws_internal_listener://192.168.0.241:9092,external_listener://DYNAMIC_DNS_ADDR:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: aws_internal_listener:PLAINTEXT,external_listener:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: aws_internal_listener
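Naming mistakes across these four settings are easy to make. The hypothetical pre-flight check below (not a Kafka tool) encodes constraints the broker itself enforces at startup: every listener name must be mapped to a security protocol, advertised listener names must be a subset of the listener names, and the inter-broker listener must be one of the advertised listeners. The parsing assumes the comma-separated NAME://host:port and NAME:PROTOCOL forms shown above.

```python
from __future__ import annotations


def listener_names(value: str) -> list[str]:
    """Extract the names from 'NAME://host:port,...'."""
    return [e.strip().split("://", 1)[0] for e in value.split(",") if e.strip()]


def check_listener_config(env: dict[str, str]) -> list[str]:
    """Return a list of problems; an empty list means the names are consistent."""
    listeners = set(listener_names(env["KAFKA_LISTENERS"]))
    advertised = set(listener_names(env["KAFKA_ADVERTISED_LISTENERS"]))
    protocol_map = dict(
        pair.strip().split(":", 1)
        for pair in env["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"].split(",")
        if pair.strip()
    )
    inter_broker = env["KAFKA_INTER_BROKER_LISTENER_NAME"]

    problems = []
    for name in sorted(listeners - set(protocol_map)):
        problems.append(f"no security protocol mapped for listener {name}")
    for name in sorted(advertised - listeners):
        problems.append(f"advertised listener {name} is not in KAFKA_LISTENERS")
    if inter_broker not in advertised:
        problems.append(f"inter-broker listener {inter_broker} is not advertised")
    return problems


# The configuration from the answer, for the broker at 192.168.0.241.
answer_config = {
    "KAFKA_LISTENERS": "aws_internal_listener://192.168.0.241:9092,"
                       "external_listener://192.168.0.241:29092",
    "KAFKA_ADVERTISED_LISTENERS": "aws_internal_listener://192.168.0.241:9092,"
                                  "external_listener://DYNAMIC_DNS_ADDR:29092",
    "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "aws_internal_listener:PLAINTEXT,"
                                            "external_listener:PLAINTEXT",
    "KAFKA_INTER_BROKER_LISTENER_NAME": "aws_internal_listener",
}

print(check_listener_config(answer_config))  # []
```

Dropping external_listener from the protocol map, for instance, would yield ['no security protocol mapped for listener external_listener'].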
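Then your port forwarder for DYNAMIC_DNS_ADDR should redirect connections to port 29092 on the AWS node. The key point is that external connections must not end up on the host port of the internal listener, because that listener advertises an internal 192.168.0.x address. Since all three brokers sit behind the same DYNAMIC_DNS_ADDR, each broker's external listener also needs to advertise its own distinct public port (for example 30192, 30292 and 30392, as in the question), with each of those ports forwarded to the external listener port of the matching broker.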
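The routing described above can be illustrated with a toy model (an illustration, not Kafka code) of a single broker. Each listener binds a port on the broker host and advertises an address; a port-forward rule maps a public port on DYNAMIC_DNS_ADDR to a port on the broker; and the listener the forwarded connection lands on decides which address comes back in the metadata. The 30192 -> 9092 rule is an assumption standing in for the question's current setup, since the question does not say where its forwards point.

```python
from __future__ import annotations

# Listener name -> (port bound on the broker host, address it advertises).
# Values follow the answer's config for the broker at 192.168.0.241.
LISTENERS = {
    "aws_internal_listener": (9092, "192.168.0.241:9092"),
    "external_listener": (29092, "DYNAMIC_DNS_ADDR:29092"),
}


def advertised_for_forward(forwards: dict[int, int], public_port: int) -> str:
    """Return the address a client is told to use after connecting to
    DYNAMIC_DNS_ADDR:public_port. `forwards` maps public port -> broker port."""
    broker_port = forwards[public_port]
    for _name, (bind_port, advertised) in LISTENERS.items():
        if bind_port == broker_port:
            return advertised
    raise LookupError(f"nothing listens on broker port {broker_port}")


# Assumed rules: 30192 -> 9092 mimics a forward onto the internal listener,
# 29092 -> 29092 is the forward the answer recommends.
forwards = {30192: 9092, 29092: 29092}

print(advertised_for_forward(forwards, 30192))  # 192.168.0.241:9092
print(advertised_for_forward(forwards, 29092))  # DYNAMIC_DNS_ADDR:29092
```

The first case reproduces the symptom in the question: the bootstrap connection succeeds, but the client is then pointed at a private address and times out.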
Use kafkacat -L -b DYNAMIC_DNS_ADDR:29092 to debug and validate your config, as described in the article linked above.
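kafkacat -L lists the brokers from the returned metadata, one line per broker of the form "broker <id> at <host>:<port>". A hypothetical helper like the one below can scan that output and report any broker that is not advertised on the external host you expect. The sample text is made up in kafkacat's format (broker 2's address is invented for illustration) rather than captured from this cluster, and the exact header line may differ between kafkacat versions.

```python
from __future__ import annotations

import re

# Matches broker lines such as "  broker 1 at host:29092"; newer kafkacat
# versions may append " (controller)", which the pattern simply ignores.
BROKER_LINE = re.compile(r"^\s*broker\s+(\d+)\s+at\s+(\S+):(\d+)", re.MULTILINE)


def misadvertised_brokers(kafkacat_output: str, expected_host: str) -> dict[int, str]:
    """Return {broker_id: 'host:port'} for brokers not advertised on expected_host."""
    bad = {}
    for broker_id, host, port in BROKER_LINE.findall(kafkacat_output):
        if host != expected_host:
            bad[int(broker_id)] = f"{host}:{port}"
    return bad


# Illustrative output only; broker 2's private address is hypothetical.
sample = """Metadata for all topics (from broker 1: DYNAMIC_DNS_ADDR:30192/1):
 3 brokers:
  broker 1 at DYNAMIC_DNS_ADDR:30192
  broker 2 at 192.168.0.242:9092
  broker 3 at DYNAMIC_DNS_ADDR:30392 (controller)
"""

print(misadvertised_brokers(sample, "DYNAMIC_DNS_ADDR"))  # {2: '192.168.0.242:9092'}
```

An empty result means every broker in the metadata is advertised on the external host; any entry points at a broker whose external listener or port forward still needs fixing.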