I have Kafka with disabled topic auto creation. All topics must be created with external API. And I want to implement topic auto creation =)
try { producer.send(...) } catch (UnknownTopicException e) { ... external API call ... }
By default, KafkaProducer
do not throw exception on send to unknown topic. Only logs endlessly UNKNOWN_TOPIC_OR_PARTITION
errors:
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {unknown_topic=UNKNOWN_TOPIC_OR_PARTITION}
I fix this with decreasing max.block.ms
configuration. Now KafkaProducer
don't retry before thrown the exception:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class Main {
public static void main(String[] args) {
var properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "***:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty("max.block.ms", "0");
try (var producer = new KafkaProducer<String, String>(properties)) {
System.out.println("recordMetadata: " + producer.send(new ProducerRecord<>("unknown_topic", "hello world")).get());
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
This doesn't seem to be the best solution. What other options are there?
The best option to check if the topic exists is to call AdminClient
before any call.
It has this method: describeTopics
. That method throws the UNKNOWN_TOPIC_OR_PARTITION
exception, so the way to go is to catch it.
You could just modify your try block:
//...
try (var adminClient = AdminClient.create(properties);
var producer = new KafkaProducer<String, String>(properties))
{
if (topicExists(adminClient, "yourTopic"))
{
System.out.println("recordMetadata: " + producer.send(new ProducerRecord<>("yourTopic", "hi!")).get());
//...
}
else //well, it doesn't exist as we catched the exception and returned false
{
//your non-topic logic here.
}
//...
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
//...
private boolean topicExists(AdminClient adminClient, String topic)
{
try
{
DescribeTopicsResult result = adminClient.describeTopics(java.util.List.of(topic));
result.all().get();
return true; //OK, it exists
}
catch (ExecutionException e)
{
if (e.getCause() instanceof UnknownTopicOrPartitionException)
return false; //catch the false -- UnknownTopicOrPartition means it doesnt exist
else
throw new RuntimeException(e);
}
}