apache-kafka

Kafka auto assigns new topics/partitions to dead/fenced brokers


I have set up a Kafka v3.8.1 cluster in KRaft mode with separate controller and broker nodes. Initially, I had 3 controllers and 6 large brokers (call them broker-04, broker-05, ..., broker-09, where the trailing digits are the node/broker ID). I then added 3 smaller brokers (broker-10, broker-11, broker-12) and decommissioned the 6 old large brokers.

# kafka-metadata-quorum --bootstrap-server $(hostname -i):9092 describe --status
ClusterId:              mt-kafka-v3
LeaderId:               1
LeaderEpoch:            144
HighWatermark:          282139
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   0
CurrentVoters:          [1,2,3]
CurrentObservers:       [10,11,12]

Now, whenever I create a new topic, Kafka may assign some partitions to the old dead brokers (i.e., brokers 4-9). I would expect Kafka to assign partitions only to running brokers (i.e., brokers 10-12).

[Image: Kafka assigned a few partitions to dead brokers]

What is happening here? Can the old brokers be completely deregistered from the cluster?

UPDATE:

Here is the metadata snapshot. The old brokers (4-9) are still registered, but with fenced=true.

$ ./kafka-metadata-shell.sh --snapshot <redacted>.checkpoint
>> tree image/cluster/
brokers:
  10:
    BrokerRegistration(id=10, epoch=339668, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=false, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
  11:
    BrokerRegistration(id=11, epoch=341484, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=false, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
  12:
    BrokerRegistration(id=12, epoch=342419, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=false, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
  4:
    BrokerRegistration(id=4, epoch=182851, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=true, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
  5:
    BrokerRegistration(id=5, epoch=183308, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=true, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
  6:
    BrokerRegistration(id=6, epoch=183750, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=true, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
  7:
    BrokerRegistration(id=7, epoch=184159, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=true, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
  8:
    BrokerRegistration(id=8, epoch=184724, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=true, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
  9:
    BrokerRegistration(id=9, epoch=185145, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=true, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
controllers:
  1:
    ControllerRegistration(id=1, incarnationId=<redacted>, zkMigrationReady=false, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20})
  2:
    ControllerRegistration(id=2, incarnationId=<redacted>, zkMigrationReady=false, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20})
  3:
    ControllerRegistration(id=3, incarnationId=<redacted>, zkMigrationReady=false, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20})
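To pull just the fenced broker IDs out of a listing like the one above, a small filter can help. This is a minimal sketch, assuming the `tree image/cluster/` output has been saved or piped as plain text; the function name `fenced_broker_ids` and the shortened sample lines are hypothetical and only for illustration.

```shell
# Hypothetical helper: read BrokerRegistration lines on stdin and
# print the id of every registration marked fenced=true.
fenced_broker_ids() {
  grep 'fenced=true' | sed -n 's/.*BrokerRegistration(id=\([0-9][0-9]*\),.*/\1/p'
}

# Shortened sample lines in the same shape as the snapshot output.
sample='    BrokerRegistration(id=10, epoch=339668, fenced=false, inControlledShutdown=false)
    BrokerRegistration(id=4, epoch=182851, fenced=true, inControlledShutdown=false)
    BrokerRegistration(id=5, epoch=183308, fenced=true, inControlledShutdown=false)'

# Prints 4 and 5, one per line.
printf '%s\n' "$sample" | fenced_broker_ids
```

With the real snapshot output, the same filter should list brokers 4 through 9.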

Solution

  • In KRaft mode, to decommission a broker you have to explicitly remove it from the cluster.

    To do so, you can use either the kafka-cluster.sh unregister command or the Admin.unregisterBroker() API.

    Otherwise, if you just shut it down, the broker stays in the KRaft metadata (as fenced) and is still eligible for replica assignment.
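For the cluster in the question, the explicit removal could look roughly like the sketch below, which uses the `unregister` subcommand of `kafka-cluster.sh`. The bootstrap address, the `DRY_RUN` switch, and the `unregister_brokers` wrapper are assumptions added for illustration; the script only prints the commands unless `DRY_RUN=0` is set, so they can be reviewed first.

```shell
# Assumptions: kafka-cluster.sh is on PATH and BOOTSTRAP points at a live broker.
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"   # hypothetical address, adjust to your cluster
DRY_RUN="${DRY_RUN:-1}"                    # 1 = only print the commands (default)

# Hypothetical wrapper: unregister each broker ID passed as an argument.
unregister_brokers() {
  for id in "$@"; do
    if [ "$DRY_RUN" = "1" ]; then
      echo "kafka-cluster.sh unregister --bootstrap-server $BOOTSTRAP --id $id"
    else
      kafka-cluster.sh unregister --bootstrap-server "$BOOTSTRAP" --id "$id"
    fi
  done
}

# The six decommissioned brokers from the question.
unregister_brokers 4 5 6 7 8 9
```

After a real run, the `tree image/cluster/` listing should no longer contain entries for brokers 4-9. Partitions already assigned to those brokers are a separate matter and would still need to be reassigned.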