apache-kafka amazon-iam aws-msk confluent-kafka-python

Confluent-Kafka: no broker available for coordinator query: intervaled in state query-coord


I am using confluent-kafka-python to create a producer.

The Kafka cluster is on MSK 3.7.x KRaft with IAM enabled and TLS enabled, both within the cluster and between clients and brokers.

Any ideas on how to fix this?

Both producers and consumers log similar output:

[thrd:main]: Topic test_topic metadata information unknown

[thrd:main]: Topic test_topic partition count is zero: should refresh metadata

[thrd:main]: Cluster connection already in progress: refresh unavailable topics

[thrd:main]: Hinted cache of 1/1 topic(s) being queried

[thrd:main]: Skipping metadata refresh of 1 topic(s): refresh unavailable topics: no usable brokers

[thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
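The lines above say that no broker is usable, but not why. One way to get closer to the cause is to enable librdkafka's `security` and `broker` debug contexts and register an `error_cb`, both of which are standard confluent-kafka-python config keys. The sketch below only builds the config dict; the helper name `with_auth_debugging` is hypothetical, and it assumes whatever base config the client already uses.

```python
def with_auth_debugging(base_config):
    """Return a copy of base_config with auth-related debugging enabled."""

    def on_error(err):
        # confluent-kafka passes a KafkaError here; printing it shows the
        # client-level reason (for example an authentication failure).
        print(f"client error: {err}")

    config = dict(base_config)            # leave the caller's dict untouched
    config['debug'] = 'security,broker'   # librdkafka debug contexts
    config['error_cb'] = on_error         # called for client-level errors
    return config
```

The returned dict can be passed to `Producer(...)` or `Consumer(...)` in place of the original config.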

Producer.py

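import time

import certifi
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from confluent_kafka import Producer

topic = 'test_topic'


def oauth_cb(oauth_config):
    # confluent-kafka calls this whenever it needs a fresh token.
    # generate_auth_token returns the expiry in milliseconds, while
    # oauth_cb must return it in seconds since the epoch.
    token, expiry_ms = MSKAuthTokenProvider.generate_auth_token('aws-region')
    return token, expiry_ms / 1000


def delivery_report_func(err, msg):
    # Invoked from poll() with the delivery result of each message.
    if err is not None:
        print(f"Delivery failed: {err}")


kafka_producer = Producer({
    'bootstrap.servers': 'broker-1',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'oauth_cb': oauth_cb,
    'ssl.ca.location': certifi.where()
})

while True:
    message = f"hello: {time.process_time()}"
    kafka_producer.poll(0)  # serve delivery callbacks
    kafka_producer.produce(topic, message.encode('utf-8'), callback=delivery_report_func)
    time.sleep(1)  # avoid filling the local producer queue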


Solution

  • This was caused by a bad IAM configuration.
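
The answer does not say which part of the IAM configuration was wrong, so the following is only a sketch of what a working policy for one topic might look like, useful as a comparison point. The action names come from MSK's IAM access control (`kafka-cluster:*`); the helper name `msk_client_policy` and every argument value are hypothetical placeholders.

```python
def msk_client_policy(region, account_id, cluster_name, cluster_uuid,
                      topic, group=None):
    """Build an IAM policy document for one topic and, optionally, one group."""
    base = f"arn:aws:kafka:{region}:{account_id}"
    suffix = f"{cluster_name}/{cluster_uuid}"
    statements = [
        {
            # Every client needs Connect on the cluster resource.
            "Effect": "Allow",
            "Action": ["kafka-cluster:Connect"],
            "Resource": f"{base}:cluster/{suffix}",
        },
        {
            # Topic-level access for producing and consuming.
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData",
            ],
            "Resource": f"{base}:topic/{suffix}/{topic}",
        },
    ]
    if group is not None:
        # Consumers also need access to their consumer group.
        statements.append({
            "Effect": "Allow",
            "Action": ["kafka-cluster:AlterGroup",
                       "kafka-cluster:DescribeGroup"],
            "Resource": f"{base}:group/{suffix}/{group}",
        })
    return {"Version": "2012-10-17", "Statement": statements}
```

Dumping the result with `json.dumps(..., indent=2)` and comparing it with the policy attached to the client's role can show a missing action or a mismatched resource ARN.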