python, apache-kafka, google-cloud-dataflow, apache-beam, confluent-cloud

How to connect Kafka IO from Apache Beam to a cluster in Confluent Cloud


I've made a simple pipeline in Python to read from Kafka. The thing is that the Kafka cluster is on Confluent Cloud and I am having some trouble connecting to it.

I'm getting the following log on the Dataflow job:

Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
    at org.apache.beam.sdk.io.kafka.KafkaIO$Read$GenerateKafkaSourceDescriptor.processElement(KafkaIO.java:1495)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set

So I think I'm missing something while passing the config, since the error mentions the JAAS configuration. I'm really new to all of this and I know nothing about Java, so I don't know how to proceed even after reading the JAAS documentation.

The code of the pipeline is the following:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
import os
import json
import logging

os.environ['GOOGLE_APPLICATION_CREDENTIALS']='credentialsOld.json'

with open('cluster.configuration.json') as cluster:
    data=json.load(cluster)
    cluster.close()

def logger(element):
    # Log a message for each element read from Kafka
    logging.info('Something was found')
      
def main():
    config={
        "bootstrap.servers":data["bootstrap.servers"],
        "security.protocol":data["security.protocol"],
        "sasl.mechanisms":data["sasl.mechanisms"],
        "sasl.username":data["sasl.username"],
        "sasl.password":data["sasl.password"],
        "session.timeout.ms":data["session.timeout.ms"],
        "auto.offset.reset":"earliest"
    }
    print('======================================================')
    beam_options = PipelineOptions(runner='DataflowRunner',project='project',experiments=['use_runner_v2'],streaming=True,save_main_session=True,job_name='kafka-stream-test')
    with beam.Pipeline(options=beam_options) as p:
        msgs = p | 'ReadKafka' >> ReadFromKafka(consumer_config=config,topics=['users'],expansion_service="localhost:8088")
        msgs | beam.Map(logger)
        
if __name__ == '__main__':    
    main()

I read something about passing a property java.security.auth.login.config in the config dictionary, but since that example is in Java and I'm using Python, I'm really lost as to what I have to pass, or even whether that's the property I have to pass at all.

By the way, I'm getting the API key and secret from the page below, and this is what I am passing to sasl.username and sasl.password.

(screenshot: the API key and secret page in Confluent Cloud)
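
For reference, the cluster.configuration.json I load above has roughly this shape once it is read into data; the values here are placeholders for illustration only, and the real API key and secret are what I pass to sasl.username and sasl.password:

# Rough shape of cluster.configuration.json once loaded into `data`
# (placeholder values for illustration only; keys match what the pipeline reads above)
data = {
    "bootstrap.servers": "pkc-xxxxx.region.provider.confluent.cloud:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<API_KEY>",
    "sasl.password": "<API_SECRET>",
    "session.timeout.ms": "45000"
}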


Solution

  • I faced the same error the first time I tried Beam's expansion service. The key sasl.mechanisms that you are supplying is incorrect; try sasl.mechanism instead. You also do not need to supply the username and password keys separately, since the connection is authenticated through JAAS. Basically, a consumer_config like the one below worked for me:

    config={
        "bootstrap.servers":data["bootstrap.servers"],
        "security.protocol":data["security.protocol"],
        "sasl.mechanism":data["sasl.mechanisms"],
        "session.timeout.ms":data["session.timeout.ms"],
        "group.id":"tto",
        "sasl.jaas.config":f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username=\"{data["sasl.username"]}\" password=\"{data["sasl.password"]}\";',
        "auto.offset.reset":"earliest"
    }