amazon-web-services, apache-kafka-connect, avro, apache-iceberg, aws-msk-connect

Build IcebergSinkConnector with AWSKafkaAvroConverter for MSK Connect


I'm trying to build an MSK Connect-compatible IcebergSinkConnector plugin that bundles AWSKafkaAvroConverter, since I'm using the AWS Glue Schema Registry for my streaming data. The following is my connector configuration:

connector_configuration = {
  "connector.class"   = "org.apache.iceberg.connect.IcebergSinkConnector"
  "tasks.max"         = "4"
  "errors.log.enable" = "true"
  "topics"            = join(",", keys(var.topics))

  "transforms"                          = "sanitize,topicToField"
  "transforms.sanitize.type"            = "org.apache.kafka.connect.transforms.RegexRouter"
  "transforms.sanitize.regex"           = "(\\.)"
  "transforms.sanitize.replacement"     = "_" # foo.bar.baz → foo_bar_baz
  "transforms.topicToField.type"        = "org.apache.kafka.connect.transforms.InsertField$Value"
  "transforms.topicToField.topic.field" = "src_kafka_topic"

  "key.converter"                                 = "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
  "key.converter.region"                          = data.aws_region.current.name
  "key.converter.schema.registry.name"            = aws_glue_registry.this.registry_name
  "key.converter.avroRecordType"                  = "GENERIC_RECORD"
  "key.converter.schemaAutoRegistrationEnabled"   = "false"
  "value.converter"                               = "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
  "value.converter.region"                        = data.aws_region.current.name
  "value.converter.schema.registry.name"          = aws_glue_registry.this.registry_name
  "value.converter.avroRecordType"                = "GENERIC_RECORD"
  "value.converter.schemaAutoRegistrationEnabled" = "false"

  "iceberg.catalog.io-impl"             = "org.apache.iceberg.aws.s3.S3FileIO"
  "iceberg.catalog.type"                = "rest"
  "iceberg.catalog.uri"                 = "https://s3tables.us-east-1.amazonaws.com/iceberg"
  "iceberg.catalog.warehouse"           = aws_s3tables_table_bucket.this.arn
  "iceberg.catalog.default-namespace"   = aws_s3tables_namespace.this.namespace
  "iceberg.catalog.client.region"       = data.aws_region.current.name
  "iceberg.catalog.rest.signing-region" = data.aws_region.current.name
  "iceberg.catalog.rest.signing-name"   = "s3tables"
  "iceberg.catalog.rest.sigv4-enabled"  = "true"

  "iceberg.tables.auto-create-enabled"   = "true"
  "iceberg.tables.evolve-schema-enabled" = "true"
  "iceberg.tables.dynamic-enabled"       = "true"
  "iceberg.tables.route-field"           = "src_kafka_topic"
}

I believe this configuration is correct, since a version of my connector was able to auto-create a table in S3 Tables. However, I have been unable to get my custom plugin to actually write data to the table.
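
As a quick check that auto-create worked but nothing is being written, I list the tables in the namespace with the aws s3tables CLI (flags as I understand them; the ARN and namespace below are placeholders for my Terraform outputs):

# List tables in the S3 Tables namespace; the auto-created table shows up here,
# but querying it (e.g. through Athena) returns no rows.
TABLE_BUCKET_ARN="arn:aws:s3tables:us-east-1:111122223333:bucket/my-table-bucket"  # placeholder
NAMESPACE="my_namespace"                                                            # placeholder
aws s3tables list-tables \
  --table-bucket-arn "$TABLE_BUCKET_ARN" \
  --namespace "$NAMESPACE"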

The closest I've come to a working plugin is with this build script:

#!/bin/bash

set -e

APACHE_ICEBERG_VERSION=1.9.2
AWS_SCHEMA_REGISTRY_VERSION=1.1.24

curl -s "https://get.sdkman.io" | bash
source "$HOME/.sdkman/bin/sdkman-init.sh"
sdk install gradle && sdk install maven

mkdir /build && cd /build

git clone https://github.com/awslabs/aws-glue-schema-registry.git && cd aws-glue-schema-registry && git checkout v${AWS_SCHEMA_REGISTRY_VERSION}

mvn clean install
cd /build
mkdir outputs
cp "aws-glue-schema-registry/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-${AWS_SCHEMA_REGISTRY_VERSION}.jar" outputs/aws-glue-schema-registry.jar

git clone https://github.com/apache/iceberg.git && cd iceberg && git checkout apache-iceberg-${APACHE_ICEBERG_VERSION}
./gradlew -x test -x integrationTest clean build
cd ..
cp ./iceberg/kafka-connect/kafka-connect-runtime/build/distributions/iceberg-kafka-connect-runtime-[0-9]*.zip outputs/iceberg-sink.zip

cd /out
cp /build/outputs/iceberg-sink.zip ./ && unzip iceberg-sink.zip && rm iceberg-sink.zip
mkdir -p aws-glue-schema-registry/lib && cp /build/outputs/aws-glue-schema-registry.jar aws-glue-schema-registry/lib/

zip -r iceberg-sink.zip .

which I ran from this Dockerfile:

FROM amazoncorretto:17

SHELL ["/bin/bash", "-c"]

RUN yum install -y git zip unzip tar

COPY build.sh .

CMD ./build.sh

using these commands:

docker build -t iceberg-sink .
docker run -v ./out:/out iceberg-sink
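
For reference, this is how I sanity-check what actually ended up inside the plugin zip before uploading it (plain unzip and grep, nothing MSK-specific):

# List the plugin contents and highlight anything AWS SDK related; both the Iceberg
# runtime's bundled SDK jars and the Glue Schema Registry converter jar show up here.
unzip -l out/iceberg-sink.zip | grep -Ei 'aws|sdk'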

I then picked up iceberg-sink.zip from /out, uploaded it to S3, and created the MSK Connect custom plugin from it. When the connector starts up with the above configuration, the connection to the brokers seems to work, but I get this error:

[Worker-084dd25bd58c041d4] java.lang.NoSuchFieldError: CLIENT_ENDPOINT_PROVIDER
"[Worker-084dd25bd58c041d4]     at software.amazon.awssdk.services.s3.DefaultS3BaseClientBuilder.finalizeServiceConfiguration(DefaultS3BaseClientBuilder.java:188)"
"[Worker-084dd25bd58c041d4]     at software.amazon.awssdk.awscore.client.builder.AwsDefaultClientBuilder.finalizeChildConfiguration(AwsDefaultClientBuilder.java:160)"
"[Worker-084dd25bd58c041d4]     at software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder.syncClientConfiguration(SdkDefaultClientBuilder.java:188)"
"[Worker-084dd25bd58c041d4]     at software.amazon.awssdk.services.s3.DefaultS3ClientBuilder.buildClient(DefaultS3ClientBuilder.java:37)"
"[Worker-084dd25bd58c041d4]     at software.amazon.awssdk.services.s3.DefaultS3ClientBuilder.buildClient(DefaultS3ClientBuilder.java:26)"
"[Worker-084dd25bd58c041d4]     at software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder.build(SdkDefaultClientBuilder.java:155)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.aws.AwsClientFactories$DefaultAwsClientFactory.s3(AwsClientFactories.java:119)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.aws.s3.S3FileIO.client(S3FileIO.java:391)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.aws.s3.S3FileIO.newOutputFile(S3FileIO.java:193)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.io.OutputFileFactory.newOutputFile(OutputFileFactory.java:107)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.openCurrent(BaseTaskWriter.java:329)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.<init>(BaseTaskWriter.java:296)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.<init>(BaseTaskWriter.java:378)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.io.UnpartitionedWriter.<init>(UnpartitionedWriter.java:37)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.connect.data.RecordUtils.createTableWriter(RecordUtils.java:159)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.connect.data.IcebergWriter.initNewWriter(IcebergWriter.java:54)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.connect.data.IcebergWriter.<init>(IcebergWriter.java:50)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.connect.data.IcebergWriterFactory.createWriter(IcebergWriterFactory.java:70)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.connect.data.SinkWriter.lambda$writerForTable$3(SinkWriter.java:139)"
"[Worker-084dd25bd58c041d4]     at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.connect.data.SinkWriter.writerForTable(SinkWriter.java:138)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.connect.data.SinkWriter.routeRecordDynamically(SinkWriter.java:124)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.connect.data.SinkWriter.save(SinkWriter.java:83)"
"[Worker-084dd25bd58c041d4]     at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.connect.data.SinkWriter.save(SinkWriter.java:68)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.connect.channel.Worker.save(Worker.java:124)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.connect.channel.CommitterImpl.save(CommitterImpl.java:151)"
"[Worker-084dd25bd58c041d4]     at org.apache.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:83)"
"[Worker-084dd25bd58c041d4]     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)"
"[Worker-084dd25bd58c041d4]     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)"
"[Worker-084dd25bd58c041d4]     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)"
"[Worker-084dd25bd58c041d4]     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)"
"[Worker-084dd25bd58c041d4]     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)"
"[Worker-084dd25bd58c041d4]     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)"
"[Worker-084dd25bd58c041d4]     at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)"
"[Worker-084dd25bd58c041d4]     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)"
"[Worker-084dd25bd58c041d4]     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)"
"[Worker-084dd25bd58c041d4]     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)"
"[Worker-084dd25bd58c041d4]     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)"
"[Worker-084dd25bd58c041d4]     at java.base/java.lang.Thread.run(Thread.java:840)"

It seems like there may be a conflict with AWS libraries in the combined plugin, but I am unsure how to resolve it.
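
To dig into that suspicion, this is roughly how one can check which jars inside the plugin bundle their own copy of the AWS SDK v2 S3 classes (the class from the stack trace); the paths assume the layout produced by the build script above:

# Unpack the plugin and flag every jar that contains software.amazon.awssdk S3 classes.
rm -rf /tmp/plugin && mkdir -p /tmp/plugin
unzip -q out/iceberg-sink.zip -d /tmp/plugin
find /tmp/plugin -name '*.jar' | while read -r jar; do
  if unzip -l "$jar" | grep -q 'software/amazon/awssdk/services/s3/'; then
    echo "bundles AWS SDK S3 classes: $jar"
  fi
done
# If both the Iceberg runtime's own SDK jars and the shaded converter jar are flagged,
# that is two different SDK versions sitting on the same plugin classpath.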


Solution

  • The issue was indeed a conflict between the AWS SDK versions bundled by the two libraries. As of writing, apache/iceberg is on AWS SDK for Java v2.32 while awslabs/aws-glue-schema-registry is on v2.22. The aws-glue-schema-registry project already has the maven-shade-plugin set up for avro-kafkaconnect-converter, so I patched that module's pom.xml to relocate the AWS SDK classes so they no longer clash at runtime. I also moved the full build into Docker. Here's the final Dockerfile:

    FROM amazoncorretto:17
    
    ARG APACHE_ICEBERG_VERSION=1.9.2
    ARG AWS_SCHEMA_REGISTRY_VERSION=1.1.24
    
    SHELL ["/bin/bash", "-c"]
    
    RUN yum install -y git zip unzip tar wget curl && \
        YQ_VERSION=v4.44.3 && \
        ARCH="$(uname -m)" && \
        case "$ARCH" in \
          x86_64)   YQ_BIN=yq_linux_amd64 ;; \
          aarch64|arm64) YQ_BIN=yq_linux_arm64 ;; \
          *) echo "unsupported arch: $ARCH" >&2; exit 1 ;; \
        esac && \
        wget -q "https://github.com/mikefarah/yq/releases/download/${YQ_VERSION}/${YQ_BIN}" -O /usr/local/bin/yq && \
        chmod +x /usr/local/bin/yq
    RUN curl -s "https://get.sdkman.io" | bash && \
        source "$HOME/.sdkman/bin/sdkman-init.sh" && \
        sdk install maven
    
    
    WORKDIR /build
    RUN git clone --branch v$AWS_SCHEMA_REGISTRY_VERSION https://github.com/awslabs/aws-glue-schema-registry.git
    RUN git clone --branch apache-iceberg-$APACHE_ICEBERG_VERSION https://github.com/apache/iceberg.git
    
    COPY fix-asr-pom.sh .
    RUN ./fix-asr-pom.sh
    
    RUN cd aws-glue-schema-registry && source "$HOME/.sdkman/bin/sdkman-init.sh" && mvn clean install
    RUN cd iceberg && ./gradlew -x test -x integrationTest clean build
    RUN mkdir outputs
    RUN cp "aws-glue-schema-registry/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-${AWS_SCHEMA_REGISTRY_VERSION}.jar"  \
        outputs/aws-glue-schema-registry.jar
    RUN cp ./iceberg/kafka-connect/kafka-connect-runtime/build/distributions/iceberg-kafka-connect-runtime-[0-9]*.zip outputs/iceberg-sink.zip
    
    WORKDIR /final
    RUN cp /build/outputs/iceberg-sink.zip ./ &&  \
        unzip iceberg-sink.zip && rm iceberg-sink.zip &&  \
        mv iceberg-kafka-connect-runtime-*-SNAPSHOT iceberg-kafka-connect-runtime
    RUN cd iceberg-kafka-connect-runtime/lib && cp /build/outputs/aws-glue-schema-registry.jar .
    
    RUN zip -r iceberg-sink.zip ./iceberg-kafka-connect-runtime
    
    ENTRYPOINT ["cp", "/final/iceberg-sink.zip"]
    
    Here is fix-asr-pom.sh, which the Dockerfile copies in and runs before the Maven build:

    #!/usr/bin/env bash
    set -euo pipefail
    POM="/build/aws-glue-schema-registry/avro-kafkaconnect-converter/pom.xml"
    
    # 1) Bump the maven-shade-plugin version
    yq eval -i '.project.build.plugins.plugin[] |= (select(.artifactId == "maven-shade-plugin") | .version = "3.5.1")' "$POM"
    
    # 2) Add the relocations configuration directly to the execution element
    yq eval -i '.project.build.plugins.plugin[] |=
    (select(.artifactId == "maven-shade-plugin") |
     .executions.execution.configuration = {
       "relocations": {
         "relocation": [
           {
             "pattern": "software.amazon.awssdk",
             "shadedPattern": "shaded.software.amazon.awssdk"
           },
           {
             "pattern": "software.amazon.eventstream",
             "shadedPattern": "shaded.software.amazon.eventstream"
           },
           {
             "pattern": "com.amazonaws",
             "shadedPattern": "shaded.com.amazonaws"
           }
         ]
       }
     })' "$POM"
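
    A quick way to confirm the yq edits actually landed (I run this inside the build image before the Maven step; plain grep, nothing yq-specific):

    # The relocations should now be visible in the shade-plugin execution configuration.
    grep -A 2 '<relocation>' /build/aws-glue-schema-registry/avro-kafkaconnect-converter/pom.xml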
    

    I can run:

    docker build -t iceberg-sink .
    docker run -v ./out:/out iceberg-sink /out/iceberg-sink.zip
    

    and then upload iceberg-sink.zip from ./out directly to S3 and create the MSK Connect custom plugin from it.
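
    Before uploading, I also double-check that the relocation took effect in the converter jar that ships inside the final zip (plain unzip/awk/grep; the path inside the zip matches the Dockerfile above):

    # Extract the shaded converter jar from the plugin zip and count relocated vs.
    # un-relocated AWS SDK class entries; the second count should be 0.
    unzip -o out/iceberg-sink.zip 'iceberg-kafka-connect-runtime/lib/aws-glue-schema-registry.jar' -d /tmp/plugin-check
    JAR=/tmp/plugin-check/iceberg-kafka-connect-runtime/lib/aws-glue-schema-registry.jar
    unzip -l "$JAR" | awk '{print $NF}' | grep -c '^shaded/software/amazon/awssdk/'
    unzip -l "$JAR" | awk '{print $NF}' | grep -c '^software/amazon/awssdk/'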

    I'd be interested to know whether there's a way to get this working without the custom pom.xml change to relocate the AWS SDK. It seems like plugin.path classloader isolation could handle this on a self-managed Kafka Connect cluster (a rough sketch follows), but plugin.path is not a property you can set in MSK Connect.
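
    On a self-managed Connect worker, the rough idea would be to keep the two plugins in separate directories under plugin.path so each gets its own isolated plugin classloader and its own copy of the AWS SDK. A sketch of that layout (untested; paths and versions are placeholders):

    # Hypothetical layout for a self-managed Kafka Connect worker, not MSK Connect.
    mkdir -p /opt/connect-plugins/iceberg-sink /opt/connect-plugins/glue-avro-converter
    unzip -q iceberg-kafka-connect-runtime-1.9.2.zip -d /opt/connect-plugins/iceberg-sink
    cp schema-registry-kafkaconnect-converter-1.1.24.jar /opt/connect-plugins/glue-avro-converter/
    # connect-distributed.properties then points at the parent directory:
    #   plugin.path=/opt/connect-plugins
    # and each subdirectory is loaded by its own PluginClassLoader.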