
KafkaSource connection to Confluent Kafka (with SSL & SchemaRegistry)


I am trying to connect to Confluent Kafka with KafkaSource (from MLRun). Until now I have used this simple code:

# requires 'kafka-python>=2.0.2'
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'ak47-data.v1',
    bootstrap_servers=[
        'cpkafka01.eu.prod:9092',
        'cpkafka02.eu.prod:9092',
        'cpkafka03.eu.prod:9092'
    ],
    client_id='test',
    auto_offset_reset='earliest',
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_password="***********",
    sasl_plain_username="***********",
    security_protocol='SASL_SSL',
    ssl_cafile="/v3io/bigdata/rootca.crt",
    ssl_certfile=None,
    ssl_keyfile=None)

# print the first message from the topic, then stop
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key, message.value))
    break

How can I rewrite this code to use KafkaSource?


Solution

  • Here is working code for KafkaSource (for MLRun>=1.1.0). You can also specify the certificate (see rootca.crt) and a list of Kafka topics.

    from mlrun.datastore.sources import KafkaSource
    
    # read the root CA certificate as text
    with open('/v3io/bigdata/rootca.crt') as f:
        caCert = f.read()
    
    # definition of KafkaSource
    kafka_source = KafkaSource(
        brokers=[
            'cpkafka01.eu.prod:9092',
            'cpkafka02.eu.prod:9092',
            'cpkafka03.eu.prod:9092',
        ],
        topics=["ak47-data.v1"],
        initial_offset="earliest",
        group="test",
        attributes={
            "sasl": {
                "enable": True,
                "password": "******",
                "user": "*******",
                "handshake": True,
                "mechanism": "SCRAM-SHA-256",
            },
            "tls": {
                "enable": True,
                "insecureSkipVerify": False,
            },
            "caCert": caCert,
        },
    )
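
    To make the correspondence between the two snippets explicit, here is a minimal sketch of how the kafka-python arguments from the question map onto the attributes dictionary used above. The helper name to_kafka_source_attributes and its signature are hypothetical (they are not part of MLRun or kafka-python); the keys it produces are only the ones that already appear in this answer, and the choice to key everything off security_protocol is my assumption.

```python
def to_kafka_source_attributes(consumer_kwargs, ca_cert_text):
    """Hypothetical helper: build the KafkaSource `attributes` dict
    from kafka-python style consumer arguments.

    consumer_kwargs: dict of kafka-python KafkaConsumer keyword arguments
    ca_cert_text:    content of the CA certificate file (not its path)
    """
    protocol = consumer_kwargs.get("security_protocol", "PLAINTEXT")
    attributes = {}

    # SASL settings apply to SASL_SSL and SASL_PLAINTEXT
    if protocol.startswith("SASL"):
        attributes["sasl"] = {
            "enable": True,
            "user": consumer_kwargs["sasl_plain_username"],
            "password": consumer_kwargs["sasl_plain_password"],
            "handshake": True,
            "mechanism": consumer_kwargs["sasl_mechanism"],
        }

    # TLS settings apply to SSL and SASL_SSL
    if protocol.endswith("SSL"):
        attributes["tls"] = {"enable": True, "insecureSkipVerify": False}
        attributes["caCert"] = ca_cert_text

    return attributes


# Example with placeholder values (no real credentials or certificate)
example_attributes = to_kafka_source_attributes(
    {
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "SCRAM-SHA-256",
        "sasl_plain_username": "my-user",
        "sasl_plain_password": "my-password",
    },
    ca_cert_text="<content of rootca.crt>",
)
```

    The result can then be passed as attributes=example_attributes to KafkaSource. Note that the certificate is passed as text (the file content), not as a path as in ssl_cafile.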