I have written a KafkaConsumer
. The configuration looks like this:
@Bean
Map<String, Object> consumerConfig(
@Value("${spring.kafka.bootstrap-servers}") String bootstrapServers) {
return Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest",
ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,
false,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
}
When I assign a topic that does not exist to the KafkaConsumer
, no error is thrown. This is the code:
var topicPartition = new TopicPartition("75757584959595943", key);
var partitions = Set.of(topicPartition);
consumer.assign(partitions);
for (var records = consumer.poll(Duration.ZERO); !records.isEmpty(); ) {
// ...
Why does the KafkaConsumer
not alert me about a non-existing topic? Wouldn't this be helpful?
It does alert; in the logs you'll see info messages including UNKNOWN_TOPIC_OR_PARTITION
It's not a fatal exception. The consumer will continue to fetch cluster metadata and wait until the topic exists, and then poll it when it does.
If you'd like to have spring create the topic, make a NewTopic bean for it.
If you'd like to check for topic existence, and throw your own exception, use an AdminClient and the describeTopics method