apache-zookeepergoogle-cloud-pubsubapache-kafka-connecteofexception

PubSub Kafka Connect node connection end of file exception


While running PubSub Kafka connect using the command:

.\bin\windows\connect-standalone.bat .\etc\kafka\WorkerConfig.properties
 .\etc\kafka\configSink.properties .\etc\kafka\configSource.properties

I get this error:

Sending metadata request {topics=[test]} to node -1
could not scan file META-INF/MANIFEST.MF in url file:/C:/confluent-3.3.0/bin/../share/java/kafka-serde-tools/commons-compress-1.8.1.jar with scanner SubTypesScanner
could not scan file META-INF/MANIFEST.MF in url file:/C:/confluent-3.3.0/bin/../share/java/kafka-serde-tools/commons-compress-1.8.1.jar with scanner TypeAnnotationsScanner
could not scan file META-INF/LICENSE.txt in url file:/C:/confluent-3.3.0/bin/../share/java/kafka-serde-tools/commons-compress-1.8.1.jar with scanner SubTypesScanner
could not scan file META-INF/LICENSE.txt in url file:/C:/confluent-3.3.0/bin/../share/java/kafka-serde-tools/commons-compress-1.8.1.jar with scanner TypeAnnotationsScanner
could not scan file META-INF/NOTICE.txt in url file:/C:/confluent-3.3.0/bin/../share/java/kafka-serde-tools/commons-compress-1.8.1.jar with scanner SubTypesScanner
could not scan file META-INF/NOTICE.txt in url file:/C:/confluent-3.3.0/bin/../share/java/kafka-serde-tools/commons-compress-1.8.1.jar with scanner TypeAnnotationsScanner
Connection with localhost/127.0.0.1 disconnected
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:181)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:229)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:975)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:316)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Node -1 disconnected.
Bootstrap broker localhost:2181 disconnected
Cancelled GROUP_COORDINATOR request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@643b72cb, request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, body={group_id=connect-CPSConnectorSink}), createdTimeMs=1504105687600, sendTimeMs=1504105687724) with correlation id 0 due to node -1 being disconnected
Give up sending metadata request since no node is available

I am running a local kafka broker and local zookeeper. Any help on how to fix this issue would be great!

Property files here


Solution

  • So, after trying various fixes, I found out that I was using the wrong port number for localhost. The port 2181 was being used by Zookeeper while port 9092 was being used by Kafka Server. So, my original property in WorkerConfig.properties was:

    bootstrap.servers=localhost:2181
    

    When it should be

    bootstrap.server=localhost:9092
    

    So, moral of the story is to make sure you are using the correct host and port number of your Kafka servers in your connector properties file, mine being WorkerConfig. This is shown here. This may seem like common sense, but I changed the port number while trying to fix other errors, but unknowingly added another error. So, if you recieve the above error, check your bootstrap.server property.