pyspark azure-databricks confluent-cloud

How to connect Confluent Cloud to Databricks


I want to know how to connect Confluent Cloud to Databricks. I want to read data from Confluent into a Spark DataFrame.

I have used this code:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", confluentBootstrapserver) \
  .option("kafka.security.protocol", "SSL") \
  .option("subscribe", confluentTopic) \
  .option("startingOffsets", "earliest") \
  .option("kafka.sasl.jaas.config",
  "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required
   username\"**********\" password = \"******************************\";").load()

I used the API key as the username and the API secret as the password, and set confluentTopic to the topic name.

I am getting various errors, such as "java.util.concurrent.ExecutionException: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics". Before that, I was getting an error that the consumer could not be created. I am new to this, so please elaborate in your answer.


Solution

  • You can use the code blocks below.

    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    confluentBootstrapserver = "your_bootstrap_server"  # host:port
    confluentTopic = "topic_0"
    apiKey="api_key"
    apiSecret="api_secret"
    

    Create a new API key in your cluster tab in Confluent Cloud.

    Copy and save the API key and secret.

    Next, pass the API key and secret in the kafka.sasl.jaas.config Spark option as the username and password, as shown below. Also set kafka.security.protocol to SASL_SSL (not SSL) and kafka.sasl.mechanism to PLAIN. I think you supplied the same credentials; this worked in my environment.

    df = spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", confluentBootstrapserver) \
            .option("kafka.security.protocol", "SASL_SSL") \
            .option("kafka.sasl.mechanism", "PLAIN") \
            .option("kafka.sasl.jaas.config", f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{apiKey}" password="{apiSecret}";') \
            .option("subscribe", confluentTopic) \
            .option("startingOffsets", "earliest") \
            .load()
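
The same options can also be collected in a plain dictionary and passed with `.options(**opts)`, which keeps the JAAS string in one place. This is a minimal sketch: the helper name `confluent_kafka_options` and the sample values are illustrative assumptions, not part of any library. The `kafkashaded.` prefix is the one used in the code above on Databricks; on open-source Spark the class name is typically `org.apache.kafka.common.security.plain.PlainLoginModule`.

```python
def confluent_kafka_options(bootstrap_server, api_key, api_secret, topic):
    """Build Kafka source options for a Confluent Cloud cluster (hypothetical helper)."""
    # The API key is the SASL username and the API secret is the SASL password.
    jaas = (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
        f'required username="{api_key}" password="{api_secret}";'
    )
    return {
        "kafka.bootstrap.servers": bootstrap_server,
        "kafka.security.protocol": "SASL_SSL",  # not plain "SSL"
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
        "subscribe": topic,
        "startingOffsets": "earliest",
    }


# Placeholder values for illustration only.
opts = confluent_kafka_options("host.example:9092", "api_key", "api_secret", "topic_0")
print(opts["kafka.sasl.jaas.config"])
```

In a notebook this would be used as `df = spark.readStream.format("kafka").options(**opts).load()`.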
    

    Next, create a function that decodes the binary key and value columns, and register it as a UDF.

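    def conv_bin(x):
        # x is the base64 text of a binary Kafka column; null keys arrive as None
        import base64
        import json
        if x is None:
            return None
        text = base64.b64decode(x).decode('utf-8')
        try:
            # normalize JSON payloads to a JSON string (the UDF returns StringType)
            return json.dumps(json.loads(text))
        except ValueError:
            return text  # not JSON: return the decoded text unchanged

    get_string = udf(conv_bin, StringType())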
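
To see what the decoding step does, it can be tried outside Spark on a sample payload. This is a minimal sketch, assuming the message value is UTF-8 JSON; `decode_b64_json` and the sample record are hypothetical.

```python
import base64
import json


def decode_b64_json(b64_text):
    """Decode base64 text back to the original bytes, then parse them as UTF-8 JSON."""
    raw = base64.b64decode(b64_text)
    return json.loads(raw.decode("utf-8"))


# Simulate what Spark's base64() produces for a binary Kafka value.
value_bytes = json.dumps({"id": 1, "name": "a"}).encode("utf-8")
b64_text = base64.b64encode(value_bytes).decode("ascii")

record = decode_b64_json(b64_text)
print(record["id"], record["name"])
```

If the payload is plain UTF-8 text, the built-in `col("value").cast("string")` gives the decoded text without a UDF.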
    

    Display the results.

    display(df.withColumn("key",get_string(base64(col("key")))).withColumn("result",get_string(base64(col("value")))).select("key","result","topic"))
    

    If the error persists, check that the bootstrap server uses the correct port (9092). The batch of data may also be too large, or there may be a network access problem between Databricks and Confluent Cloud.
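
One way to rule out the network case is to test a plain TCP connection to the bootstrap server from a notebook cell. This is a minimal sketch using only the Python standard library; `split_host_port` and `can_reach` are hypothetical helpers. A successful connection only shows that the port is reachable, not that the credentials are valid.

```python
import socket


def split_host_port(bootstrap_server, default_port=9092):
    """Split 'host:port' into (host, port); use default_port if no port is given."""
    host, sep, port = bootstrap_server.rpartition(":")
    if not sep:
        return bootstrap_server, default_port
    return host, int(port)


def can_reach(bootstrap_server, timeout=5.0):
    """Return True if a TCP connection to the bootstrap server can be opened."""
    host, port = split_host_port(bootstrap_server)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False
```

Calling `can_reach(confluentBootstrapserver)` from Databricks and getting `False` would point to the port or to network rules rather than to the Spark options.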