I'm trying to build an MSK Connect-compatible IcebergSinkConnector plugin that uses AWSKafkaAvroConverter, since my schemas live in the AWS Glue Schema Registry. The following is my connector configuration:
connector_configuration = {
  "connector.class" = "org.apache.iceberg.connect.IcebergSinkConnector"
  "tasks.max" = "4"
  "errors.log.enable" = "true"
  "topics" = join(",", keys(var.topics))

  "transforms" = "sanitize,topicToField"
  "transforms.sanitize.type" = "org.apache.kafka.connect.transforms.RegexRouter"
  "transforms.sanitize.regex" = "(\\.)"
  "transforms.sanitize.replacement" = "_" # foo.bar.baz → foo_bar_baz
  "transforms.topicToField.type" = "org.apache.kafka.connect.transforms.InsertField$Value"
  "transforms.topicToField.topic.field" = "src_kafka_topic"

  "key.converter" = "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
  "key.converter.region" = data.aws_region.current.name
  "key.converter.schema.registry.name" = aws_glue_registry.this.registry_name
  "key.converter.avroRecordType" = "GENERIC_RECORD"
  "key.converter.schemaAutoRegistrationEnabled" = "false"
  "value.converter" = "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
  "value.converter.region" = data.aws_region.current.name
  "value.converter.schema.registry.name" = aws_glue_registry.this.registry_name
  "value.converter.avroRecordType" = "GENERIC_RECORD"
  "value.converter.schemaAutoRegistrationEnabled" = "false"

  "iceberg.catalog.io-impl" = "org.apache.iceberg.aws.s3.S3FileIO"
  "iceberg.catalog.type" = "rest"
  "iceberg.catalog.uri" = "https://s3tables.us-east-1.amazonaws.com/iceberg"
  "iceberg.catalog.warehouse" = aws_s3tables_table_bucket.this.arn
  "iceberg.catalog.default-namespace" = aws_s3tables_namespace.this.namespace
  "iceberg.catalog.client.region" = data.aws_region.current.name
  "iceberg.catalog.rest.signing-region" = data.aws_region.current.name
  "iceberg.catalog.rest.signing-name" = "s3tables"
  "iceberg.catalog.rest.sigv4-enabled" = "true"

  "iceberg.tables.auto-create-enabled" = "true"
  "iceberg.tables.evolve-schema-enabled" = "true"
  "iceberg.tables.dynamic-enabled" = "true"
  "iceberg.tables.route-field" = "src_kafka_topic"
}
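As an aside for anyone debugging a similar setup: the catalog half of this configuration can be exercised without Connect at all, since S3 Tables exposes a standard Iceberg REST catalog. This is only a sketch using the third-party awscurl tool against the REST spec's /v1/config endpoint; the table bucket ARN placeholder is mine and must be URL-encoded:

# Probe the S3 Tables Iceberg REST endpoint with a SigV4-signed request
# (pip install awscurl; replace the placeholder with your URL-encoded ARN)
awscurl --service s3tables --region us-east-1 \
  "https://s3tables.us-east-1.amazonaws.com/iceberg/v1/config?warehouse=<url-encoded-table-bucket-arn>"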
I believe this configuration is correct, as I was able to get a version of my connector to auto-create a table in S3 Tables. However, I have been unable to get my custom plugin to actually write data to the table.
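The created-versus-populated distinction can be checked from the CLI. This is a sketch; it assumes the s3tables commands in a recent AWS CLI v2, with the ARN and namespace from the Terraform config above in the environment variables shown:

# Was the table auto-created?
aws s3tables list-tables --table-bucket-arn "$TABLE_BUCKET_ARN" --namespace "$NAMESPACE"

# If it exists, inspect it; a table that was created but never written to
# will have metadata yet return no rows when queried (e.g. through Athena)
aws s3tables get-table --table-bucket-arn "$TABLE_BUCKET_ARN" --namespace "$NAMESPACE" --name "$TABLE_NAME"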
The closest I believe I've come to creating a working plugin has been with this script:
#!/bin/bash
set -e

APACHE_ICEBERG_VERSION=1.9.2
AWS_SCHEMA_REGISTRY_VERSION=1.1.24

# Install build tooling (Gradle and Maven) via SDKMAN
curl -s "https://get.sdkman.io" | bash
source "$HOME/.sdkman/bin/sdkman-init.sh"
sdk install gradle && sdk install maven

mkdir /build && cd /build

# Build the Glue Schema Registry Avro converter
git clone https://github.com/awslabs/aws-glue-schema-registry.git && cd aws-glue-schema-registry && git checkout v${AWS_SCHEMA_REGISTRY_VERSION}
mvn clean install
cd /build
mkdir outputs
cp "aws-glue-schema-registry/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-${AWS_SCHEMA_REGISTRY_VERSION}.jar" outputs/aws-glue-schema-registry.jar

# Build the Iceberg Kafka Connect runtime distribution
git clone https://github.com/apache/iceberg.git && cd iceberg && git checkout apache-iceberg-${APACHE_ICEBERG_VERSION}
./gradlew -x test -x integrationTest clean build
cd ..
cp ./iceberg/kafka-connect/kafka-connect-runtime/build/distributions/iceberg-kafka-connect-runtime-[0-9]*.zip outputs/iceberg-sink.zip

# Merge both artifacts into a single plugin zip in the mounted /out volume
cd /out
cp /build/outputs/iceberg-sink.zip ./ && unzip iceberg-sink.zip && rm iceberg-sink.zip
mkdir -p aws-glue-schema-registry/lib && cp /build/outputs/aws-glue-schema-registry.jar aws-glue-schema-registry/lib/
zip -r iceberg-sink.zip .
which I ran from this Dockerfile:
FROM amazoncorretto:17
SHELL ["/bin/bash", "-c"]
RUN yum install -y git zip unzip tar
COPY build.sh .
CMD ./build.sh
using these commands:
docker build -t iceberg-sink .
docker run -v ./out:/out iceberg-sink
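Before uploading, it can help to list the archive to see where the converter jar sits relative to the Iceberg runtime jars (plain unzip, nothing MSK-specific):

# Show the layout of the merged plugin zip
unzip -l out/iceberg-sink.zip | grep -E 'aws-glue-schema-registry|iceberg-kafka-connect' | head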
I then picked up iceberg-sink.zip from /out, uploaded it to S3, and created the MSK Connect plugin. When starting up the MSK connector with the above configuration, the connection to the brokers seems to work, but I get the error:
[Worker-084dd25bd58c041d4] java.lang.NoSuchFieldError: CLIENT_ENDPOINT_PROVIDER
"[Worker-084dd25bd58c041d4] at software.amazon.awssdk.services.s3.DefaultS3BaseClientBuilder.finalizeServiceConfiguration(DefaultS3BaseClientBuilder.java:188)"
"[Worker-084dd25bd58c041d4] at software.amazon.awssdk.awscore.client.builder.AwsDefaultClientBuilder.finalizeChildConfiguration(AwsDefaultClientBuilder.java:160)"
"[Worker-084dd25bd58c041d4] at software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder.syncClientConfiguration(SdkDefaultClientBuilder.java:188)"
"[Worker-084dd25bd58c041d4] at software.amazon.awssdk.services.s3.DefaultS3ClientBuilder.buildClient(DefaultS3ClientBuilder.java:37)"
"[Worker-084dd25bd58c041d4] at software.amazon.awssdk.services.s3.DefaultS3ClientBuilder.buildClient(DefaultS3ClientBuilder.java:26)"
"[Worker-084dd25bd58c041d4] at software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder.build(SdkDefaultClientBuilder.java:155)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.aws.AwsClientFactories$DefaultAwsClientFactory.s3(AwsClientFactories.java:119)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.aws.s3.S3FileIO.client(S3FileIO.java:391)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.aws.s3.S3FileIO.newOutputFile(S3FileIO.java:193)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.io.OutputFileFactory.newOutputFile(OutputFileFactory.java:107)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.openCurrent(BaseTaskWriter.java:329)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.<init>(BaseTaskWriter.java:296)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.<init>(BaseTaskWriter.java:378)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.io.UnpartitionedWriter.<init>(UnpartitionedWriter.java:37)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.connect.data.RecordUtils.createTableWriter(RecordUtils.java:159)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.connect.data.IcebergWriter.initNewWriter(IcebergWriter.java:54)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.connect.data.IcebergWriter.<init>(IcebergWriter.java:50)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.connect.data.IcebergWriterFactory.createWriter(IcebergWriterFactory.java:70)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.connect.data.SinkWriter.lambda$writerForTable$3(SinkWriter.java:139)"
"[Worker-084dd25bd58c041d4] at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.connect.data.SinkWriter.writerForTable(SinkWriter.java:138)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.connect.data.SinkWriter.routeRecordDynamically(SinkWriter.java:124)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.connect.data.SinkWriter.save(SinkWriter.java:83)"
"[Worker-084dd25bd58c041d4] at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.connect.data.SinkWriter.save(SinkWriter.java:68)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.connect.channel.Worker.save(Worker.java:124)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.connect.channel.CommitterImpl.save(CommitterImpl.java:151)"
"[Worker-084dd25bd58c041d4] at org.apache.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:83)"
"[Worker-084dd25bd58c041d4] at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)"
"[Worker-084dd25bd58c041d4] at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)"
"[Worker-084dd25bd58c041d4] at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)"
"[Worker-084dd25bd58c041d4] at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)"
"[Worker-084dd25bd58c041d4] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)"
"[Worker-084dd25bd58c041d4] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)"
"[Worker-084dd25bd58c041d4] at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)"
"[Worker-084dd25bd58c041d4] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)"
"[Worker-084dd25bd58c041d4] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)"
"[Worker-084dd25bd58c041d4] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)"
"[Worker-084dd25bd58c041d4] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)"
"[Worker-084dd25bd58c041d4] at java.base/java.lang.Thread.run(Thread.java:840)"
A NoSuchFieldError like this usually means a class compiled against a newer library version found an older copy of that library on the classpath, so it seems there is a conflict between the AWS SDK copies bundled in the combined plugin, but I am unsure how to resolve it.
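One rough way to confirm that kind of clash is to look for duplicate SDK classes across the bundled jars. The sketch below guesses that the missing field belongs to the SDK's internal SdkClientOption class (the exact class may differ) and assumes unzip and strings are available:

# Find every jar that bundles SdkClientOption, then check which copies
# actually define the CLIENT_ENDPOINT_PROVIDER field
unzip -o out/iceberg-sink.zip -d /tmp/plugin >/dev/null
find /tmp/plugin -name '*.jar' | while read -r jar; do
  if unzip -l "$jar" 2>/dev/null | grep -q 'awssdk/core/client/config/SdkClientOption.class'; then
    if unzip -p "$jar" '*awssdk/core/client/config/SdkClientOption.class' \
        | strings | grep -q CLIENT_ENDPOINT_PROVIDER; then
      echo "$jar: defines CLIENT_ENDPOINT_PROVIDER"
    else
      echo "$jar: missing CLIENT_ENDPOINT_PROVIDER  <-- likely the stale copy"
    fi
  fi
done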
The issue was indeed an AWS SDK conflict between the two libraries. As of writing, apache/iceberg builds against AWS SDK for Java 2.32.x while awslabs/aws-glue-schema-registry pins 2.22.x, so the combined plugin carried two copies of the SDK and Iceberg's S3 client could end up loading classes from the older copy, which lacks the CLIENT_ENDPOINT_PROVIDER field. The aws-glue-schema-registry project already has the maven-shade-plugin set up for avro-kafkaconnect-converter, so I manually updated that module's pom.xml to relocate the AWS SDK classes so the two copies can no longer collide at runtime. I also moved the full build into Docker. Here's the final Dockerfile:
FROM amazoncorretto:17

ARG APACHE_ICEBERG_VERSION=1.9.2
ARG AWS_SCHEMA_REGISTRY_VERSION=1.1.24

SHELL ["/bin/bash", "-c"]

RUN yum install -y git zip unzip tar wget curl && \
    YQ_VERSION=v4.44.3 && \
    ARCH="$(uname -m)" && \
    case "$ARCH" in \
      x86_64) YQ_BIN=yq_linux_amd64 ;; \
      aarch64|arm64) YQ_BIN=yq_linux_arm64 ;; \
      *) echo "unsupported arch: $ARCH" >&2; exit 1 ;; \
    esac && \
    wget -q "https://github.com/mikefarah/yq/releases/download/${YQ_VERSION}/${YQ_BIN}" -O /usr/local/bin/yq && \
    chmod +x /usr/local/bin/yq

RUN curl -s "https://get.sdkman.io" | bash && \
    source "$HOME/.sdkman/bin/sdkman-init.sh" && \
    sdk install maven

WORKDIR /build

RUN git clone --branch v$AWS_SCHEMA_REGISTRY_VERSION https://github.com/awslabs/aws-glue-schema-registry.git
RUN git clone --branch apache-iceberg-$APACHE_ICEBERG_VERSION https://github.com/apache/iceberg.git

# Patch the shade plugin config to relocate AWS SDK classes (script below)
COPY fix-asr-pom.sh .
RUN ./fix-asr-pom.sh

RUN cd aws-glue-schema-registry && source "$HOME/.sdkman/bin/sdkman-init.sh" && mvn clean install
RUN cd iceberg && ./gradlew -x test -x integrationTest clean build

RUN mkdir outputs
RUN cp "aws-glue-schema-registry/avro-kafkaconnect-converter/target/schema-registry-kafkaconnect-converter-${AWS_SCHEMA_REGISTRY_VERSION}.jar" \
      outputs/aws-glue-schema-registry.jar
RUN cp ./iceberg/kafka-connect/kafka-connect-runtime/build/distributions/iceberg-kafka-connect-runtime-[0-9]*.zip outputs/iceberg-sink.zip

WORKDIR /final

RUN cp /build/outputs/iceberg-sink.zip ./ && \
    unzip iceberg-sink.zip && rm iceberg-sink.zip && \
    mv iceberg-kafka-connect-runtime-*-SNAPSHOT iceberg-kafka-connect-runtime
RUN cd iceberg-kafka-connect-runtime/lib && cp /build/outputs/aws-glue-schema-registry.jar .
RUN zip -r iceberg-sink.zip ./iceberg-kafka-connect-runtime

# The copy destination is supplied as the container argument at run time
ENTRYPOINT ["cp", "/final/iceberg-sink.zip"]
# fix-asr-pom.sh
#!/usr/bin/env bash
set -euo pipefail
POM="/build/aws-glue-schema-registry/avro-kafkaconnect-converter/pom.xml"
# Note: yq v4 auto-detects XML input/output from the .xml file extension
# 1) Bump the maven-shade-plugin version
yq eval -i '.project.build.plugins.plugin[] |= (select(.artifactId == "maven-shade-plugin") | .version = "3.5.1")' "$POM"
# 2) Add the relocations configuration directly to the execution element
yq eval -i '.project.build.plugins.plugin[] |=
(select(.artifactId == "maven-shade-plugin") |
.executions.execution.configuration = {
"relocations": {
"relocation": [
{
"pattern": "software.amazon.awssdk",
"shadedPattern": "shaded.software.amazon.awssdk"
},
{
"pattern": "software.amazon.eventstream",
"shadedPattern": "shaded.software.amazon.eventstream"
},
{
"pattern": "com.amazonaws",
"shadedPattern": "shaded.com.amazonaws"
}
]
}
})' "$POM"
I can run:
docker build -t iceberg-sink .
docker run -v ./out:/out iceberg-sink /out/iceberg-sink.zip
(the trailing /out/iceberg-sink.zip argument is the copy destination consumed by the image's cp ENTRYPOINT) and then upload iceberg-sink.zip directly to S3 and create the MSK Connect plugin from it.
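As a sanity check before the upload, it's worth confirming the relocation actually took (a sketch; paths follow the layout the Dockerfile above produces):

# The converter jar inside the final zip should now carry the AWS SDK
# under the shaded.* prefix rather than software.amazon.awssdk
unzip -o out/iceberg-sink.zip -d /tmp/final-plugin >/dev/null
unzip -l /tmp/final-plugin/iceberg-kafka-connect-runtime/lib/aws-glue-schema-registry.jar \
  | grep -m1 'shaded/software/amazon/awssdk' && echo "relocation OK"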
I'd be interested to know if there's a way to get this working without the custom pom.xml surgery to relocate the AWS SDK. It seems like plugin.path could keep the two artifacts in separate plugin classloaders when running Connect yourself, but MSK Connect does not support that property.
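For completeness, on self-managed Connect that idea would look something like the following (hypothetical worker config; the directory names are made up). Each plugin.path entry gets its own plugin classloader, so the converter's SDK 2.22 classes would never shadow the sink's SDK 2.32 classes:

# connect-distributed.properties (self-managed Kafka Connect, not MSK Connect)
plugin.path=/opt/connect-plugins/iceberg-kafka-connect-runtime,/opt/connect-plugins/aws-glue-schema-registry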