I'm working with a Kafka Streams application where we use dynamic topic determination based on message headers. In our setup, it's normal for topics to be deleted while the application is running. Messages for a deleted topic might still occasionally arrive, but I want to simply ignore them. However, even after receiving just one message for a non-existent topic, I encounter an infinite loop of errors:
[kafka-producer-network-thread | stream-example-producer] WARN org.apache.kafka.clients.NetworkClient -- [Producer clientId=stream-example-producer] Error while fetching metadata with correlation id 74 : {test1=UNKNOWN_TOPIC_OR_PARTITION}
org.apache.kafka.common.errors.TimeoutException: Topic test1 not present in metadata after 60000 ms.
[kafka-producer-network-thread | stream-example-producer] WARN org.apache.kafka.clients.NetworkClient -- [Producer clientId=stream-example-producer] Error while fetching metadata with correlation id 79 : {test1=UNKNOWN_TOPIC_OR_PARTITION}
This infinite loop of errors essentially causes the application to stop working. How can I configure my Kafka Streams application to ignore messages for deleted topics without entering an infinite loop of errors? Is there a way to handle this situation? Here's a simplified example of my application code:
StreamsBuilder builder = new StreamsBuilder();
List<String> dynamicTopics = List.of("good_topic", "deleted_topic");
builder.stream("source_topic").to((k, v, c) -> dynamicTopics.get(new Random().nextInt(dynamicTopics.size()))); //in real application from header
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Automatic topic creation is disabled.
I tried the following to handle and ignore the error:
Use KafkaAdmin to check whether the topic exists: However, a topic can still be deleted between the check and the send, so this doesn't solve the issue.
Set UncaughtExceptionHandler:
streams.setUncaughtExceptionHandler(new StreamsUncaughtExceptionHandler() {
@Override
public StreamThreadExceptionResponse handle(Throwable throwable) {
return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
}
});
But the code doesn't even reach this handler.
Set ProductionExceptionHandler:
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class.getName());
Again, the code doesn't reach this handler.
Set a producer interceptor:
props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), ErrorInterceptor.class.getName());
The code reaches this interceptor, but I'm unable to resolve the issue from here.
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, "5000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), "8000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "0");
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), "10000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), "10000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 0);
I tried adjusting these producer properties, but Kafka Streams still retries the failed send indefinitely.
There is currently no way to do what you want. I dug into the code with a colleague (thanks Andrew!): the producer returns a TimeoutException for this case, which is a RetriableException, and thus KafkaStreams does not invoke the production exception handler (the only place where you could swallow the error) but, well, retries. For the general case, this behavior makes sense (KafkaStreams tries to handle as many errors as possible internally), but your scenario is a somewhat "weird" corner case where the pattern breaks.
It's a somewhat odd corner case that the producer returns a retriable exception here. Missing metadata is retriable in most cases, so it's not totally wrong, but for a non-existent topic it's not always correct (the real problem is that the producer cannot distinguish the two cases).
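To make the decision described above concrete, here is a minimal, self-contained sketch. It is a simplified model and not Kafka's actual code: FakeRetriableException, FakeTimeoutException, Decision and decide are hypothetical stand-ins I made up for illustration. The only fact taken from Kafka is that org.apache.kafka.common.errors.TimeoutException is a subclass of RetriableException.

```java
// Simplified model of the behavior described above; NOT Kafka's actual code.
// Every class and method name here is a hypothetical stand-in.
class RetryClassificationSketch {

    // Stand-in for org.apache.kafka.common.errors.RetriableException.
    static class FakeRetriableException extends RuntimeException {
        FakeRetriableException(String message) {
            super(message);
        }
    }

    // Stand-in for the producer's TimeoutException, which is a RetriableException.
    static class FakeTimeoutException extends FakeRetriableException {
        FakeTimeoutException(String message) {
            super(message);
        }
    }

    enum Decision { RETRY, CALL_PRODUCTION_EXCEPTION_HANDLER }

    // Retriable errors are retried internally; only other errors reach the handler.
    static Decision decide(Exception sendError) {
        if (sendError instanceof FakeRetriableException) {
            return Decision.RETRY;
        }
        return Decision.CALL_PRODUCTION_EXCEPTION_HANDLER;
    }

    public static void main(String[] args) {
        Exception missingTopic =
                new FakeTimeoutException("Topic test1 not present in metadata after 60000 ms.");
        Exception other = new IllegalStateException("some non-retriable failure");

        System.out.println(decide(missingTopic));
        System.out.println(decide(other));
    }
}
```

In this model the missing-topic timeout always lands in the retry branch, which is why a custom production exception handler never gets a chance to swallow it.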