apache-kafka, apache-kafka-connect, rebalancing

Kafka Connect: java.lang.IllegalStateException: No current assignment for partition


I'm running Kafka Connect on Kubernetes (8-16 nodes, autoscaled). I've defined 44 connectors in total, one per Kafka topic (one partition per topic). Those topics are produced by Debezium / PostgreSQL. There are 3 Kafka nodes. Each connector has tasks.max set to 4. Most of my connectors (but not all!) have one (always one) failed task, due to java.lang.IllegalStateException: No current assignment for partition -0.

Not a Kafka expert here, mind you ;) My assumption is that since there are 3 Kafka nodes, 3 tasks are doing fine, and the 4th task has nothing to connect to, so it fails. But why do 4 tasks sometimes run just fine?

I also quite often get a "Conflicting operation due to rebalancing" error, which can persist for minutes, even hours. Recently I deleted all the pods; they restarted themselves and the problem disappeared, but that's not a long-term solution.

What's the recommended value for tasks.max? Thanks in advance!

Exception:

java.lang.IllegalStateException: No current assignment for partition table-0
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1501)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:601)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:70)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:675)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:291)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:406)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:341)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:445)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:318)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Sink Connector config:

connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
autoUpdateSchemas=true
sanitizeTopics=true
autoCreateTables=true
topics=<topic-name>
tasks.max=3
schemaRegistryLocation=http://<ip>:8081
project=<big-query-project>
maxWriteSize=10000
datasets=.*=<big-query-dataset>
task.class=com.wepay.kafka.connect.bigquery.BigQuerySinkTask
keyfile=/credentials/<credentials-file>.json
name=<connector-name>
schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
tableWriteWait=1000
bufferSize=100000
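For reference, when Connect runs in distributed mode the same settings are typically submitted as a JSON body to the Connect REST API (`PUT /connectors/<connector-name>/config`). This is a sketch of that body with the placeholders kept exactly as above; `name` moves into the URL, and `task.class` is omitted since it is normally populated by the runtime rather than set by the user:

```json
{
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "autoUpdateSchemas": "true",
  "sanitizeTopics": "true",
  "autoCreateTables": "true",
  "topics": "<topic-name>",
  "tasks.max": "3",
  "schemaRegistryLocation": "http://<ip>:8081",
  "project": "<big-query-project>",
  "maxWriteSize": "10000",
  "datasets": ".*=<big-query-dataset>",
  "keyfile": "/credentials/<credentials-file>.json",
  "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
  "tableWriteWait": "1000",
  "bufferSize": "100000"
}
```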

And it throws the exception shown above: java.lang.IllegalStateException: No current assignment for [...]


Solution

  • The right value for tasks.max depends on several factors, the most important being the particular connector. Each connector's own logic determines, given tasks.max, how many Task instances it actually creates. For example, FileStreamSourceConnector always creates exactly 1 task, so even if you pass a value higher than 1, only one task is created. The same applies to PostgresConnector: it is limited to a single task.

    tasks.max should also take other factors into account, such as the Kafka Connect mode (standalone vs. distributed), how many Kafka Connect instances you have, the CPU of the machines, etc.

    As I understand it, you are using a source connector (PostgresConnector). Source connectors don't poll data from Kafka. The exception you posted, however, comes from a sink connector. If you are using a sink connector, tasks.max should not exceed the number of partitions of the consumed topics. If you start more tasks than there are partitions, some of them will be idle (their status is RUNNING, but they don't process data) and rebalancing issues can occur.
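The partition-cap rule above can be sketched numerically. Since a consumer group assigns each partition to at most one consumer, a sink connector's effective parallelism is min(tasks.max, partitions); the rest of the tasks sit idle. This is a minimal illustration (the method names are my own, not a Kafka API):

```java
// Sketch: effective parallelism of a sink connector's consumer group.
// Each partition goes to at most one task, so extra tasks stay idle.
public class TaskAssignmentSketch {

    /** Tasks that actually receive at least one partition. */
    static int activeTasks(int tasksMax, int partitions) {
        return Math.min(tasksMax, partitions);
    }

    /** Tasks left with no partitions (status RUNNING, but idle). */
    static int idleTasks(int tasksMax, int partitions) {
        return Math.max(0, tasksMax - partitions);
    }

    public static void main(String[] args) {
        // The question's setup: one-partition topics with tasks.max = 4.
        System.out.println("active=" + activeTasks(4, 1)
                + " idle=" + idleTasks(4, 1)); // prints "active=1 idle=3"
    }
}
```

With 44 such connectors, that is dozens of idle consumers repeatedly joining the group, which is consistent with the frequent rebalancing you observe; lowering tasks.max to the partition count (here, 1) removes them.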