apache-kafka apache-kafka-connect

NullPointerException in Kafka-Connect service logs on start of Kafka-Connect


I deployed my custom connector in Kafka Connect and registered it using the following configuration:

{
   "name" : "rabbitmq-source-connector",
   "config" : {
    "connector.class" : "net.geli.kafkaconnect.rabbitmqsourceconnector.RabbitMQSourceConnector",
    "tasks.max": "5",
    "kafka.topic" : "kafka-test",
    "rabbitmq.queue": "rabbitmq-test-queue",
    "rabbitmq.prefetch.count": 100,
    "rabbitmq.host": "localhost",
    "rabbitmq.virtual.host": "/",
    "rabbitmq.port": "5672",
    "rabbitmq.username": "guest",
    "rabbitmq.password": "guest",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://kafka-schema-registry-cp-schema-registry.kafka:8081",
    "value.converter.schemas.enable": true,
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"
   }
}

It was working fine, but when we restart the Kafka Connect service we get the NullPointerException below. I tried restarting the Kafka broker, but the error is the same.

2021-10-08 06:07:13,122 ERROR [Worker clientId=connect-1, groupId=connect-cluster] Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1]
java.lang.NullPointerException
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:685)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:481)
    at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:264)
    at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:287)
    at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:154)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:265)
    at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:123)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:277)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2021-10-08 06:07:13,123 INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect) [Thread-10]
2021-10-08 06:07:13,125 INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer) [Thread-10]
2021-10-08 06:07:13,130 INFO Stopped http_8083@39c11e6c{HTTP/1.1,[http/1.1]}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector) [Thread-10]
2021-10-08 06:07:13,130 INFO node0 Stopped scavenging (org.eclipse.jetty.server.session) [Thread-10]
2021-10-08 06:07:13,131 INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer) [Thread-10]
2021-10-08 06:07:13,131 INFO [Worker clientId=connect-1, groupId=connect-cluster] Herder stopping (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [Thread-10]
2021-10-08 06:07:18,132 INFO [Worker clientId=connect-1, groupId=connect-cluster] Herder stopped (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [Thread-10]
2021-10-08 06:07:18,132 INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect) [Thread-10]

Kafka version: kafka_2.12-2.6.0

I am new to Kafka. I searched online but found nothing that helped. Could you please help me resolve this error?


Solution

  • The issue was fixed by deleting the connect-cluster-configs, connect-cluster-status, and connect-cluster-offsets topics, running the following command against the Kafka broker once per topic (shown here for connect-cluster-offsets):

    ./kafka-topics.sh  --delete --topic connect-cluster-offsets --bootstrap-server=my-cluster-kafka-bootstrap.kafka:9092
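Since the same delete has to be run for all three topics, a small loop may save some typing. This is only a sketch: the topic names and bootstrap address are the ones from this post, while the `delete_connect_topics` function and the `DRY_RUN`, `KAFKA_TOPICS`, and `BOOTSTRAP_SERVER` variables are names made up for illustration. Note that Kafka Connect keeps connector configurations, offsets, and statuses in these topics, so after deleting them the connector has to be registered again and any stored source offsets are lost.

```shell
#!/bin/sh
# Assumed values taken from the post; override them for your own cluster.
BOOTSTRAP_SERVER="${BOOTSTRAP_SERVER:-my-cluster-kafka-bootstrap.kafka:9092}"
KAFKA_TOPICS="${KAFKA_TOPICS:-./kafka-topics.sh}"

# Hypothetical helper: deletes the three Kafka Connect internal topics.
# With DRY_RUN=1 (the default here) it only prints the commands it would run.
delete_connect_topics() {
  for topic in connect-cluster-configs connect-cluster-status connect-cluster-offsets; do
    if [ "${DRY_RUN:-1}" = "1" ]; then
      echo "$KAFKA_TOPICS --delete --topic $topic --bootstrap-server $BOOTSTRAP_SERVER"
    else
      "$KAFKA_TOPICS" --delete --topic "$topic" --bootstrap-server "$BOOTSTRAP_SERVER"
    fi
  done
}
```

Calling `delete_connect_topics` as-is prints the three commands for review; setting `DRY_RUN=0` before the call actually runs them. Stopping the Kafka Connect workers first is probably wise, so they do not write to the topics while they are being deleted.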