javaapache-kafka

Why is callback in producer.send method not being executed during failure to send a message?


I am currently sending messages to kafka like so:

try {
    producer.send(record, new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
            System.out.println(metadata);
            System.out.println(e);
            if (e != null) {
                System.out.println(e);
                try {
                    throw(e);
                } catch (InvalidTopicException er) {
                    logger.error("Unrecoverable error: ", er);
                    System.exit(-1);
                } //...otherfatalexceptions...
                catch (UnknownServerException er) {
                    logger.error("Unrecoverable error: ", er);
                    System.exit(-1);
                } catch (Exception er) {
                    logger.error("Failed to send message to topic:" + topic, er);
                }
            }
        }
    });
} catch (Exception e) {
    logger.error("Failed to send message to topic:" + topic, e);
}

i am using producer.send and not waiting for the record metadata to be returned before sending the next record in a queue since it would take to long. Instead i am using onCallback method to handle that data.

The idea behind the code is to terminate the program in case i get an unrecoverable error.

Producer is being initialized like so:

producer = new KafkaProducer<String, String>(properties);

with following properties

Properties mainKafkaProperties = new Properties();
mainKafkaProperties.put("bootstrap.servers", kafkaUrl);
mainKafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
mainKafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
mainKafkaProperties.put("acks", "all");

When messages are being sent properly, System.out.println(metadata); line is properly printing metadata showing that messages are being sent properly, but when i delete the topic to simulate the InvalidTopicException instead of executing the code in the catch method i only get the following warning: [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 97 : {em_om_messages_local=UNKNOWN_TOPIC_OR_PARTITION}

Kafka library used is: org.apache.kafka.clients I am running the program on eclipse debugger

I am getting the same error regardless how many partitions the topic has Java version 17.0.6+10 Spring is not used


Solution

  • Check javadoc

    InvalidTopicException

    For example the topic name is too long, contains invalid characters etc.

    If you're sending a valid topic name, this isn't thrown, and you'll get that warning log instead, which isn't an exception that can be caught.

    Also, there's no reason to throw(e). Use instanceof to check the type of the Exception in the callback