I'm trying to connect an AWS Lambda function to an Amazon MSK (Managed Streaming for Apache Kafka) cluster using the kafka-python library with SASL_SSL authentication. I am following the official documentation link. However, I'm encountering a connection error related to the authentication process.
Here's the code I'm using:
import os
import socket
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
KAFKA_BROKERS = [
"b-1.xxx:9098",
"b-2.xxx:9098",
"b-3.xxx:9098"
]
class MSKTokenProvider():
def token(self):
token, _ = MSKAuthTokenProvider.generate_auth_token('us-west-2')
return token
tp = MSKTokenProvider()
print("Generated Token:", tp.token())
print("Client:", socket.gethostname())
producer = KafkaProducer(
bootstrap_servers=KAFKA_BROKERS,
security_protocol='SASL_SSL',
sasl_mechanism='OAUTHBEARER',
sasl_oauth_token_provider=tp,
client_id=socket.gethostname(),
api_version=(3, 2, 0)
)
I'm getting the following error when I try to run the above code:
[ERROR] 2024-10-30T23:54:49.698Z e7d86aa6-cd0c-40c4-a374-42b25946d291 <BrokerConnection node_id=bootstrap-2 host=b-3.xxx:9098 <authenticating> [IPv4 ('10.29.34.48', 9098)]>: Error receiving reply from server
Traceback (most recent call last):
File "/opt/python/kafka/conn.py", line 803, in _try_authenticate_oauth
data = self._recv_bytes_blocking(4)
File "/opt/python/kafka/conn.py", line 616, in _recv_bytes_blocking
raise ConnectionError('Connection reset during recv')
ConnectionError: Connection reset during recv
the apache kafka version of the msk cluster is 3.2.0
lambda runtime version and layer version is Python 3.9
What might be causing the connection reset during the authentication process?
Are there additional configurations or troubleshooting steps I should consider that are not mentioned in the documentation link?
i am attaching the security details of the msk cluster, the network settings screenshots and the lambda vpc details
Here are the things I would check, in order:
Follow the links in this screenshot - make sure that the permissions attached to your lambda match up to those in the policy example.
You didn't show us your inbound security group settings. It is likely that your security group settings do not allow inbound to the cluster on the ports that IAM uses. The ports to use will depend on your config. It looks like your lambda is in the same account and vpc as the cluster, in which case you want an inbound security group rule that allows Custom TCP Traffic access on 9098. If you had public access enabled, then it would have been port 9198.
If the lambda's are in a different account, and you're using the multi-VPC private connectivity setting, in which case see this. Basically, you want to make sure you have a cluster iam policy attached to the cluster and a security group rule that allows inbound Custom TCP Traffic on port ranges 14001-14100. This blog might also help.
If neither of these work, you'd need to share with us your IAM policy attached to your lambda, your inbound and outbound security group rules for all security groups attached to both the lambda and the cluster for further scrutiny.