I deployed my custom connector in kafka-connect and registered using following configuration:
{
"name" : "rabbitmq-source-connector",
"config" : {
"connector.class" : "net.geli.kafkaconnect.rabbitmqsourceconnector.RabbitMQSourceConnector",
"tasks.max": "5",
"kafka.topic" : "kafka-test",
"rabbitmq.queue": "rabbitmq-test-queue",
"rabbitmq.prefetch.count": 100,
"rabbitmq.host": "localhost",
"rabbitmq.virtual.host": "/",
"rabbitmq.port": "5672",
"rabbitmq.username": "guest",
"rabbitmq.password": "guest",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://kafka-schema-registry-cp-schema-registry.kafka:8081",
"value.converter.schemas.enable": true,
"internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
"internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
It was working fine, but when we restart kafka-connect service getting below NullPointerException, I tried restarting kafka-broker but same error.
2021-10-08 06:07:13,122 ERROR [Worker clientId=connect-1, groupId=connect-cluster] Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1]
java.lang.NullPointerException
at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:685)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:481)
at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:264)
at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:287)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:154)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:265)
at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:123)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:277)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2021-10-08 06:07:13,123 INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect) [Thread-10]
2021-10-08 06:07:13,125 INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer) [Thread-10]
2021-10-08 06:07:13,130 INFO Stopped http_8083@39c11e6c{HTTP/1.1,[http/1.1]}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector) [Thread-10]
2021-10-08 06:07:13,130 INFO node0 Stopped scavenging (org.eclipse.jetty.server.session) [Thread-10]
2021-10-08 06:07:13,131 INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer) [Thread-10]
2021-10-08 06:07:13,131 INFO [Worker clientId=connect-1, groupId=connect-cluster] Herder stopping (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [Thread-10]
2021-10-08 06:07:18,132 INFO [Worker clientId=connect-1, groupId=connect-cluster] Herder stopped (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [Thread-10]
2021-10-08 06:07:18,132 INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect) [Thread-10]
Kafka version: kafka_2.12-2.6.0
I am new to Kafka. I did google but no luck. Could you please help me out how to come out from this error?
The above issue is fixed after cleaning up connect-cluster-configs, connect-cluster-status, connect-cluster-offsets topics using following command in Kafka broker.
./kafka-topics.sh --delete --topic connect-cluster-offsets --bootstrap-server=my-cluster-kafka-bootstrap.kafka:9092