apache-spark, apache-kafka, apache-kafka-connect, avro, aiven

How to use Spark -> Kafka -> JDBC sink connector with Avro correctly?


I have a simple Spark app that generates Kafka messages with:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.avro.functions.to_avro
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}

object IngestFromS3ToKafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ingest-from-s3-to-kafka")
      .config("spark.ui.port", "4040")
      .getOrCreate()

    val folderPath = "s3a://hongbomiao-bucket/iot/"

    val parquet_schema = new StructType()
      .add("timestamp", DoubleType)
      .add("current", DoubleType, nullable = true)
      .add("voltage", DoubleType, nullable = true)
      .add("temperature", DoubleType, nullable = true)

    val df = spark.readStream
      .schema(parquet_schema)
      .option("maxFilesPerTrigger", 1)
      .parquet(folderPath)
      .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
      .select(to_avro(struct("*")).alias("value"))

    val query = df.writeStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
      )
      .option("topic", "hm.motor")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()

    query.awaitTermination()
  }
}

I have an Avro schema in Apicurio Registry, created by:

curl --location 'http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2/groups/hm-group/artifacts' \
--header 'Content-type: application/json; artifactType=AVRO' \
--header 'X-Registry-ArtifactId: hm-iot' \
--data '{
    "type": "record",
    "namespace": "com.hongbomiao",
    "name": "hm.motor",
    "fields": [
        {
            "name": "timestamp",
            "type": "long"
        },
        {
            "name": "current",
            "type": "double"
        },
        {
            "name": "voltage",
            "type": "double"
        },
        {
            "name": "temperature",
            "type": "double"
        }
    ]
}'


I am trying to use Apicurio Registry's Confluent-compatible REST API endpoint. The schema currently has content ID 26, which I can retrieve with

curl --location 'http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6/schemas/ids/26' \
  --header 'Content-type: application/json; artifactType=AVRO' \
  --header 'X-Registry-ArtifactId: hm-iot'

which prints

{
    "schema": "{\n    \"type\": \"record\",\n    \"namespace\": \"com.hongbomiao\",\n    \"name\": \"hm.motor\",\n    \"fields\": [\n        {\n            \"name\": \"timestamp\",\n            \"type\": \"long\"\n        },\n        {\n            \"name\": \"current\",\n            \"type\": \"double\"\n        },\n        {\n            \"name\": \"voltage\",\n            \"type\": \"double\"\n        },\n        {\n            \"name\": \"temperature\",\n            \"type\": \"double\"\n        }\n    ]\n}",
    "references": []
}

which looks good.

Based on Aiven's JDBC connector doc, I wrote my JDBC sink connector config:

{
    "name": "hm-motor-jdbc-sink-kafka-connector",
    "config": {
        "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 1,
        "topics": "hm.motor",
        "connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
        "connection.user": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}",
        "connection.password": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}",

        "insert.mode": "upsert",

        "table.name.format": "motor",

        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6",

        "transforms": "convertTimestamp",
        "transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.convertTimestamp.field": "timestamp",
        "transforms.convertTimestamp.target.type": "Timestamp"
    }
}

However, I got this error in my Kafka Connect log:

2023-05-01 19:01:11,291 ERROR [hm-motor-jdbc-sink-kafka-connector|task-0] WorkerSinkTask{id=hm-motor-jdbc-sink-kafka-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-hm-motor-jdbc-sink-kafka-connector-0]
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:518)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:495)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:335)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic hm.motor to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:518)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id -1330532454
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:253)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:372)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:203)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
    ... 18 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: No content with id/hash 'contentId--1330532454' was found.; error code: 40403
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:314)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:853)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:826)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:311)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:433)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:361)
    ... 21 more

It tries to fetch content ID -1330532454, which obviously does not exist; mine is at 26. How does the JDBC sink connector look up the corresponding Avro schema?

I am not sure how the mapping works. I thought it would look for a schema named hm.motor based on the Kafka topic, but it turns out that is not the case.

Thanks!


UPDATE 1

Thanks @Ftisiot!

I found the documentation about the Kafka serializers and deserializers:

The Kafka serializers and deserializers default to using <topicName>-key and <topicName>-value as the corresponding subject name while registering or retrieving the schema.

Also, value.converter.value.subject.name.strategy defaults to io.confluent.kafka.serializers.subject.TopicNameStrategy.
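
For reference, a quick way to check which subject the converter will look up under TopicNameStrategy is to query the Confluent-compatible endpoint for <topicName>-value directly. A minimal sketch using the JDK HTTP client (the registry URL is the one from above; GET /subjects/{subject}/versions/latest is the standard Confluent Schema Registry path that ccompat/v6 is meant to mirror):

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object CheckSubject {
  def main(args: Array[String]): Unit = {
    // TopicNameStrategy: the value schema is looked up under "<topic>-value"
    val subject = "hm.motor-value"
    val url =
      s"http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6/subjects/$subject/versions/latest"

    val client = HttpClient.newHttpClient()
    val request = HttpRequest.newBuilder(URI.create(url)).GET().build()
    val response = client.send(request, HttpResponse.BodyHandlers.ofString())

    // 200 with {"subject": ..., "id": ..., "schema": ...} means the converter can resolve it;
    // 404 means nothing is registered under that subject yet.
    println(s"${response.statusCode()} ${response.body()}")
  }
}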

I have updated my Avro schema name to hm.motor-value, but I still got the same error.



Solution

  • Thanks for everyone's help, I finally figured it out! I will try to summarize what I learned.

    1. Generating Kafka messages in Avro format

    There are actually two major types of Avro data:

    1.1 [Succeed] Generating Confluent Avro data in Spark

    Confluent Avro is not "vanilla" Avro: it prepends a small header (a magic byte 0x0 followed by a 4-byte schema ID) to each Avro payload, which causes some inconvenience for Spark and other tools.

    As @OneCricketeer pointed out, there is a library, ABRiS, that helps generate Confluent Avro format Kafka messages (toConfluentAvro).
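
    This header is presumably where the bogus id -1330532454 in the error above came from: the converter read bytes of a plain Avro payload as if they were the header. A minimal sketch (my own helper, not from any library) to check which format a record value carries:

    import java.nio.ByteBuffer

    object WireFormatCheck {
      // Returns the Confluent schema ID if the bytes start with the magic byte 0x0,
      // otherwise None (the payload is likely "vanilla" Avro or something else entirely).
      def confluentSchemaId(value: Array[Byte]): Option[Int] =
        if (value.length >= 5 && value(0) == 0x0)
          Some(ByteBuffer.wrap(value, 1, 4).getInt)
        else
          None
    }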

    First, register the Avro schema in the schema registry:

    curl --location 'http://confluent-schema-registry.svc:8081/subjects/hm.motor-value/versions' \
    --header 'Content-Type: application/vnd.schemaregistry.v1+json' \
    --data '{
        "schema": "{\"type\": \"record\", \"name\": \"motor\", \"fields\":[{ \"name\": \"timestamp\", \"type\": \"long\"},{ \"name\": \"current\", \"type\": \"double\"},{ \"name\": \"voltage\", \"type\": \"double\"},{ \"name\": \"temperature\", \"type\": \"double\"}]}"
    }'
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, struct}
    import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
    import za.co.absa.abris.avro.functions.to_avro
    import za.co.absa.abris.config.{AbrisConfig, ToAvroConfig}
    
    object IngestFromS3ToKafka {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
          .builder()
          .master("local[*]")
          .appName("ingest-from-s3-to-kafka")
          .config("spark.ui.port", "4040")
          .getOrCreate()
    
        val folderPath = "s3a://hongbomiao-bucket/iot/"
    
        val parquetSchema = new StructType()
          .add("timestamp", DoubleType)
          .add("current", DoubleType, nullable = true)
          .add("voltage", DoubleType, nullable = true)
          .add("temperature", DoubleType, nullable = true)
    
        val toAvroConfig: ToAvroConfig =
          AbrisConfig.toConfluentAvro.downloadSchemaByLatestVersion
            .andTopicNameStrategy("hm.motor")
            .usingSchemaRegistry(
              "http://confluent-schema-registry.svc:8081"
            )
    
        val df = spark.readStream
          .schema(parquetSchema)
          .option("maxFilesPerTrigger", 1)
          .parquet(folderPath)
          .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
          .select(to_avro(struct("*"), toAvroConfig).as("value"))
    
        val query = df.writeStream
          .format("kafka")
          .option(
            "kafka.bootstrap.servers",
            "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
          )
          .option("topic", "hm.motor")
          .option("checkpointLocation", "/tmp/checkpoint")
          .start()
    
        query.awaitTermination()
      }
    }
    

    build.sbt

    name := "IngestFromS3ToKafka"
    version := "1.0"
    scalaVersion := "2.12.17"
    resolvers += "confluent" at "https://packages.confluent.io/maven/"
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
      "org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
      "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2" % "provided",
      "org.apache.spark" %% "spark-avro" % "3.4.0" % "provided",
      "org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
      "org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
      "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.463" % "provided",
      "za.co.absa" %% "abris" % "6.3.0"
    )
    ThisBuild / assemblyMergeStrategy := {
      // https://stackoverflow.com/a/67937671/2000548
      case PathList("module-info.class") => MergeStrategy.discard
      case x if x.endsWith("/module-info.class") => MergeStrategy.discard
      // https://stackoverflow.com/a/76129963/2000548
      case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
      case x =>
        val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
        oldStrategy(x)
    }
    
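    To sanity-check what the job wrote, ABRiS can also decode the messages back with the matching fromConfluentAvro reader config (a minimal sketch under the same registry, subject, and topic as above; za.co.absa.abris.avro.functions.from_avro is ABRiS's counterpart of its to_avro):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col
    import za.co.absa.abris.avro.functions.from_avro
    import za.co.absa.abris.config.{AbrisConfig, FromAvroConfig}

    object ReadBackConfluentAvro {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()

        // Reader config mirroring the writer: latest schema under subject "hm.motor-value"
        val fromAvroConfig: FromAvroConfig =
          AbrisConfig.fromConfluentAvro.downloadReaderSchemaByLatestVersion
            .andTopicNameStrategy("hm.motor")
            .usingSchemaRegistry("http://confluent-schema-registry.svc:8081")

        val df = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092")
          .option("subscribe", "hm.motor")
          .load()
          .select(from_avro(col("value"), fromAvroConfig).as("data"))
          .select("data.*")

        val query = df.writeStream.format("console").start()
        query.awaitTermination()
      }
    }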

    1.2 [Succeed] Generating "standard" / "vanilla" Apache Avro data in Spark

    First, I created my Avro schema in Apicurio Registry by

    curl --location 'http://apicurio-registry.svc:8080/apis/registry/v2/groups/default/artifacts' \
    --header 'Content-type: application/json; artifactType=AVRO' \
    --header 'X-Registry-ArtifactId: hm.motor-value' \
    --data '{
        "type": "record",
        "namespace": "com.hongbomiao",
        "name": "motor",
        "fields": [
            {
                "name": "timestamp",
                "type": "long"
            },
            {
                "name": "current",
                "type": "double"
            },
            {
                "name": "voltage",
                "type": "double"
            },
            {
                "name": "temperature",
                "type": "double"
            }
        ]
    }'
    


    In Spark, it is very straightforward to use with the native org.apache.spark.sql.avro.functions.to_avro.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, struct}
    import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
    import org.apache.spark.sql.avro.functions.to_avro
    import sttp.client3.{HttpClientSyncBackend, UriContext, basicRequest}
    
    object IngestFromS3ToKafka {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
          .builder()
          .master("local[*]")
          .appName("ingest-from-s3-to-kafka")
          .config("spark.ui.port", "4040")
          .getOrCreate()
    
        val folderPath = "s3a://hongbomiao-bucket/iot/"
    
        // For `parquetSchema` below, you can
        //   1. hard-code it, as in the current code
        //   2. read it from one file: `val parquetSchema = spark.read.parquet("s3a://hongbomiao-bucket/iot/motor.parquet").schema`
        //   3. maybe also derive it from the Avro schema - I will try that in the future!
        val parquetSchema = new StructType()
          .add("timestamp", DoubleType)
          .add("current", DoubleType, nullable = true)
          .add("voltage", DoubleType, nullable = true)
          .add("temperature", DoubleType, nullable = true)
    
        val backend = HttpClientSyncBackend()
        val response = basicRequest
          .get(
            uri"http://apicurio-registry.svc:8080/apis/registry/v2/groups/hm-group/artifacts/hm.motor-value"
          )
          .send(backend)
        val kafkaRecordValueSchema = response.body.fold(identity, identity)
    
        val df = spark.readStream
          .schema(parquetSchema)
          .option("maxFilesPerTrigger", 1)
          .parquet(folderPath)
          .withColumn("timestamp", (col("timestamp") * 1000).cast(LongType))
          .select(to_avro(struct("*"), kafkaRecordValueSchema).alias("value"))
    
        val query = df.writeStream
          .format("kafka")
          .option(
            "kafka.bootstrap.servers",
            "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
          )
          .option("topic", "hm.motor")
          .option("checkpointLocation", "/tmp/checkpoint")
          .start()
    
        query.awaitTermination()
      }
    }
    

    build.sbt

    name := "IngestFromS3ToKafka"
    version := "1.0"
    scalaVersion := "2.12.17"
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "3.3.2" % "provided",
      "org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
      "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2" % "provided",
      "org.apache.spark" %% "spark-avro" % "3.3.2" % "provided",
      "org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
      "org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
      "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.461" % "provided",
      "com.softwaremill.sttp.client3" %% "core" % "3.8.15"
    )
    
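    On the read side, Spark's native from_avro can decode these plain Avro values with the same JSON schema string that was fetched from the registry above (a minimal sketch; the schema is hard-coded here instead of fetched, purely for brevity):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.avro.functions.from_avro
    import org.apache.spark.sql.functions.col

    object ReadBackVanillaAvro {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()

        // Same Avro schema (as a JSON string) that was registered in Apicurio Registry above
        val kafkaRecordValueSchema =
          """{"type": "record", "namespace": "com.hongbomiao", "name": "motor", "fields": [
            |{"name": "timestamp", "type": "long"}, {"name": "current", "type": "double"},
            |{"name": "voltage", "type": "double"}, {"name": "temperature", "type": "double"}]}""".stripMargin

        val df = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092")
          .option("subscribe", "hm.motor")
          .load()
          .select(from_avro(col("value"), kafkaRecordValueSchema).as("data"))
          .select("data.*")

        val query = df.writeStream.format("console").start()
        query.awaitTermination()
      }
    }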

    I got many ideas from this article.

    2. Reading Avro-format Kafka messages with the JDBC sink connector and sinking them to the database

    2.1 Kafka message in Confluent Avro

    2.1.1 [Succeed] Using io.confluent.connect.avro.AvroConverter with Confluent Schema Registry

    Here we use the Confluent Schema Registry REST API:

    {
        "name": "hm-motor-jdbc-sink-kafka-connector",
        "config": {
            "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
            "tasks.max": 1,
            "topics": "hm.motor",
            "connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
            "connection.user": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}",
            "connection.password": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}",
            "insert.mode": "upsert",
            "table.name.format": "motor",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": "http://confluent-schema-registry.svc:8081",
            "transforms": "convertTimestamp",
            "transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
            "transforms.convertTimestamp.field": "timestamp",
            "transforms.convertTimestamp.target.type": "Timestamp"
        }
    }
    

    2.1.2 Using io.confluent.connect.avro.AvroConverter with Apicurio Schema Registry

    Here we use Apicurio Registry's Confluent-compatible REST API:

    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://apicurio-registry.svc:8080/apis/ccompat/v6",
    

    (I didn't further test this direction)

    2.1.3 Using io.apicurio.registry.utils.converter.AvroConverter with Apicurio Schema Registry

    Here we use Apicurio Registry's Confluent-compatible REST API:

    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.url": "http://apicurio-registry.svc:8080/apis/ccompat/v6",
    "value.converter.apicurio.registry.as-confluent": true,
    

    (I didn't further test this direction)

    2.2 [Succeed] Kafka message in "vanilla" Apache Avro

    Here we use io.apicurio.registry.utils.converter.AvroConverter.

    My JDBC connector config:

    {
        "name": "hm-motor-jdbc-sink-kafka-connector",
        "config": {
            "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
            "tasks.max": 1,
            "topics": "hm.motor",
            "connection.url": "jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db",
            "connection.user": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}",
            "connection.password": "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}",
            "insert.mode": "upsert",
            "table.name.format": "motor",
            "transforms": "convertTimestamp",
            "transforms.convertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
            "transforms.convertTimestamp.field": "timestamp",
            "transforms.convertTimestamp.target.type": "Timestamp",

            "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
            "value.converter.apicurio.registry.url": "http://apicurio-registry.svc:8080/apis/registry/v2",
            "value.converter.apicurio.registry.fallback.group-id": "hm-group",
            "value.converter.apicurio.registry.fallback.artifact-id": "hm.motor-value"
        }
    }
    

    Maybe in the future I can figure out a way to get rid of the value.converter.apicurio.registry.fallback-related fields.
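
    For reference, the group and artifact IDs used by those fallback settings can be double-checked against the core v2 API with the same kind of request as in the Spark code above (a minimal sketch with sttp):

    import sttp.client3.{HttpClientSyncBackend, UriContext, basicRequest}

    object CheckApicurioArtifact {
      def main(args: Array[String]): Unit = {
        val backend = HttpClientSyncBackend()

        // Same group and artifact IDs as in the fallback converter settings above
        val response = basicRequest
          .get(uri"http://apicurio-registry.svc:8080/apis/registry/v2/groups/hm-group/artifacts/hm.motor-value")
          .send(backend)

        // 200 returns the raw Avro schema; 404 means the fallback IDs do not match a registered artifact
        println(response.code)
        println(response.body.fold(identity, identity))
      }
    }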

    More info about io.apicurio.registry.utils.converter.AvroConverter can be found here.