apache-spark, apache-kafka, avro, confluent-schema-registry, spark-structured-streaming

Integrating Spark Structured Streaming with the Confluent Schema Registry


I'm using a Kafka source in Spark Structured Streaming to receive Confluent-encoded Avro records. I intend to use the Confluent Schema Registry, but the integration with Spark Structured Streaming seems to be impossible.

I have seen this question, but was unable to get it working with the Confluent Schema Registry: Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)


Solution

  • Disclaimer

    This code was only tested on a local master, and has been reported to run into serializer issues in a clustered environment. There is an alternative solution (steps 7-9, with Scala code in step 10) that extracts the schema IDs into columns, looks up each unique ID, and then uses schema broadcast variables, which will work better at scale. Or see the answer from @timothyzhang.

    There is also an external library, AbsaOSS/ABRiS, that addresses using the Registry with Spark; a rough usage sketch follows below.
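
    For reference, here is a minimal sketch of the ABRiS route. This reflects the ABRiS 4.x configuration API as I understand it (earlier versions differ), the topic name and Registry URL are placeholders, and rawDf would be the Kafka source DataFrame built the same way as in the code further down.

    import org.apache.spark.sql.functions.col
    import za.co.absa.abris.avro.functions.from_avro
    import za.co.absa.abris.config.AbrisConfig

    // Resolve the reader schema from the Registry using the topic-name subject strategy
    val abrisConfig = AbrisConfig
      .fromConfluentAvro
      .downloadReaderSchemaByLatestVersion
      .andTopicNameStrategy("myTopic")              // placeholder topic
      .usingSchemaRegistry("http://localhost:8081") // placeholder Registry URL

    // Decode the Confluent-framed Avro value column into a struct column
    val decodedDf = rawDf.select(from_avro(col("value"), abrisConfig).as("data"))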


    Since the other answer that was mostly useful was removed, I wanted to re-add it with some refactoring and comments.

    Here are the dependencies needed. The code was tested with Confluent 5.x and Spark 2.4.

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
            <exclusions>
                <!-- Conflicts with Spark's version -->
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
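
    The ${...} version properties above are placeholders. Purely for illustration (the answer was tested against Confluent 5.x and Spark 2.4, so use whatever matches your environment), they could be declared like this:

        <properties>
            <!-- Illustrative values only; align with your own Scala/Spark/Confluent setup -->
            <scala.version>2.11</scala.version>
            <spark.version>2.4.4</spark.version>
            <confluent.version>5.3.0</confluent.version>
        </properties>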
    

    And here is the Scala implementation (only tested locally on master=local[*])

    First section, define the imports, some fields, and a few helper methods to get schemas

    import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
    import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.commons.cli.CommandLine
    import org.apache.spark.sql._
    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.streaming.OutputMode
    
    object App {
    
      private var schemaRegistryClient: SchemaRegistryClient = _
    
      private var kafkaAvroDeserializer: AvroDeserializer = _
    
      def lookupTopicSchema(topic: String, isKey: Boolean = false): String = {
        schemaRegistryClient.getLatestSchemaMetadata(topic + (if (isKey) "-key" else "-value")).getSchema
      }

      def avroSchemaToSparkSchema(avroSchema: String): SchemaConverters.SchemaType = {
        SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
      }
    
     // ... continues below
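
    As a quick illustration of what these helpers return (assuming the default TopicNameStrategy, where Registry subjects are named <topic>-value and <topic>-key, and that schemaRegistryClient has already been initialized as done in consumeAvro below):

    // Hypothetical usage for a topic "myTopic" whose value schema is registered under "myTopic-value"
    val avroSchemaJson = lookupTopicSchema("myTopic")            // Avro schema as a JSON string
    val sparkSchema    = avroSchemaToSparkSchema(avroSchemaJson) // SchemaConverters.SchemaType
    println(sparkSchema.dataType)                                // the StructType later used with from_json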
    

    Then define a simple main method that parses the CMD args to get Kafka details

      def main(args: Array[String]): Unit = {
        val cmd: CommandLine = parseArg(args)
    
        val master = cmd.getOptionValue("master", "local[*]")
        val spark = SparkSession.builder()
          .appName(App.getClass.getName)
          .master(master)
          .getOrCreate()
    
        val bootstrapServers = cmd.getOptionValue("bootstrap-server")
        val topic = cmd.getOptionValue("topic")
        val schemaRegistryUrl = cmd.getOptionValue("schema-registry")
    
        consumeAvro(spark, bootstrapServers, topic, schemaRegistryUrl)
    
        spark.stop()
      }
    
    
      // ... still continues
    

    Then, the important method that consumes the Kafka topic and deserializes it

      private def consumeAvro(spark: SparkSession, bootstrapServers: String, topic: String, schemaRegistryUrl: String): Unit = {
        import spark.implicits._
    
        // Setup the Avro deserialization UDF
        schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
        kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient) 
        spark.udf.register("deserialize", (bytes: Array[Byte]) =>
          kafkaAvroDeserializer.deserialize(bytes)
        )
    
        // Load the raw Kafka topic (byte stream)
        val rawDf = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrapServers)
          .option("subscribe", topic)
          .option("startingOffsets", "earliest")
          .load()
    
        // Deserialize byte stream into strings (Avro fields become JSON)
        import org.apache.spark.sql.functions._
        val jsonDf = rawDf.select(
          // 'key.cast(DataTypes.StringType),  // string keys are simplest to use
          callUDF("deserialize", 'key).as("key"), // but sometimes they are avro
          callUDF("deserialize", 'value).as("value")
          // excluding topic, partition, offset, timestamp, etc
        )
    
        // Get the Avro schema for the topic from the Schema Registry and convert it into a Spark schema type
        val dfValueSchema = {
          val rawSchema = lookupTopicSchema(topic)
          avroSchemaToSparkSchema(rawSchema)
        }
    
        // Apply structured schema to JSON stream
        val parsedDf = jsonDf.select(
          'key, // keys are usually plain strings
          // values are JSONified Avro records
          from_json('value, dfValueSchema.dataType).alias("value")
        ).select(
          'key,
          $"value.*" // flatten out the value
        )
    
        // parsedDf.printSchema()
    
        // Sample schema output
        // root
        // |-- key: string (nullable = true)
        // |-- header: struct (nullable = true)   // Not a Kafka record "header". This is part of our value schema
        // |    |-- time: long (nullable = true)
        // |    ...
    
        // TODO: Do something interesting with this stream
        parsedDf.writeStream
          .format("console")
          .outputMode(OutputMode.Append())
          .option("truncate", false)
          .start()
          .awaitTermination()
      }
    
     // still continues
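
    The console sink at the end of consumeAvro is only there for demonstration. As a sketch, the same parsed stream could be written to a durable sink instead; the paths below are placeholders:

    // Inside consumeAvro, in place of the console sink: write parsed records to Parquet
    parsedDf.writeStream
      .format("parquet")
      .option("path", "/tmp/parsed-avro-output")                   // placeholder output path
      .option("checkpointLocation", "/tmp/parsed-avro-checkpoint") // placeholder checkpoint path
      .outputMode(OutputMode.Append())
      .start()
      .awaitTermination()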
    

    The command line parser allows for passing in bootstrap servers, schema registry, topic name, and Spark master.

      private def parseArg(args: Array[String]): CommandLine = {
        import org.apache.commons.cli._
    
        val options = new Options
    
        val masterOption = new Option("m", "master", true, "Spark master")
        masterOption.setRequired(false)
        options.addOption(masterOption)
    
        val bootstrapOption = new Option("b", "bootstrap-server", true, "Bootstrap servers")
        bootstrapOption.setRequired(true)
        options.addOption(bootstrapOption)
    
        val topicOption = new Option("t", "topic", true, "Kafka topic")
        topicOption.setRequired(true)
        options.addOption(topicOption)
    
        val schemaRegOption = new Option("s", "schema-registry", true, "Schema Registry URL")
        schemaRegOption.setRequired(true)
        options.addOption(schemaRegOption)
    
        val parser = new BasicParser
        parser.parse(options, args)
      }
    
      // still continues
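
    As a quick sanity check of the parser (this would need to live inside object App, since parseArg is private; the argument values are the same sample ones used at the end of this answer):

    val cmd = parseArg(Array("-b", "localhost:9092", "-t", "myTopic", "-s", "http://localhost:8081"))
    assert(cmd.getOptionValue("bootstrap-server") == "localhost:9092")
    assert(cmd.getOptionValue("topic") == "myTopic")
    assert(cmd.getOptionValue("schema-registry") == "http://localhost:8081")
    assert(cmd.getOptionValue("master", "local[*]") == "local[*]") // default applied when -m is omitted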
    

    In order for the UDF above to work, there needs to be a deserializer that takes the DataFrame of bytes and produces one containing the deserialized Avro (rendered as JSON strings).

      // Simple wrapper around Confluent deserializer
      class AvroDeserializer extends AbstractKafkaAvroDeserializer {
        def this(client: SchemaRegistryClient) {
          this()
          // TODO: configure the deserializer for authentication 
          this.schemaRegistry = client
        }
    
        override def deserialize(bytes: Array[Byte]): String = {
          val value = super.deserialize(bytes)
          value match {
            case str: String =>
              str
            case _ =>
              val genericRecord = value.asInstanceOf[GenericRecord]
              genericRecord.toString
          }
        }
      }
    
    } // end 'object App'
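
    The wrapper returns a String rather than a GenericRecord because the UDF's output is fed to from_json, and GenericRecord.toString renders the record as JSON. A rough standalone sanity check might look like this (the input bytes are a placeholder for a real Confluent-encoded message, i.e. magic byte + 4-byte schema ID + Avro payload; the wrapper is assumed accessible, e.g. via import App.AvroDeserializer):

    val client = new CachedSchemaRegistryClient("http://localhost:8081", 128)
    val deserializer = new AvroDeserializer(client)

    // Placeholder: supply a real Confluent-framed record, e.g. one consumed from Kafka
    val confluentEncodedBytes: Array[Byte] = ???
    println(deserializer.deserialize(confluentEncodedBytes)) // prints the record as a JSON string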
    

    Put each of these blocks together, and it works in IntelliJ after adding -b localhost:9092 -s http://localhost:8081 -t myTopic to Run Configurations > Program Arguments