javaapache-kafka

KafkaProducer: How to raise and catch UNKNOWN_TOPIC_OR_PARTITION error?


I have Kafka with disabled topic auto creation. All topics must be created with external API. And I want to implement topic auto creation =)

try { producer.send(...) } catch (UnknownTopicException e) { ... external API call ... }

By default, KafkaProducer do not throw exception on send to unknown topic. Only logs endlessly UNKNOWN_TOPIC_OR_PARTITION errors:

[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {unknown_topic=UNKNOWN_TOPIC_OR_PARTITION}

I fix this with decreasing max.block.ms configuration. Now KafkaProducer don't retry before thrown the exception:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Main {
    public static void main(String[] args) {
        var properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "***:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty("max.block.ms", "0");

        try (var producer = new KafkaProducer<String, String>(properties)) {
            System.out.println("recordMetadata: " + producer.send(new ProducerRecord<>("unknown_topic", "hello world")).get());
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

This doesn't seem to be the best solution. What other options are there?


Solution

  • The best option to check if the topic exists is to call AdminClient before any call.

    It has this method: describeTopics. That method throws the UNKNOWN_TOPIC_OR_PARTITION exception, so the way to go is to catch it.

    https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/admin/AdminClient.html#describeTopics-java.util.Collection-org.apache.kafka.clients.admin.DescribeTopicsOptions-

    You could just modify your try block:

    //...
    
        try (var adminClient = AdminClient.create(properties);
             var producer = new KafkaProducer<String, String>(properties)) 
        {
            
            if (topicExists(adminClient, "yourTopic")) 
            {
              System.out.println("recordMetadata: " + producer.send(new ProducerRecord<>("yourTopic", "hi!")).get());
            //...
            } 
             else //well, it doesn't exist as we catched the exception and returned false
            {
               //your non-topic logic here.
            }
        
          //...  
        } catch (ExecutionException | InterruptedException e) {
                 throw new RuntimeException(e);
        }
    //...
    
    private boolean topicExists(AdminClient adminClient, String topic) 
    {
       try 
       {
          DescribeTopicsResult result = adminClient.describeTopics(java.util.List.of(topic));
          result.all().get();
          return true; //OK, it exists
        }
        catch (ExecutionException e) 
        {
          if (e.getCause() instanceof UnknownTopicOrPartitionException) 
             return false; //catch the false -- UnknownTopicOrPartition means it doesnt exist
          else 
             throw new RuntimeException(e);
        } 
     }