apache-kafka centos6 noip

How to Set Up a Public Kafka Broker Using Dynamic DNS?


I configured a Kafka cluster with 3 brokers and 3 ZooKeeper nodes, one alongside each broker. The figure below shows the cluster layout.

[Figure: diagram of the cluster]

A producer and consumer test inside the same network, using the host 192.168.0.10, worked perfectly with the kafka-console-producer and kafka-console-consumer commands.

However, when I try to produce data over the Internet with kafka-console-producer.sh --broker-list DYNAMIC_DNS_ADDR:30192,DYNAMIC_DNS_ADDR:30292,DYNAMIC_DNS_ADDR:30392 --topic twitter_tweets, I get the following error:

[2018-12-10 09:59:20,772] ERROR Error when sending message to topic twitter_tweets with key: null, value: 16 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for twitter_tweets-1: 1505 ms has passed since batch creation plus linger time
[2018-12-10 09:59:22,273] WARN [Producer clientId=console-producer] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Broker listeners are configured with the following properties:

listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9443
advertised.listeners=PLAINTEXT://192.168.0.241:9092,SSL://192.168.0.241:9443

The IP address in the advertised.listeners property is, of course, different on each broker. The setup runs CentOS 6.10 and Kafka 2.0.1. A telnet test worked. Another port forward, to a Kafka REST Proxy port, works over the Internet and lists all topics.


Solution

  • See https://rmoff.net/2018/08/02/kafka-listeners-explained/

    You need two listeners: one that listens on and advertises the internal address, and one that advertises the external address.

    The key thing is that the listener that your client connects to will return the host address and port of that listener.

    At the moment you are forwarding your external address to the internal listener, so external traffic hits the internal listener and clients are handed an internal 192.168.0.x address that they cannot reach from the Internet.

    You need something like this (varying the IP/hostname of the aws_internal_listener as required per broker):

    KAFKA_LISTENERS: aws_internal_listener://192.168.0.241:9092,external_listener://192.168.0.241:29092
    KAFKA_ADVERTISED_LISTENERS: aws_internal_listener://192.168.0.241:9092,external_listener://DYNAMIC_DNS_ADDR:29092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: aws_internal_listener:PLAINTEXT,external_listener:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: aws_internal_listener
    

    Then your port forwarder for DYNAMIC_DNS_ADDR should redirect connections to port 29092 on the AWS node. The key thing is that external connections must not end up at the internal listener's port on the host, because that listener advertises an internal 192.168.0.x address.

    Use kafkacat -L -b DYNAMIC_DNS_ADDR:29092 to debug and validate your config, as described in the article linked above.
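
Since the question configures the brokers through server.properties rather than the Docker-style KAFKA_* variables shown above, here is a rough sketch of the same idea in that form. It is only an illustration under several assumptions: the listener names INTERNAL and EXTERNAL are hypothetical, the IPs 192.168.0.242 and 192.168.0.243 and the broker-to-port mapping are made up (only 192.168.0.241 and the ports 30192, 30292, 30392 appear in the question), and the router is assumed to forward each public port to the same port number on the matching broker. Because all three brokers share one public hostname, each one needs its own external port so that clients can reach a specific broker.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Broker:
    broker_id: int
    internal_ip: str    # LAN address used by other brokers and local clients
    external_port: int  # port forwarded to this broker from the public address


def listener_properties(broker: Broker, public_host: str, internal_port: int = 9092) -> str:
    """Render the listener-related server.properties lines for one broker."""
    ip, ext = broker.internal_ip, broker.external_port
    return "\n".join([
        f"# broker {broker.broker_id}",
        # Bind both listeners on the broker's LAN address, on different ports.
        f"listeners=INTERNAL://{ip}:{internal_port},EXTERNAL://{ip}:{ext}",
        # Advertise the LAN address internally and the public name externally.
        f"advertised.listeners=INTERNAL://{ip}:{internal_port},EXTERNAL://{public_host}:{ext}",
        "listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT",
        "inter.broker.listener.name=INTERNAL",
    ])


if __name__ == "__main__":
    # Assumed layout: one forwarded port per broker.
    brokers = [
        Broker(1, "192.168.0.241", 30192),
        Broker(2, "192.168.0.242", 30292),  # hypothetical IP
        Broker(3, "192.168.0.243", 30392),  # hypothetical IP
    ]
    for b in brokers:
        print(listener_properties(b, "DYNAMIC_DNS_ADDR"))
        print()
```

Under these assumptions the router would forward DYNAMIC_DNS_ADDR:30192 to 192.168.0.241:30192 (and likewise for the other two brokers), and the --broker-list from the question could stay unchanged.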