javaapache-kafkaapache-kafka-connectdebezium

Custom transform for Kafka Connect


I get the below error:

[2025-02-10 14:54:53,530] INFO AbstractConfig values:
 (org.apache.kafka.common.config.AbstractConfig:370)
[2025-02-10 14:54:53,639] ERROR Failed to create connector for .\kafka-connect\oracle-sink.properties (org.apache.kafka.
connect.cli.ConnectStandalone:74)
[2025-02-10 14:54:53,642] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:84)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector con
figuration is invalid and contains the following 2 error(s):
Invalid value transforms.RemoveWeirdCharacters for configuration transforms.removeWeirdChars.type: Class transforms.Remo
veWeirdCharacters could not be found.
Invalid value null for configuration transforms.removeWeirdChars.type: Not a Transformation
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
        at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:123)
        at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:107)
        at org.apache.kafka.connect.cli.ConnectStandalone.processExtraArgs(ConnectStandalone.java:81)
        at org.apache.kafka.connect.cli.AbstractConnectCli.startConnect(AbstractConnectCli.java:150)
        at org.apache.kafka.connect.cli.AbstractConnectCli.run(AbstractConnectCli.java:94)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:112)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and cont
ains the following 2 error(s):
Invalid value transforms.RemoveWeirdCharacters for configuration transforms.removeWeirdChars.type: Class transforms.Remo
veWeirdCharacters could not be found.
Invalid value null for configuration transforms.removeWeirdChars.type: Not a Transformation
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
        at org.apache.kafka.connect.runtime.AbstractHerder.maybeAddConfigErrors(AbstractHerder.java:754)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:204)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.lambda$null$0(StandaloneHerder.java:190)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecuto
r.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)

For my java code:

package transforms;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.Map;

public class RemoveWeirdCharacters<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        if (record.value() instanceof String value) {
            value = value.replaceAll("^[^a-zA-Z0-9]+", "");
            return record.newRecord(
                    record.topic(),
                    record.kafkaPartition(),
                    record.keySchema(),
                    record.key(),
                    record.valueSchema(),
                    value,
                    record.timestamp()
            );
        }
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>custom-transforms</artifactId>
    <name>custom-transforms</name>
    <version>1.0.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <junit.version>5.9.2</junit.version>
    </properties>

    <dependencies>
        <!-- Kafka Connect dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>3.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-transforms</artifactId>
            <version>3.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.7.1</version>
        </dependency>

        <!-- JUnit for testing -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>21</source>
                    <target>21</target>
                </configuration>
            </plugin>
            <!-- Surefire plugin for running tests -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.0.0-M7</version>
            </plugin>
        </plugins>
    </build>
</project>

Kafka connect config:

name=oracle-kafka-connector
connector.class=io.debezium.connector.oracle.OracleConnector
tasks.max=1
topic.prefix=prefix
database.hostname=host
database.port=port
database.user=user
database.password=password
database.dbname=dbname
schema.include.list=schema
table.include.list=schema.table
schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
schema.history.internal.file.filename=path
transforms=reroute,unwrap,removeWeirdChars
transforms.reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.reroute.topic.regex=(.*).schema.table(.*)
transforms.reroute.topic.replacement=$1
snapshot.mode=configuration_based
snapshot.include.collection.list =dbname.schema.table
schema.history.internal.store.only.captured.tables.ddl=true
snapshot.mode.configuration.based.snapshot.schema=true
snapshot.mode.configuration.based.start.stream=true
tombstones.on.delete=false

transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=table,lsn

transforms.removeWeirdChars.type=transforms.RemoveWeirdCharacters

I moved my jar file to the location with plugins for Kafka Connect.

What I'm generally trying to do is add a transformation that will remove whitespace at the beginning of message because then messages are presented incorrectly in elastic.

Do you have an idea what's wrong?


Solution

  • I had the wrong version of java. It helped when I pasted my custom jar into the libs directory. This helped me debug because kafka-connect started returning errors.