I'm using kafka-python to produce messages for a Kafka 2.2.1 cluster (a managed cluster instance from AWS's MSK service). I'm able to retrieve the bootstrap servers and establish a network connection to them, but no message ever gets through. Instead, after each message of type A I immediately receive one of type B, and eventually one of type C:
A [INFO] 2019-11-19T15:17:19.603Z <BrokerConnection ... <connecting> [IPv4 ('10.0.128.56', 9094)]>: Connection complete.
B [ERROR] 2019-11-19T15:17:19.605Z <BrokerConnection ... <connected> [IPv4 ('10.0.128.56', 9094)]>: socket disconnected
C [ERROR] KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
What causes a broker node to accept a TCP connection from a hopeful producer, but then immediately close it again?
Edit
The topic already exists, and kafka-topics.sh --list displays it.
I have the same problem with all clients I've used: Kafka's kafka-console-producer.sh, kafka-python, confluent-kafka, and kafkacat.
The Kafka cluster is in the same VPC as all my other machines, and its security group allows any incoming and outgoing traffic within that VPC.
However, it's managed by Amazon's Managed Streaming for Kafka (MSK) service, which means I don't have fine-grained control over the server installation settings (or even know what they are). MSK just publishes the ZooKeeper and message broker URLs for clients to use.
The producer runs as an AWS Lambda function, but the problem persists when I run it on a normal EC2 instance.
Permissions are not the issue. I have assigned the Lambda role all the AWS permissions it needs (AWS is always very explicit about which operation requires which missing permission).
Connectivity is not the issue. I can reach the URLs of both the ZooKeeper nodes and the message brokers with standard telnet. However, issuing commands to the ZooKeeper nodes works, while issuing commands to the message brokers always eventually fails. Since Kafka uses a binary protocol over TCP, I'm at a loss as to how to debug the problem further.
Edit
As suggested, I debugged this with
./kafkacat -b $BROKERS -L -d broker
and got:
%7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1574772202.379|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1574772202.379|BROKERFAIL|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: failed: err: Local: Broker transport failure: (errno: Operation now in progress)
%7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features -ApiVersion to
%7|1574772202.380|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN
So, is this a kind of mismatch between client and broker API versions? How can I recover from this, bearing in mind that I have no control over the version or the configuration of the Kafka cluster that AWS provides?
I think this is related to TLS encryption. By default, MSK spins up a cluster that accepts both PLAINTEXT and TLS, but if you are grabbing the bootstrap servers programmatically from the cluster, it will only provide you with the TLS ports (9094). A client that speaks plaintext to a TLS listener gets exactly this symptom: the TCP connection completes, then the broker drops it when the handshake fails. If this is the case for you, try using the PLAINTEXT port 9092 instead.
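As a quick check, the TLS port in the bootstrap string can be swapped for the plaintext one before handing it to the client. This is a minimal sketch, assuming the brokers really do expose a PLAINTEXT listener on 9092 alongside TLS on 9094; `to_plaintext_brokers` is a hypothetical helper, not part of any library:

```python
def to_plaintext_brokers(bootstrap: str, tls_port: int = 9094, plain_port: int = 9092) -> str:
    """Rewrite host:tls_port entries to host:plain_port; leave other entries alone."""
    rewritten = []
    for entry in bootstrap.split(","):
        entry = entry.strip()
        if not entry:
            continue
        # Split on the last colon so the host part is kept intact.
        host, sep, port = entry.rpartition(":")
        if sep and port == str(tls_port):
            entry = f"{host}:{plain_port}"
        rewritten.append(entry)
    return ",".join(rewritten)
```

If the producer still disconnects on 9092, the cluster is probably configured for TLS only and the client needs to be set up for TLS instead.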
To authenticate the client for TLS, you need to generate a certificate (see https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html). You would then need to get this certificate onto your Lambda and reference it in your producer configuration.
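For kafka-python, referencing the certificate comes down to a few keyword arguments on `KafkaProducer` (`security_protocol`, `ssl_cafile`, `ssl_certfile`, `ssl_keyfile`). The sketch below only assembles those arguments; `build_producer_config` and `make_producer` are hypothetical helper names, and the file paths in the usage note are placeholders, not locations MSK or Lambda provide:

```python
def build_producer_config(brokers, use_tls=True, cafile=None, certfile=None, keyfile=None):
    """Return keyword arguments for kafka.KafkaProducer."""
    config = {
        "bootstrap_servers": [b.strip() for b in brokers.split(",") if b.strip()],
        "security_protocol": "SSL" if use_tls else "PLAINTEXT",
    }
    if use_tls:
        # Only pass the file options that were actually supplied.
        optional = {"ssl_cafile": cafile, "ssl_certfile": certfile, "ssl_keyfile": keyfile}
        config.update({k: v for k, v in optional.items() if v is not None})
    return config


def make_producer(**config):
    # Imported lazily so the config helper works without kafka-python installed.
    from kafka import KafkaProducer
    return KafkaProducer(**config)
```

Usage would look something like `make_producer(**build_producer_config(brokers, certfile="/path/to/client-cert.pem", keyfile="/path/to/client-key.pem"))`, with the certificate and key files bundled into the Lambda deployment package or fetched at startup.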
If you are able to configure your MSK cluster as PLAINTEXT only, then when you grab the bootstrap servers from the AWS SDK it will give you the PLAINTEXT port, and you should be good.
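When fetching the brokers through boto3, it helps to check which endpoints the response actually contains: `get_bootstrap_brokers` reports plaintext and TLS brokers in separate fields (`BootstrapBrokerString` and `BootstrapBrokerStringTls`), and a field is absent when the cluster does not offer that listener. A minimal sketch, with `pick_bootstrap_brokers` and `fetch_bootstrap_brokers` as hypothetical helper names:

```python
def pick_bootstrap_brokers(response, prefer_tls=False):
    """Return (security_protocol, broker_string) from a GetBootstrapBrokers response."""
    plaintext = response.get("BootstrapBrokerString")
    tls = response.get("BootstrapBrokerStringTls")
    if prefer_tls and tls:
        return "SSL", tls
    if plaintext:
        return "PLAINTEXT", plaintext
    if tls:
        # Only TLS is offered, so the client must be configured for SSL.
        return "SSL", tls
    raise ValueError("response contains no bootstrap brokers")


def fetch_bootstrap_brokers(cluster_arn, prefer_tls=False):
    # Imported lazily so the selection logic can be used without boto3 installed.
    import boto3
    response = boto3.client("kafka").get_bootstrap_brokers(ClusterArn=cluster_arn)
    return pick_bootstrap_brokers(response, prefer_tls)
```

The returned protocol name matches the value kafka-python expects for `security_protocol`, so the client configuration can follow whatever the cluster advertises rather than assuming a port.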