spring-bootapache-kafkaspring-kafka

Send kafka retryable message to a fixed topic using Spring Retry


I am using spring retry (spring-kafka.version 2.9.13) to implement a retry mechanism for kafka consumer. My aim is to use custom retry topic (or topics) that are not auto created by spring-kafka. Below is the code snippet for a single topic:

@RetryableTopic(attempts = "3", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC, include = {Exception.class})
@KafkaListener(topics = "${kafka-config.consumer.core-topic}", groupId ="${kafka-config.consumer.group-id}", containerFactory = "kafkaConcurrentListenerContainerFactory")*  
public void receiveMessage(MyPOJO<Object1, Object2> coreKafkaMessage, Acknowledgment ack) throws IOException {
            log.info("testing retry for kafka", coreKafkaMessage);
            throw new Exception("testing retry");
}

I am extending the RetryTopicConfigurationSupport in my Configuration class and followed the steps for custom Topic naming strategy as mentiond in the answer for spring-retry

However, when I run my springboot project it fails with below exception:

Caused by: java.lang.IllegalStateException: A single KafkaTemplate bean could not be found in the context; a single instance must exist, or one specifically named defaultRetryTopicKafkaTemplate at org.springframework.util.Assert.state(Assert.java:97) at org.springframework.kafka.annotation.RetryableTopicAnnotationProcessor.getKafkaTemplate(RetryableTopicAnnotationProcessor.java:220) at org.springframework.kafka.annotation.RetryableTopicAnnotationProcessor.processAnnotation(RetryableTopicAnnotationProcessor.java:149) at org.springframework.kafka.annotation.RetryTopicConfigurationProvider.findRetryConfigurationFor(RetryTopicConfigurationProvider.java:94) at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processMainAndRetryListeners(KafkaListenerAnnotationBeanPostProcessor.java:509) at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processKafkaListener(KafkaListenerAnnotationBeanPostProcessor.java:488) at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(KafkaListenerAnnotationBeanPostProcessor.java:389) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:455) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1808) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:620) ... 20 common frames omitted

Am I missing something ?

Thank you!


Solution

  • See documentation: https://docs.spring.io/spring-kafka/reference/retrytopic/retry-config.html#using-the-retryabletopic-annotation

    If you don’t specify a kafkaTemplate name a bean with name defaultRetryTopicKafkaTemplate will be looked up. If no bean is found an exception is thrown.

    Spring Boot auto-configures for us a KafkaTemplate bean with kafkaTemplate name.

    So, if you don't create defaultRetryTopicKafkaTemplate bean, then you need to specify it explicitly according to Spring Boot auto-configuration:

    @RetryableTopic(kafkaTemplate = "kafkaTemplate")