apache-kafka, kafka-consumer-api, kafka-producer-api

Unexpected behaviour of NotEnoughReplicasException with min.insync.replicas


This is a continuation of my previous question

I was exploring Kafka's min.insync.replicas; here's a summary:

  1. Set up 3 brokers locally and created a topic insync with min.insync.replicas=2.
  2. Produced messages with kafka-console-producer using acks=all and read them with kafka-console-consumer.
  3. Brought down 2 brokers, leaving just 1 in-sync replica, and expected an exception in the producer as mentioned here and here.

But it never happened: the producer kept producing messages and the consumer kept reading them from the console without any errors (more details in the previous question).

Then, instead of producing messages from the console producer, I wrote a Java producer with the same configuration as the console producer and finally got the following exception:

ERROR [Replica Manager on Broker 0]: Error processing append operation on partition insync-0 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync replicas for partition [insync,0] is 1 , below required minimum 2

Although I expected it from the producer (the Java code), it showed up in the Kafka broker.

Console producer command

 ./kafka-console-producer.sh --broker-list localhost:9092 --topic insync --producer.config ../config/producer.properties

kafka-console-producer properties:

bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
compression.type=none
batch.size=20
acks=all

Java producer code:

public static void main(String[] args) {
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(producerConfigs());

        try {

            int count = 0;
            while (true) {
                ProducerRecord<String, String> record = new ProducerRecord<String, String>("insync",
                        "test message: " + count);
                kafkaProducer.send(record);

                Thread.sleep(3000);
                count++;
            }
        } catch (Exception e) {

            e.printStackTrace();
        } finally {
            kafkaProducer.close();
        }
    }

    private static Properties producerConfigs() {
        Properties properties = new Properties();

        properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        properties.put("acks", "all");

        return properties;
    }

This brings me more questions.

  1. Why does it happen with the Java producer but not with the console producer?
  2. Why does the exception occur in the broker and not in the producer (Java code)? The documentation for min.insync.replicas says:

If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend)

How does Kafka guarantee reliability in this case?


Solution

  • When producing with acks=all to a topic that has fewer in-sync replicas than min.insync.replicas, the producer should get NotEnoughReplicas.

    The reason you are not seeing this behavior is that there are issues with both the console producer command and the Java code.

    1. Console Producer

    To enable acks=all in the kafka-console-producer.sh, you need to specify the --request-required-acks flag:

    ./kafka-console-producer.sh --broker-list localhost:9092 \
        --topic insync --request-required-acks all
    

    This is because the --request-required-acks flag takes precedence over the values specified via --producer.config and it defaults to 1.

    2. Java code

    The code you have pasted should be failing to send any messages, but with the current logic you will only see WARN log messages like:

    Got error produce response with correlation id 15 on topic-partition , retrying ...

    To get notified in your code, you need to check the result of send(), either by checking the Future it returns or by passing a Callback as the second argument. Note also that NotEnoughReplicasException is a retriable exception, so with the latest client it will, by default, retry forever instead of notifying the calling code.

    For example:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    Properties configs = new Properties();
    configs.put("bootstrap.servers", "localhost:9092");
    configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    configs.put("retries", "5");
    configs.put("acks", "all");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
        ProducerRecord<String, String> record = new ProducerRecord<>("insync", "value");
        producer.send(record, (RecordMetadata metadata, Exception exc) -> {
            if (exc != null) {
                System.err.println(exc);
            }
        });
    }
    

    When the topic is below minimum ISR, the producer will retry 5 times before failing the record. It will then invoke the callback with the exception, so you'll get:

    org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
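    Alternatively, the Future returned by send() can be checked directly. Below is a minimal sketch (assuming the same brokers and the insync topic from the question, and requiring a running Kafka cluster); calling get() blocks until the send completes and rethrows any failure, such as NotEnoughReplicasException, wrapped in an ExecutionException:

    ```java
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class SyncSendExample {
        public static void main(String[] args) {
            Properties configs = new Properties();
            configs.put("bootstrap.servers", "localhost:9092");
            configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            configs.put("retries", "5");
            configs.put("acks", "all");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
                ProducerRecord<String, String> record = new ProducerRecord<>("insync", "value");
                // get() blocks until the broker acknowledges the record or the
                // retries are exhausted; a NotEnoughReplicasException surfaces
                // here wrapped in an ExecutionException.
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("Written to partition " + metadata.partition()
                        + " at offset " + metadata.offset());
            } catch (ExecutionException e) {
                System.err.println("Send failed: " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    ```

    The trade-off is that get() makes each send synchronous, so throughput drops compared to the callback approach, but failures are impossible to miss.
    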


    So to conclude, min.insync.replicas is handled correctly, but you need to be careful to pass the correct arguments to the tool and to handle exceptions correctly in your Java code.