Tags: docker, apache-kafka, apache-kafka-connect, strimzi, apicurio-registry

How to use io.apicurio.registry.utils.converter.AvroConverter in Kafka Connect created by Strimzi?


I have a Kafka cluster deployed by Strimzi and Apicurio Registry for Kafka schema registry.

I am hoping to use AvroConverter in the JDBC sink connector to sink data from Kafka to TimescaleDB.

Here is my Kafka Connect Dockerfile:

FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
  && unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
  && rm -f jdbc-connector-for-apache-kafka.zip
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001

My Kafka Connect:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: hm-kafka-iot-kafka-connect
  namespace: hm-kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: ghcr.io/hongbo-miao/hm-kafka-iot-kafka-connect:latest
  replicas: 1
  bootstrapServers: hm-kafka-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: hm-kafka-cluster-ca-cert
        certificate: ca.crt
  config:
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: hm-iot-db-credentials-volume
        secret:
          secretName: hm-iot-db-credentials
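
For context, the `${file:...}` placeholders used by the connector below resolve against the properties file mounted from this externalConfiguration volume. It would look something like this (values illustrative; the real ones come from the hm-iot-db-credentials Secret):

```properties
# /opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties
# (illustrative values - the real credentials come from the hm-iot-db-credentials Secret)
timescaledb_user=iot_user
timescaledb_password=changeme
```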

My JDBC sink connector:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: hm-motor-jdbc-sink-kafka-connector
  namespace: hm-kafka
  labels:
    strimzi.io/cluster: hm-kafka-iot-kafka-connect
spec:
  class: io.aiven.connect.jdbc.JdbcSinkConnector
  tasksMax: 32
  config:
    connector.class: io.aiven.connect.jdbc.JdbcSinkConnector
    tasks.max: 32
    topics: hm.motor
    connection.url: jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db
    connection.user: "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}"
    connection.password: "${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}"
    insert.mode: multi
    batch.size: 100000

    # table
    table.name.format: motor

    # timestamp
    transforms: convertTimestamp
    transforms.convertTimestamp.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
    transforms.convertTimestamp.field: timestamp
    transforms.convertTimestamp.target.type: Timestamp

    # value
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url: http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2
    value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
    value.converter.apicurio.registry.as-confluent: true

(Note: the apicurio.registry-related settings most likely have issues too.)
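For what it's worth, the `AutoRegisterIdStrategy` class above comes from the Apicurio 1.x serde API. In the 2.x serdes the equivalent knobs look roughly like this (property names taken from the 2.x SerdeConfig; a sketch, not verified against 2.4.2.Final):

```yaml
    # value (Apicurio Registry 2.x serde property names; sketch)
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url: http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/registry/v2
    # 2.x replaces the 1.x *IdStrategy classes with a boolean auto-register flag
    value.converter.apicurio.registry.auto-register: true
    # emit Confluent-compatible 4-byte schema ids instead of 8-byte global ids
    value.converter.apicurio.registry.as-confluent: true
```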

However, I hit this error (let's call it Error 1):

Error 1

2023-05-01 07:23:23,849 ERROR [hm-motor-jdbc-sink-kafka-connector|worker] [Worker clientId=connect-1, groupId=connect-cluster] Failed to start connector 'hm-motor-jdbc-sink-kafka-connector' (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [StartAndStopExecutor-connect-1-1]
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: hm-motor-jdbc-sink-kafka-connector
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$34(DistributedHerder.java:1800)
  at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:320)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1821)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getConnectorStartingCallable$36(DistributedHerder.java:1827)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value io.apicurio.registry.utils.converter.AvroConverter for configuration value.converter: Class io.apicurio.registry.utils.converter.AvroConverter could not be found.
  at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:744)
  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:490)
  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
  at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
  at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:133)
  at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:232)
  at org.apache.kafka.connect.runtime.SinkConnectorConfig.<init>(SinkConnectorConfig.java:85)
  at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:299)
  ... 6 more

Attempt 1 (to fix Error 1, succeeded)

Based on the error, I added apicurio-registry-utils-converter to my Kafka Connect Dockerfile:

FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
  && unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
  && rm -f jdbc-connector-for-apache-kafka.zip \

  # apicurio-registry-utils-converter
  # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-utils-converter
  && wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-utils-converter/2.4.2.Final/apicurio-registry-utils-converter-2.4.2.Final.jar \
  && mkdir -p /opt/kafka/plugins/apicurio-registry-utils-converter/ \
  && mv apicurio-registry-utils-converter-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-utils-converter/
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001

Now it can successfully locate io.apicurio.registry.utils.converter.AvroConverter; however, I get a new error (let's call it Error 2):

Error 2

2023-05-01 06:58:11,129 INFO [hm-motor-jdbc-sink-kafka-connector|task-0] TaskConfig values: 
  task.class = class io.aiven.connect.jdbc.sink.JdbcSinkTask
 (org.apache.kafka.connect.runtime.TaskConfig) [StartAndStopExecutor-connect-1-5]
2023-05-01 06:58:11,129 INFO [hm-motor-jdbc-sink-kafka-connector|task-0] Instantiated task hm-motor-jdbc-sink-kafka-connector-0 with version null of type io.aiven.connect.jdbc.sink.JdbcSinkTask (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-5]
2023-05-01 06:58:11,129 ERROR [hm-motor-jdbc-sink-kafka-connector|task-0] Failed to start task hm-motor-jdbc-sink-kafka-connector-0 (org.apache.kafka.connect.runtime.Worker) [StartAndStopExecutor-connect-1-5]
java.lang.NoClassDefFoundError: io/apicurio/registry/serde/avro/AvroKafkaSerializer
  at io.apicurio.registry.utils.converter.AvroConverter.configure(AvroConverter.java:69)
  at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:324)
  at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:618)
  at org.apache.kafka.connect.runtime.Worker.startSinkTask(Worker.java:521)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1723)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$31(DistributedHerder.java:1773)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: io.apicurio.registry.serde.avro.AvroKafkaSerializer
  ... 10 more

Attempt 2 (to fix Error 2, failed)

Based on the error, I added apicurio-registry-serdes-avro-serde to my Kafka Connect Dockerfile:

FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.zip https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.zip \
  && unzip jdbc-connector-for-apache-kafka.zip -d /opt/kafka/plugins/ \
  && rm -f jdbc-connector-for-apache-kafka.zip \

  # apicurio-registry-utils-converter
  # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-utils-converter
  && wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-utils-converter/2.4.2.Final/apicurio-registry-utils-converter-2.4.2.Final.jar \
  && mkdir -p /opt/kafka/plugins/apicurio-registry-utils-converter/ \
  && mv apicurio-registry-utils-converter-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-utils-converter/ \

  # apicurio-registry-serdes-avro-serde
  # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-serdes-avro-serde
  && wget --no-verbose https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-serdes-avro-serde/2.4.2.Final/apicurio-registry-serdes-avro-serde-2.4.2.Final.jar \
  && mkdir -p /opt/kafka/plugins/apicurio-registry-serdes-avro-serde/ \
  && mv apicurio-registry-serdes-avro-serde-2.4.2.Final.jar /opt/kafka/plugins/apicurio-registry-serdes-avro-serde/
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001

But this time, Error 2 is still there.

apicurio-registry-serdes-avro-serde does not seem to be the correct dependency to fix Error 2. What would be the correct one? Thanks!

Attempt 3 (Different direction)

Following @OneCricketeer's suggestion, I have switched to kafka-connect-avro-converter and now use it with Apicurio Registry's Confluent-compatible REST API endpoint /apis/ccompat/v6/.

Here is my Kafka Connect Dockerfile to use io.confluent.connect.avro.AvroConverter:

FROM docker.io/alpine:3.17.3 AS builder
USER root:root
RUN mkdir -p /opt/kafka/plugins/ \
  # kafka-connect-avro-converter
  # https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter
  && wget --no-verbose --output-document=kafka-connect-avro-converter.zip https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.3.3/confluentinc-kafka-connect-avro-converter-7.3.3.zip \
  && mkdir -p /opt/kafka/plugins/kafka-connect-avro-converter/ \
  && unzip kafka-connect-avro-converter.zip -d /opt/kafka/plugins/kafka-connect-avro-converter/ \
  && rm -f kafka-connect-avro-converter.zip \

  # jdbc-connector-for-apache-kafka
  # https://github.com/aiven/jdbc-connector-for-apache-kafka
  && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.tar https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.tar \
  && mkdir -p /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
  && tar -x -f jdbc-connector-for-apache-kafka.tar -C /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
  && rm -f jdbc-connector-for-apache-kafka.tar
USER 1001

FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
USER 1001

Regarding the corresponding JDBC sink connector config, I have a separate question at org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id -xxx
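
With this approach, the connector's value converter points the stock Confluent converter at Apicurio's ccompat endpoint; roughly (a sketch, reusing the registry service host from the manifests above):

```yaml
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://apicurio-registry-apicurio-registry.hm-apicurio-registry.svc:8080/apis/ccompat/v6
```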

UPDATE: I found that the Confluent Avro format is different from vanilla Apache Avro, which causes some inconvenience for Spark and other tools. So these are two different directions. Besides the Confluent direction, I will keep looking for a solution in this direction too.
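
To make the difference concrete: a Confluent-framed message prepends a 5-byte header (one magic byte plus a 4-byte big-endian schema id) to the plain Avro binary encoding, while vanilla Avro payloads have no header at all. This sketch shows why tools expecting one format cannot read the other:

```python
import struct

def strip_confluent_header(payload: bytes) -> tuple[int, bytes]:
    """Split a Confluent-framed message into (schema id, plain Avro bytes)."""
    # Confluent wire format: magic byte 0x00, then a 4-byte big-endian
    # schema id, then the ordinary Avro binary encoding of the value.
    if len(payload) < 5 or payload[0] != 0x00:
        raise ValueError("not a Confluent-framed payload")
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id, payload[5:]

# Example: schema id 7 followed by two bytes of Avro-encoded data.
framed = b"\x00" + struct.pack(">I", 7) + b"\x02\x08"
schema_id, avro_bytes = strip_confluent_header(framed)
```

A vanilla Avro reader handed `framed` would try to decode the header bytes as data; conversely, a Confluent-aware reader handed bare Avro bytes fails the magic-byte check.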


Solution

  • The issue was that I added the dependency apicurio-registry-utils-converter, which is a single jar shipped without its transitive dependencies.

    The correct artifact is apicurio-registry-distro-connect-converter, a tarball that bundles the converter together with everything it needs, including the Avro serde. Note that adding the missing serde jar as a separate plugin directory (Attempt 2) cannot work, because Kafka Connect gives each directory under the plugin path its own isolated classloader, so classes in one plugin directory are not visible to another.
    So here is my final Kafka Connect Dockerfile to use io.apicurio.registry.utils.converter.AvroConverter:

    FROM docker.io/alpine:3.17.3 AS builder
    USER root:root
    RUN mkdir -p /opt/kafka/plugins/ \
      # apicurio-registry-distro-connect-converter
      # https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-distro-connect-converter
      && wget --no-verbose --output-document=apicurio-registry-distro-connect-converter.tar.gz https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/2.4.2.Final/apicurio-registry-distro-connect-converter-2.4.2.Final.tar.gz \
      && mkdir -p /opt/kafka/plugins/apicurio-registry-distro-connect-converter/ \
      && tar -x -f apicurio-registry-distro-connect-converter.tar.gz -C /opt/kafka/plugins/apicurio-registry-distro-connect-converter/ \
      && rm -f apicurio-registry-distro-connect-converter.tar.gz \
    
      # jdbc-connector-for-apache-kafka
      # https://github.com/aiven/jdbc-connector-for-apache-kafka
      && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.tar https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.tar \
      && mkdir -p /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
      && tar -x -f jdbc-connector-for-apache-kafka.tar -C /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
      && rm -f jdbc-connector-for-apache-kafka.tar
    USER 1001
    
    FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
    USER root:root
    COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
    USER 1001
    

    For comparison, here is the Dockerfile to use io.confluent.connect.avro.AvroConverter:

    FROM docker.io/alpine:3.17.3 AS builder
    USER root:root
    RUN mkdir -p /opt/kafka/plugins/ \
      # kafka-connect-avro-converter
      # https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter
      && wget --no-verbose --output-document=kafka-connect-avro-converter.zip https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.3.3/confluentinc-kafka-connect-avro-converter-7.3.3.zip \
      && mkdir -p /opt/kafka/plugins/kafka-connect-avro-converter/ \
      && unzip kafka-connect-avro-converter.zip -d /opt/kafka/plugins/kafka-connect-avro-converter/ \
      && rm -f kafka-connect-avro-converter.zip \
    
      # jdbc-connector-for-apache-kafka
      # https://github.com/aiven/jdbc-connector-for-apache-kafka
      && wget --no-verbose --output-document=jdbc-connector-for-apache-kafka.tar https://github.com/aiven/jdbc-connector-for-apache-kafka/releases/download/v6.8.0/jdbc-connector-for-apache-kafka-6.8.0.tar \
      && mkdir -p /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
      && tar -x -f jdbc-connector-for-apache-kafka.tar -C /opt/kafka/plugins/jdbc-connector-for-apache-kafka/ \
      && rm -f jdbc-connector-for-apache-kafka.tar
    USER 1001
    
    FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
    USER root:root
    COPY --from=builder /opt/kafka/plugins/ /opt/kafka/plugins/
    USER 1001