I have created a custom connector that extends JdbcSinkConnector, like this:
import io.confluent.connect.jdbc.JdbcSinkConnector;
import org.apache.kafka.connect.connector.Task;

public class CustomJdbcSinkConnector extends JdbcSinkConnector {
    @Override
    public Class<? extends Task> taskClass() {
        return CustomJdbcSinkTask.class;
    }

    @Override
    public String version() {
        // Version of the custom connector
        return "1.0.0";
    }
}
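import java.util.Collection;
import java.util.Map;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.sink.JdbcDbWriter;
import io.confluent.connect.jdbc.sink.JdbcSinkConfig;
import io.confluent.connect.jdbc.sink.JdbcSinkTask;

public class CustomJdbcSinkTask extends JdbcSinkTask {
    private static final Logger log = LoggerFactory.getLogger(CustomJdbcSinkTask.class);
    ErrantRecordReporter reporter;
    DatabaseDialect dialect;
    JdbcSinkConfig config;
    JdbcDbWriter writer;
    int remainingRetries;
    private Map<String, String> configProps;

    @Override
    public void put(Collection<SinkRecord> records) {
        log.info("hello from custom put");
        log.info("custom put method starting...");
        // Delegate to the parent implementation on this (already started) task;
        // a freshly constructed JdbcSinkTask would not have been initialized by start().
        super.put(records);
    }
}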
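A side note on the put override: delegation should go through super rather than through a newly constructed task. The sketch below illustrates why, using stand-in classes only (BaseTask, SuperDelegatingTask and FreshInstanceTask are hypothetical and are not the Confluent classes). It assumes, as is typical for sink tasks, that put depends on state that start initializes on the same instance.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Stand-in for a sink task whose put() depends on state created in start(). */
class BaseTask {
    private List<String> buffer; // initialized only by start(), like a database writer

    void start() {
        buffer = new ArrayList<>();
    }

    void put(Collection<String> records) {
        if (buffer == null) {
            throw new IllegalStateException("task was never started");
        }
        buffer.addAll(records);
    }

    int written() {
        return buffer == null ? 0 : buffer.size();
    }
}

/** Delegates to the parent implementation on the same, already started instance. */
class SuperDelegatingTask extends BaseTask {
    @Override
    void put(Collection<String> records) {
        super.put(records);
    }
}

/** Delegates to a new, never started instance, so the parent's state is missing. */
class FreshInstanceTask extends BaseTask {
    @Override
    void put(Collection<String> records) {
        new BaseTask().put(records);
    }
}
```

In this sketch, SuperDelegatingTask stores the records after start has been called, while FreshInstanceTask throws IllegalStateException even though start was called on the outer task, because the inner instance never saw it.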
I built the JAR and added its path to plugin.path in connect-distributed.properties.
I then started Kafka Connect, but the custom connector is not loaded.
The log shows:
[2023-05-30 14:54:01,585] INFO Loading plugin from: C:\Kafka\JDBC\custom-jdbc-sink-1.0.0.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:275)
[2023-05-30 14:54:01,613] INFO Registered loader: PluginClassLoader{pluginLocation=file:/C:/Kafka/JDBC/custom-jdbc-sink-1.0.0.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:297)
The plugin location is loaded and its class loader is registered, but the connector itself is never added as a plugin. The JAR file is not corrupted; I can run it from the terminal.
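One way to narrow this down is to check whether the JAR actually bundles the classes the connector depends on, since Kafka Connect loads each plugin location through its own class loader. The following is a minimal sketch using only the JDK; JarClassCheck is a hypothetical helper (not part of Kafka Connect), and the default class name assumes the parent connector is io.confluent.connect.jdbc.JdbcSinkConnector.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.JarFile;

/** Hypothetical helper: checks whether a JAR bundles a given class. */
final class JarClassCheck {

    /** Converts a fully qualified class name to its entry path inside a JAR. */
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the JAR at jarPath contains the class file for className. */
    static boolean containsClass(Path jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath.toFile())) {
            return jar.getEntry(entryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed defaults: the JAR path from the log and the parent connector class.
        Path jar = Paths.get(args.length > 0 ? args[0]
                : "C:\\Kafka\\JDBC\\custom-jdbc-sink-1.0.0.jar");
        String cls = args.length > 1 ? args[1]
                : "io.confluent.connect.jdbc.JdbcSinkConnector";
        System.out.println(cls + " bundled: " + containsClass(jar, cls));
    }
}
```

If the parent class is reported as not bundled, the JAR is a thin build and the JDBC connector classes have to be made available in the same plugin location some other way.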
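The custom connector loaded once I built the JAR with maven-assembly-plugin in pom.xml, presumably because the jar-with-dependencies descriptor bundles the JDBC connector classes that the plugin's isolated class loader needs. The configuration: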
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.3.0</version>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <addClasspath>true</addClasspath>
                <mainClass>com.example.somedir.MainApplication</mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <id>assemble-all</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
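To confirm that a worker has picked up the connector, the Kafka Connect REST API can be queried at GET /connector-plugins. Below is a rough sketch using the JDK 11+ HTTP client. PluginListCheck is a hypothetical helper, http://localhost:8083 is an assumed worker address, and the package of CustomJdbcSinkConnector is a guess based on the mainClass in the pom.xml above; adjust both to the actual setup.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper: checks whether a Connect worker lists a connector class. */
final class PluginListCheck {

    /** Returns true if the /connector-plugins JSON mentions the given class name. */
    static boolean listsPlugin(String json, String className) {
        // Naive match on the quoted class name; a JSON parser would be more robust.
        return json.contains("\"" + className + "\"");
    }

    public static void main(String[] args) throws Exception {
        // Assumed worker address; pass the real one as the first argument.
        String baseUrl = args.length > 0 ? args[0] : "http://localhost:8083";
        // Hypothetical package name; pass the real class as the second argument.
        String className = args.length > 1 ? args[1]
                : "com.example.somedir.CustomJdbcSinkConnector";

        HttpRequest request = HttpRequest
                .newBuilder(URI.create(baseUrl + "/connector-plugins"))
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        System.out.println(className + " listed: "
                + listsPlugin(response.body(), className));
    }
}
```

The substring match keeps the sketch dependency-free; it only tells whether the class name appears in the response, not which version or type the worker reports.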