I have set up a Kafka v3.8.1 cluster in KRaft mode with separate controller and broker nodes.
Initially, I had 3 controllers and 6 large brokers (let's call them broker-04, broker-05, ..., broker-09, with the last digits as the node/broker ID).
Then I added 3 smaller brokers (let's call them broker-10, broker-11, broker-12) and decommissioned the 6 old large brokers.
# kafka-metadata-quorum --bootstrap-server $(hostname -i):9092 describe --status
ClusterId: mt-kafka-v3
LeaderId: 1
LeaderEpoch: 144
HighWatermark: 282139
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [1,2,3]
CurrentObservers: [10,11,12]
Now, whenever I create a new topic, Kafka may also assign some partitions to the old, dead brokers (i.e. brokers 4-9). However, I would expect Kafka to assign partitions only to running brokers (i.e. brokers 10-12).
What is happening here? Can we completely deregister the old brokers from the cluster?
UPDATE:
Here is the metadata snapshot. I can see the old brokers (4-9) are still there, but with fenced=true.
$ ./kafka-metadata-shell.sh --snapshot <redacted>.checkpoint
>> tree image/cluster/
brokers:
10:
BrokerRegistration(id=10, epoch=339668, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=false, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
11:
BrokerRegistration(id=11, epoch=341484, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=false, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
12:
BrokerRegistration(id=12, epoch=342419, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=false, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
4:
BrokerRegistration(id=4, epoch=182851, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=true, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
5:
BrokerRegistration(id=5, epoch=183308, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=true, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
6:
BrokerRegistration(id=6, epoch=183750, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=true, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
7:
BrokerRegistration(id=7, epoch=184159, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=true, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
8:
BrokerRegistration(id=8, epoch=184724, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=true, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
9:
BrokerRegistration(id=9, epoch=185145, incarnationId=<redacted>, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20}, rack=Optional[<redacted>], fenced=true, inControlledShutdown=false, isMigratingZkBroker=false, directories=[<redacted>])
controllers:
1:
ControllerRegistration(id=1, incarnationId=<redacted>, zkMigrationReady=false, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20})
2:
ControllerRegistration(id=2, incarnationId=<redacted>, zkMigrationReady=false, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20})
3:
ControllerRegistration(id=3, incarnationId=<redacted>, zkMigrationReady=false, listeners=[<redacted>], supportedFeatures={metadata.version: 1-20})
In KRaft mode, to decommission a broker you have to explicitly remove it from the cluster.
To do so, you can use either the kafka-cluster.sh tool with the unregister option or the unregisterBroker() method from the Admin API.
Otherwise, if you just shut it down, the broker stays in the KRaft metadata (as fenced) and is still eligible for replica assignment.
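For the kafka-cluster.sh route, a minimal sketch for the broker IDs in the question (4-9) might look like the following. The BOOTSTRAP and DRY_RUN variables and the unregister_cmd helper are illustrative names introduced here, not Kafka settings, and the script assumes kafka-cluster.sh is on the PATH. By default it only prints the commands so they can be reviewed first.

```shell
#!/usr/bin/env bash
# Sketch: unregister the decommissioned brokers (IDs 4-9 in the question).
# BOOTSTRAP and DRY_RUN are hypothetical helper variables for this example.
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"
DRY_RUN="${DRY_RUN:-1}"   # 1 = only print the commands, anything else = run them

# Print the kafka-cluster.sh invocation for a single broker ID.
unregister_cmd() {
  echo "kafka-cluster.sh unregister --bootstrap-server ${BOOTSTRAP} --id $1"
}

for id in 4 5 6 7 8 9; do
  if [ "$DRY_RUN" = "1" ]; then
    unregister_cmd "$id"
  else
    # Assumes kafka-cluster.sh is on the PATH and the brokers are already shut down.
    kafka-cluster.sh unregister --bootstrap-server "$BOOTSTRAP" --id "$id"
  fi
done
```

Running it with DRY_RUN=0 and BOOTSTRAP pointing at a live broker would issue the actual unregister requests. Afterwards, a fresh metadata snapshot should no longer list brokers 4-9 under image/cluster/brokers.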