python azure azure-aks azure-eventhub

How to connect my pod on AKS to Azure Event Hub via its Kafka interface?


How can I connect a Kafka consumer written in Python to Event Hub from an AKS pod? I've already tried using Workload Identity with a Service Connector (before that I tried a connection string, without success), but I'm still unable to connect. I did make sure that the created identity has the necessary rights on Event Hub.

The consumer was tested locally and works fine; here is the code:

from azure.identity import DefaultAzureCredential
from confluent_kafka import Consumer, KafkaException
import sys
import getopt
import json
import logging
from functools import partial
from pprint import pformat
import os


def stats_cb(stats_json_str):
    stats_json = json.loads(stats_json_str)
    print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))


def oauth_cb(cred, namespace_fqdn, config):
    # confluent_kafka requires an oauth callback function to return (str, float) with the values of
    # (<access token>, <expiration date in seconds from epoch>)

    # cred: an Azure identity library credential object, e.g. an instance of DefaultAzureCredential
    # namespace_fqdn: the FQDN for the target Event Hubs namespace, without a port,
    #                 e.g. 'mynamespace.servicebus.windows.net'
    # config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param
    access_token = cred.get_token(f'https://{namespace_fqdn}/.default')
    return access_token.token, access_token.expires_on


def print_usage_and_exit(program_name):
    sys.stderr.write(
        'Usage: %s [options..] <eventhubs-namespace> <group> <topic1> <topic2> ..\n' % program_name)
    options = '''
 Options:
  -T <intvl>   Enable client statistics at specified interval (ms)
'''
    sys.stderr.write(options)
    sys.exit(1)


if __name__ == '__main__':
    optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
    if len(argv) < 3:
        print_usage_and_exit(sys.argv[0])

    # Azure credential
    # See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview
    cred = DefaultAzureCredential(managed_identity_client_id="<id>")


    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': '<eventhubs-namespace>.servicebus.windows.net:9093',
        'group.id': '$Default',
        'session.timeout.ms': 6000,
        'auto.offset.reset': 'earliest',

        # Required OAuth2 configuration properties
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'OAUTHBEARER',
        # the oauth_cb must accept a single `config` parameter, so we use partial to bind
        # the credential and the namespace FQDN (no port; the port is only for bootstrap.servers)
        'oauth_cb': partial(oauth_cb, cred, '<eventhubs-namespace>.servicebus.windows.net'),
    }
    #print(str(conf))
    # Check to see if -T option exists
    for opt in optlist:
        if opt[0] != '-T':
            continue
        try:
            intval = int(opt[1])
        except ValueError:
            sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
            sys.exit(1)

        if intval <= 0:
            sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
            sys.exit(1)

        conf['stats_cb'] = stats_cb
        conf['statistics.interval.ms'] = intval

    # Create logger for consumer (logs will be emitted when poll() is called)
    logger = logging.getLogger('consumer')
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
    logger.addHandler(handler)

    # Create Consumer instance
    # Hint: try debug='fetch' to generate some log messages
    c = Consumer(conf, logger=logger)
    #print(str(c.list_topics().topics))
    def print_assignment(consumer, partitions):
        print('Assignment:', partitions)

    # Subscribe to topics
    c.subscribe(["test"], on_assign=print_assignment)

    # Read messages from Kafka, print to stdout
    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    finally:
        # Close down consumer to commit final offsets.
        c.close()

Here is the Kubernetes deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer
  namespace: default
  labels:
    app: consumer
    azure.workload.identity/use: "true"
spec:
  replicas: 1
  selector:
    matchLabels:
      app: consumer
  template:
    metadata:
      labels:
        app: consumer
    spec:
      serviceAccountName: sc-account-<name>
      containers:
      - name: consumer
        image: <image>
        command: ["/venv/bin/python","/consumer.py"]
        envFrom:
          - secretRef:
              name: sc-eventhub<secret>

Solution

  • Follow the steps below to authenticate and configure a Kafka consumer for Azure Event Hubs on AKS in Python.

    First, set up an Azure identity with the necessary permissions to access your Event Hubs namespace, and use DefaultAzureCredential to handle authentication.
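
    For example, a sketch of granting read access with the Azure CLI (the placeholders are yours to fill in; "Azure Event Hubs Data Receiver" is the built-in role for consuming):

    az role assignment create \
      --assignee <your-client-id> \
      --role "Azure Event Hubs Data Receiver" \
      --scope /subscriptions/<sub-id>/resourceGroups/<rg>/providers/Microsoft.EventHub/namespaces/<eventhubs-namespace>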

    Create a Dockerfile to containerize your Python application.

    FROM python:3.9-slim
    
    WORKDIR /app
    
    COPY requirements.txt requirements.txt
    RUN pip install -r requirements.txt
    
    COPY consumer.py consumer.py
    
    CMD ["python", "consumer.py", "<eventhubs-namespace>", "<group>", "<topic>"]
    
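    The requirements.txt referenced above needs at least the identity and Kafka client packages; a minimal, unpinned sketch:

    # requirements.txt
    azure-identity
    confluent-kafka
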

    Build and push the Docker image to a container registry.

    docker build -t <your-registry>/kafka-consumer .
    docker push <your-registry>/kafka-consumer
    
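    If the registry is Azure Container Registry, you can authenticate to it and let the cluster pull from it (a sketch, assuming <your-registry> is an ACR instance):

    az acr login --name <your-registry>
    az aks update -g <resource-group> -n <cluster-name> --attach-acr <your-registry>
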


    Create a Kubernetes deployment YAML file to deploy your containerized application.

    
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: kafka-consumer
      namespace: default
      labels:
        app: kafka-consumer
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: kafka-consumer
      template:
        metadata:
          labels:
            app: kafka-consumer
        spec:
          serviceAccountName: <your-service-account>
          containers:
            - name: kafka-consumer
              image: <your-registry>/kafka-consumer
              env:
                - name: AZURE_CLIENT_ID
                  value: <your-client-id>
                - name: AZURE_TENANT_ID
                  value: <your-tenant-id>
                - name: AZURE_CLIENT_SECRET
                  value: <your-client-secret>
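                # NOTE: a plain-text client secret is shown here only for illustration;
                # in practice, source these values from a Kubernetes Secret via valueFrom/secretKeyRef.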
              resources:
                limits:
                  memory: "512Mi"
                  cpu: "500m"
                requests:
                  memory: "256Mi"
                  cpu: "250m"
    
    
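    Since the question uses Workload Identity rather than a client secret, note that the azure.workload.identity/use: "true" label must be set on the pod template metadata, not only on the Deployment metadata, and the service account must be annotated with the identity's client ID. A sketch of the relevant fragment:

      template:
        metadata:
          labels:
            app: kafka-consumer
            # this label must be on the pod, not (only) the Deployment
            azure.workload.identity/use: "true"
        spec:
          # service account annotated with azure.workload.identity/client-id
          serviceAccountName: <your-service-account>
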

    Apply the deployment to your AKS cluster.

    kubectl apply -f kafka-consumer-deployment.yaml
    


    Check the logs to ensure the consumer is running correctly and connecting to the Event Hub.

    kubectl logs -f deployment/kafka-consumer
    
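    If the consumer still cannot authenticate, describe the pod and check that the workload identity webhook injected the expected environment variables (AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_FEDERATED_TOKEN_FILE):

    kubectl describe pod -l app=kafka-consumer | grep -i azure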
