How can I connect a Kafka consumer written in Python to Event Hubs from an AKS pod? I've already tried using Workload Identity with a Service Connector (before that I tried a connection string, without success), but I'm still unable to connect. I did make sure that the created identity has the necessary rights on the Event Hubs namespace.
The consumer was tested locally and works fine; here is the code:
from azure.identity import DefaultAzureCredential
from confluent_kafka import Consumer, KafkaException
import sys
import getopt
import json
import logging
from functools import partial
from pprint import pformat
def stats_cb(stats_json_str):
    stats_json = json.loads(stats_json_str)
    print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))

def oauth_cb(cred, namespace_fqdn, config):
    # confluent_kafka requires an oauth callback function to return (str, float) with
    # the values of (<access token>, <expiration date in seconds from epoch>)
    # cred: an Azure identity library credential object, e.g. an instance of
    #       DefaultAzureCredential or ManagedIdentityCredential
    # namespace_fqdn: the FQDN for the target Event Hubs namespace,
    #       e.g. 'mynamespace.servicebus.windows.net'
    # config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param
    # Build the token scope from the namespace bound via functools.partial
    # rather than hard-coding it here.
    access_token = cred.get_token('https://%s/.default' % namespace_fqdn)
    return access_token.token, access_token.expires_on
def print_usage_and_exit(program_name):
    sys.stderr.write(
        'Usage: %s [options..] <eventhubs-namespace> <group> <topic1> <topic2> ..\n' % program_name)
    options = '''
 Options:
  -T <intvl>   Enable client statistics at specified interval (ms)
'''
    sys.stderr.write(options)
    sys.exit(1)
if __name__ == '__main__':
    # getopt must be given the command-line arguments, not a literal string
    optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
    if len(argv) < 3:
        print_usage_and_exit(sys.argv[0])

    # Azure credential
    # See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview
    cred = DefaultAzureCredential(managed_identity_client_id="<id>")
    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': '<eventhubs-namespace>.servicebus.windows.net:9093',
        'group.id': '$Default',
        'session.timeout.ms': 6000,
        'auto.offset.reset': 'earliest',
        # Required OAuth2 configuration properties
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'OAUTHBEARER',
        # The resulting oauth_cb must accept a single `config` parameter, so we use
        # partial to bind the credential and namespace to our function. Pass the bare
        # FQDN without the :9093 port, since it is used to build the token scope.
        'oauth_cb': partial(oauth_cb, cred, '<eventhubs-namespace>.servicebus.windows.net'),
    }
    #print(str(conf))

    # Check to see if -T option exists
    for opt in optlist:
        if opt[0] != '-T':
            continue
        try:
            intval = int(opt[1])
        except ValueError:
            sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
            sys.exit(1)
        if intval <= 0:
            sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
            sys.exit(1)
        conf['stats_cb'] = stats_cb
        conf['statistics.interval.ms'] = intval
    # Create logger for consumer (logs will be emitted when poll() is called)
    logger = logging.getLogger('consumer')
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
    logger.addHandler(handler)

    # Create Consumer instance
    # Hint: try debug='fetch' to generate some log messages
    c = Consumer(conf, logger=logger)
    #print(str(c.list_topics().topics))

    def print_assignment(consumer, partitions):
        print('Assignment:', partitions)

    # Subscribe to topics
    c.subscribe(["test"], on_assign=print_assignment)
    # Read messages from Kafka, print to stdout
    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())
    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')
    finally:
        # Close down consumer to commit final offsets.
        c.close()
Here is the Kubernetes deployment:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer
  namespace: default
  labels:
    app: consumer
    azure.workload.identity/use: "true"
spec:
  replicas: 1
  selector:
    matchLabels:
      app: consumer
  template:
    metadata:
      labels:
        app: consumer
    spec:
      serviceAccountName: sc-account-<name>
      containers:
        - name: consumer
          image: <image>
          command: ["/venv/bin/python", "/consumer.py"]
          envFrom:
            - secretRef:
                name: sc-eventhub<secret>
Follow the steps below to authenticate and configure a Kafka consumer for Azure Event Hubs on AKS in Python.
First, set up an Azure identity with the necessary permissions to access your Event Hubs namespace, and use DefaultAzureCredential to handle authentication.
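As a quick sanity check, here is a minimal sketch of acquiring an Event Hubs token with DefaultAzureCredential, the same call your oauth_cb makes (the namespace FQDN is a placeholder):

from azure.identity import DefaultAzureCredential

# DefaultAzureCredential tries several sources in order, including the
# AZURE_CLIENT_ID/AZURE_TENANT_ID/AZURE_CLIENT_SECRET environment variables,
# Workload Identity, and Managed Identity.
cred = DefaultAzureCredential()

# The Event Hubs Kafka endpoint accepts OAuth 2.0 tokens scoped to the namespace.
token = cred.get_token('https://<eventhubs-namespace>.servicebus.windows.net/.default')
print('Token acquired; expires at epoch second', token.expires_on)

If this runs in the target environment, the identity side is healthy and any remaining failure is in the Kafka configuration.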
Create a Dockerfile to containerize your Python application.
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
COPY consumer.py consumer.py
CMD ["python", "consumer.py", "<eventhubs-namespace>", "<group>", "<topic>"]
Build and push the Docker image to a container registry. For reference, see the Configure Kafka Connect for Event Hubs guide.
docker build -t <your-registry>/kafka-consumer .
docker push <your-registry>/kafka-consumer
Create a Kubernetes deployment YAML file to deploy your containerized application.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-consumer
  namespace: default
  labels:
    app: kafka-consumer
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-consumer
  template:
    metadata:
      labels:
        app: kafka-consumer
    spec:
      serviceAccountName: <your-service-account>
      containers:
        - name: kafka-consumer
          image: <your-registry>/kafka-consumer
          env:
            - name: AZURE_CLIENT_ID
              value: <your-client-id>
            - name: AZURE_TENANT_ID
              value: <your-tenant-id>
            - name: AZURE_CLIENT_SECRET
              value: <your-client-secret>
          resources:
            limits:
              memory: "512Mi"
              cpu: "500m"
            requests:
              memory: "256Mi"
              cpu: "250m"
Apply the deployment to your AKS cluster.
kubectl apply -f kafka-consumer-deployment.yaml
Check the logs to ensure the consumer is running correctly and connecting to the Event Hub.
kubectl logs -f deployment/kafka-consumer
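If the consumer still cannot connect, you can separate authentication failures from Kafka connectivity failures by requesting a token directly inside the pod (a one-off check; the namespace is a placeholder):

kubectl exec deployment/kafka-consumer -- python -c "from azure.identity import DefaultAzureCredential; print(DefaultAzureCredential().get_token('https://<eventhubs-namespace>.servicebus.windows.net/.default').expires_on)"

If this prints an expiry timestamp, token acquisition works and the problem lies in the librdkafka OAuth/SASL configuration instead.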