This is a continuation of my previous question.
I was exploring Kafka's min.insync.replicas, and here's the summary:
Created a topic insync with min.insync.replicas=2 on a local multi-broker cluster.
Produced messages with kafka-console-producer using acks=all and read them with kafka-console-consumer.
Brought brokers down until only 1 in-sync replica was left, and expected an exception in the producer as mentioned here and here.
But it never happened: the producer kept producing messages and the consumer kept reading them from the console without any errors (more details in the previous question).
Then, instead of producing messages with the console producer, I wrote a Java producer with the same configuration as the console producer and finally got the following exception:
ERROR [Replica Manager on Broker 0]: Error processing append operation on partition insync-0 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync replicas for partition [insync,0] is 1 , below required minimum 2
Although I expected it in the producer (Java code), it showed up in the Kafka broker.
Console producer command:
./kafka-console-producer.sh --broker-list localhost:9092 --topic insync --producer.config ../config/producer.properties
kafka-console-producer properties:
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
compression.type=none
batch.size=20
acks=all
Java producer code:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class InsyncProducer { // class name is illustrative

    public static void main(String[] args) {
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(producerConfigs());
        try {
            int count = 0;
            // Send one message every 3 seconds until interrupted
            while (true) {
                ProducerRecord<String, String> record = new ProducerRecord<String, String>("insync",
                        "test message: " + count);
                kafkaProducer.send(record);
                Thread.sleep(3000);
                count++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            kafkaProducer.close();
        }
    }

    private static Properties producerConfigs() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("acks", "all");
        return properties;
    }
}
This raises more questions.
The description of min.insync.replicas says: "If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend)."
How does Kafka guarantee reliability in this case?
When producing with acks=all to a topic that has fewer in-sync replicas than min.insync.replicas, the producer should get NotEnoughReplicas.
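To make the rule concrete, here is a simplified, hypothetical model of the check the partition leader performs when it appends a batch. It is not Kafka's ReplicaManager code, and MinIsrCheck, Result and append are made-up names. It shows that min.insync.replicas is only enforced for acks=all requests, which is why a producer using acks=1 keeps succeeding with a single in-sync replica.

```java
import java.util.List;

// Hypothetical, simplified model of the leader-side min.insync.replicas check.
// Not Kafka's ReplicaManager code; all names here are illustrative.
public class MinIsrCheck {

    enum Result { APPENDED, NOT_ENOUGH_REPLICAS }

    // acks: the producer's acks setting ("all"/"-1", "1" or "0")
    // isr: ids of the replicas currently in sync; minIsr: min.insync.replicas
    static Result append(String acks, List<Integer> isr, int minIsr) {
        boolean waitsForAllReplicas = acks.equals("all") || acks.equals("-1");
        // The minimum is only enforced when the producer waits for all in-sync replicas
        if (waitsForAllReplicas && isr.size() < minIsr) {
            return Result.NOT_ENOUGH_REPLICAS;
        }
        return Result.APPENDED;
    }

    public static void main(String[] args) {
        List<Integer> isr = List.of(0); // a single in-sync replica, as in the broker log above
        System.out.println(append("all", isr, 2)); // rejected
        System.out.println(append("1", isr, 2));   // accepted despite the single replica
    }
}
```

Running main prints NOT_ENOUGH_REPLICAS followed by APPENDED.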
You are not seeing this behavior because there are issues with both your console producer command and your Java code.
1. Console Producer
To enable acks=all in kafka-console-producer.sh, you need to specify the --request-required-acks flag:
./kafka-console-producer.sh --broker-list localhost:9092 \
--topic insync --request-required-acks all
This is because the --request-required-acks flag takes precedence over the values specified via --producer.config, and it defaults to 1.
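The effect of that precedence can be sketched as a hypothetical, simplified model. This is not the kafka-console-producer source, and AcksPrecedence and effectiveAcks are illustrative names: the file passed via --producer.config is loaded first, then the flag's value, defaulting to 1, is written on top of it.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical model of the precedence described above; not the actual
// kafka-console-producer implementation. Names are illustrative.
public class AcksPrecedence {

    // producerConfig: contents of the --producer.config file
    // requestRequiredAcks: value of --request-required-acks, or null if the flag is omitted
    static String effectiveAcks(String producerConfig, String requestRequiredAcks) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(producerConfig)); // 1. load --producer.config
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // 2. the flag (default "1") is applied afterwards and overwrites acks from the file
        props.setProperty("acks", requestRequiredAcks != null ? requestRequiredAcks : "1");
        return props.getProperty("acks");
    }

    public static void main(String[] args) {
        String producerProperties = "bootstrap.servers=localhost:9092,localhost:9093,localhost:9094\n"
                + "compression.type=none\nbatch.size=20\nacks=all\n";
        System.out.println(effectiveAcks(producerProperties, null));  // flag omitted
        System.out.println(effectiveAcks(producerProperties, "all")); // --request-required-acks all
    }
}
```

With the producer.properties from the question, leaving out the flag yields an effective acks of 1 (main prints 1, then all), so the broker never applies the min.insync.replicas check.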
2. Java code
The code you pasted should fail to send any messages, but with the current logic you only get WARN log messages like:
Got error produce response with correlation id 15 on topic-partition , retrying ...
To get notified in your code, you need to check the result of send(), either by checking the Future it returns or by passing a Callback as the second argument. Note also that NotEnoughReplicasException is a retriable exception, so with recent clients the producer will by default keep retrying (retries defaults to Integer.MAX_VALUE, bounded only by delivery.timeout.ms, which is 2 minutes by default) instead of notifying the calling code.
For example:
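import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class NotEnoughReplicasExample { // class name is illustrative
    private static final String TOPIC = "insync";
    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put("bootstrap.servers", "localhost:9092");
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("retries", "5"); // give up on the record after 5 retries
        configs.put("acks", "all");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "value");
            // The callback is invoked once the record is acknowledged or finally fails
            producer.send(record, (RecordMetadata metadata, Exception exc) -> {
                if (exc != null) {
                    System.err.println(exc);
                }
            });
        } // close() waits for pending sends, so the callback runs before the program exits
    }
}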
When the topic is below the minimum ISR, the producer will retry 5 times before failing the record. It will then call the lambda with the exception, so you'll get:
org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
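The "retry 5 times, then call the callback" behavior can be modeled without a broker. The sketch below is a hypothetical, stdlib-only model: RetryModel and sendWithRetries are made-up names, and the simulated IllegalStateException stands in for NotEnoughReplicasException. It assumes every attempt fails with a retriable error and ignores retry.backoff.ms and delivery.timeout.ms, which a real producer also applies.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, simplified model of a producer configured with retries=N.
// Not Kafka client code: real producers also apply retry.backoff.ms and
// give up when delivery.timeout.ms expires, both omitted here.
public class RetryModel {

    // attempt returns null on success, or the (retriable) error from the broker.
    // callback receives null on success, or the last error once retries are exhausted.
    // Returns the total number of attempts made.
    static int sendWithRetries(int retries, Supplier<Exception> attempt, Consumer<Exception> callback) {
        Exception lastError = null;
        int attempts = 0;
        // One initial attempt plus up to `retries` resends
        while (attempts <= retries) {
            attempts++;
            lastError = attempt.get();
            if (lastError == null) {
                callback.accept(null); // record acknowledged
                return attempts;
            }
        }
        callback.accept(lastError); // retries exhausted: the record is failed
        return attempts;
    }

    public static void main(String[] args) {
        // Every attempt fails, as it would while the ISR stays below min.insync.replicas
        int attempts = sendWithRetries(5,
                () -> new IllegalStateException("simulated NotEnoughReplicas"),
                exc -> System.err.println("callback got: " + exc));
        System.out.println("attempts: " + attempts);
    }
}
```

Running it prints attempts: 6 to standard output, because retries counts resends after the first attempt; the callback fires exactly once, with the last error.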
So, to conclude: min.insync.replicas is handled correctly, but you need to pass the correct arguments to the console producer tool and handle send failures correctly in your Java code.