I get the following error:
[2025-02-10 14:54:53,530] INFO AbstractConfig values:
(org.apache.kafka.common.config.AbstractConfig:370)
[2025-02-10 14:54:53,639] ERROR Failed to create connector for .\kafka-connect\oracle-sink.properties (org.apache.kafka.connect.cli.ConnectStandalone:74)
[2025-02-10 14:54:53,642] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:84)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value transforms.RemoveWeirdCharacters for configuration transforms.removeWeirdChars.type: Class transforms.RemoveWeirdCharacters could not be found.
Invalid value null for configuration transforms.removeWeirdChars.type: Not a Transformation
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:123)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:107)
at org.apache.kafka.connect.cli.ConnectStandalone.processExtraArgs(ConnectStandalone.java:81)
at org.apache.kafka.connect.cli.AbstractConnectCli.startConnect(AbstractConnectCli.java:150)
at org.apache.kafka.connect.cli.AbstractConnectCli.run(AbstractConnectCli.java:94)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:112)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value transforms.RemoveWeirdCharacters for configuration transforms.removeWeirdChars.type: Class transforms.RemoveWeirdCharacters could not be found.
Invalid value null for configuration transforms.removeWeirdChars.type: Not a Transformation
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
at org.apache.kafka.connect.runtime.AbstractHerder.maybeAddConfigErrors(AbstractHerder.java:754)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:204)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.lambda$null$0(StandaloneHerder.java:190)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
My Java code:
package transforms;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.Map;

public class RemoveWeirdCharacters<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // Only plain String values are cleaned; any other value type passes through unchanged.
        if (record.value() instanceof String value) {
            // Strip every leading character that is not an ASCII letter or digit.
            value = value.replaceAll("^[^a-zA-Z0-9]+", "");
            return record.newRecord(
                    record.topic(),
                    record.kafkaPartition(),
                    record.keySchema(),
                    record.key(),
                    record.valueSchema(),
                    value,
                    record.timestamp()
            );
        }
        return record;
    }

    @Override
    public ConfigDef config() {
        // This transformation has no configuration options.
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}
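The string handling in apply() can be checked on its own, without Kafka on the classpath. Below is a minimal sketch using only the JDK; LeadingCharsDemo and its two method names are made up for illustration, and the whitespace-only pattern is an assumption about what a narrower variant could look like, not something the transformation above uses.

```java
/** Hypothetical demo class (not part of the connector): compares two leading-character patterns. */
final class LeadingCharsDemo {

    /** Same pattern as in apply(): drops every leading character that is not an ASCII letter or digit. */
    static String stripLeadingNonAlphanumeric(String s) {
        return s.replaceAll("^[^a-zA-Z0-9]+", "");
    }

    /** Assumed narrower alternative: drops only leading whitespace matched by \s. */
    static String stripLeadingWhitespace(String s) {
        return s.replaceAll("^\\s+", "");
    }

    public static void main(String[] args) {
        String sample = "  {\"id\":1}";
        // The first pattern also consumes the opening brace and quote of a JSON-like value.
        System.out.println(stripLeadingNonAlphanumeric(sample));
        // The second pattern leaves everything after the whitespace intact.
        System.out.println(stripLeadingWhitespace(sample));
    }
}
```

Note that the pattern from apply() removes more than whitespace: any leading punctuation goes too, which matters if the value starts with a brace or a quote.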
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>custom-transforms</artifactId>
    <name>custom-transforms</name>
    <version>1.0.0</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <junit.version>5.9.2</junit.version>
    </properties>
    <dependencies>
        <!-- Kafka Connect dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>3.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-transforms</artifactId>
            <version>3.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.7.1</version>
        </dependency>
        <!-- JUnit for testing -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>21</source>
                    <target>21</target>
                </configuration>
            </plugin>
            <!-- Surefire plugin for running tests -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.0.0-M7</version>
            </plugin>
        </plugins>
    </build>
</project>
Kafka Connect config:
name=oracle-kafka-connector
connector.class=io.debezium.connector.oracle.OracleConnector
tasks.max=1
topic.prefix=prefix
database.hostname=host
database.port=port
database.user=user
database.password=password
database.dbname=dbname
schema.include.list=schema
table.include.list=schema.table
schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
schema.history.internal.file.filename=path
transforms=reroute,unwrap,removeWeirdChars
transforms.reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.reroute.topic.regex=(.*).schema.table(.*)
transforms.reroute.topic.replacement=$1
snapshot.mode=configuration_based
snapshot.include.collection.list=dbname.schema.table
schema.history.internal.store.only.captured.tables.ddl=true
snapshot.mode.configuration.based.snapshot.schema=true
snapshot.mode.configuration.based.start.stream=true
tombstones.on.delete=false
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=table,lsn
transforms.removeWeirdChars.type=transforms.RemoveWeirdCharacters
I moved my jar file to the directory that holds the Kafka Connect plugins.
What I'm trying to do is add a transformation that removes whitespace at the beginning of each message, because otherwise the messages are displayed incorrectly in Elastic.
Do you have any idea what's wrong?
I had the wrong version of Java. What helped was copying my custom jar into the libs directory: Kafka Connect then started reporting errors, which let me debug the problem.
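In case someone hits the same symptom: one way to check for this kind of mismatch is to read the version number stored in a compiled class. This is a minimal sketch, assuming the mismatch is between the Java release the jar was compiled for and the release running Kafka Connect. ClassFileVersion is a name made up for illustration. It relies on the standard class-file header (the 0xCAFEBABE magic number, then minor and major version), and the "major minus 44" mapping holds for Java 5 and later.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper (not part of Kafka): reports which Java release a .class file targets. */
final class ClassFileVersion {

    /** Returns the class-file major version stored in bytes 6-7 of the header. */
    static int majorVersion(byte[] classBytes) {
        if (classBytes.length < 8) {
            throw new IllegalArgumentException("too short to be a class file");
        }
        ByteBuffer buf = ByteBuffer.wrap(classBytes); // ByteBuffer is big-endian by default
        if (buf.getInt() != 0xCAFEBABE) {
            throw new IllegalArgumentException("missing 0xCAFEBABE magic number");
        }
        buf.getShort();                 // minor version, not needed here
        return buf.getShort() & 0xFFFF; // major version, read as unsigned
    }

    /** Maps a major version to a Java release for Java 5+, e.g. 61 -> 17 and 65 -> 21. */
    static int javaRelease(int major) {
        return major - 44;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to a .class file extracted from the custom jar
        byte[] bytes = Files.readAllBytes(Path.of(args[0]));
        int major = majorVersion(bytes);
        System.out.println("class file major " + major + " targets Java " + javaRelease(major));
        System.out.println("this JVM runs Java " + Runtime.version().feature());
    }
}
```

Run it with the same Java installation that starts Kafka Connect and pass it a class extracted from the jar. If the first number is higher than the second, that JVM cannot load the class.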