I am currently sending messages to Kafka like so:
    try {
        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                System.out.println(metadata);
                System.out.println(e);
                if (e != null) {
                    System.out.println(e);
                    try {
                        throw(e);
                    } catch (InvalidTopicException er) {
                        logger.error("Unrecoverable error: ", er);
                        System.exit(-1);
                    } // ...other fatal exceptions...
                    catch (UnknownServerException er) {
                        logger.error("Unrecoverable error: ", er);
                        System.exit(-1);
                    } catch (Exception er) {
                        logger.error("Failed to send message to topic:" + topic, er);
                    }
                }
            }
        });
    } catch (Exception e) {
        logger.error("Failed to send message to topic:" + topic, e);
    }
I am using producer.send without waiting for the record metadata to be returned before sending the next record in the queue, since waiting would take too long.
Instead, I handle the result in the onCompletion callback.
The idea behind the code is to terminate the program if I get an unrecoverable error.
The producer is initialized like so:
producer = new KafkaProducer<String, String>(properties);
with the following properties:
Properties mainKafkaProperties = new Properties();
mainKafkaProperties.put("bootstrap.servers", kafkaUrl);
mainKafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
mainKafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
mainKafkaProperties.put("acks", "all");
When messages are sent successfully, the System.out.println(metadata); line prints the metadata as expected.
But when I delete the topic to simulate an InvalidTopicException, the code in the catch block is not executed, and I only get the following warning:
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 97 : {em_om_messages_local=UNKNOWN_TOPIC_OR_PARTITION}
The Kafka library used is org.apache.kafka.clients. I am running the program in the Eclipse debugger.
I get the same error regardless of how many partitions the topic has. The Java version is 17.0.6+10. Spring is not used.
Check the Javadoc for InvalidTopicException: it covers cases where the topic itself is invalid, for example when the topic name is too long, contains invalid characters, etc.
If you're sending to a valid topic name, this exception isn't thrown. You'll get that warning log instead, which isn't an exception that can be caught.
Also, there's no reason to throw(e). Use instanceof to check the type of the Exception in the callback.
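As a rough sketch of the instanceof approach: the exception classes below are local stand-ins, defined only so the snippet compiles without the Kafka client on the classpath. In real code you would import the classes of the same name from org.apache.kafka.common.errors. The helper names isFatal and handle are made up for illustration, and handle returns a string where the real callback would log and possibly exit.

```java
class SendErrorClassifier {
    // Stand-ins (assumption): placeholders for the Kafka exception classes
    // of the same name, so this sketch runs without kafka-clients.
    static class InvalidTopicException extends RuntimeException {
        InvalidTopicException(String message) { super(message); }
    }

    static class UnknownServerException extends RuntimeException {
        UnknownServerException(String message) { super(message); }
    }

    static class TimeoutException extends RuntimeException {
        TimeoutException(String message) { super(message); }
    }

    // Decide whether an error should terminate the program.
    // No throw/catch needed: instanceof inspects the type directly.
    static boolean isFatal(Exception e) {
        return e instanceof InvalidTopicException
                || e instanceof UnknownServerException;
    }

    // Mirrors the body of onCompletion(metadata, e): a null exception means
    // the send succeeded; otherwise branch on the exception type.
    static String handle(Exception e) {
        if (e == null) {
            return "sent";
        }
        if (isFatal(e)) {
            // Real code would log the error and call System.exit(-1) here.
            return "fatal";
        }
        // Real code would log the failure and carry on.
        return "failed";
    }

    public static void main(String[] args) {
        System.out.println(handle(null));
        System.out.println(handle(new InvalidTopicException("bad topic name")));
        System.out.println(handle(new TimeoutException("metadata not available")));
    }
}
```

Inside the real callback, the same checks would go directly in onCompletion, replacing the try/throw/catch block from the question.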