I am using Spring retry (spring-kafka.version 2.9.13) to implement a retry mechanism for a Kafka consumer. My aim is to use a custom retry topic (or topics) that is not auto-created by spring-kafka. Below is the code snippet for a single topic:
@RetryableTopic(attempts = "3", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC, include = {Exception.class})
@KafkaListener(topics = "${kafka-config.consumer.core-topic}", groupId = "${kafka-config.consumer.group-id}", containerFactory = "kafkaConcurrentListenerContainerFactory")
public void receiveMessage(MyPOJO<Object1, Object2> coreKafkaMessage, Acknowledgment ack) throws Exception {
    // The {} placeholder is needed for the message to appear in the log line
    log.info("testing retry for kafka: {}", coreKafkaMessage);
    // Always fail so that the retry path is exercised
    throw new Exception("testing retry");
}
I am extending RetryTopicConfigurationSupport in my configuration class and followed the steps for a custom topic naming strategy as mentioned in the answer for spring-retry.
However, when I run my Spring Boot project, it fails with the exception below:
Caused by: java.lang.IllegalStateException: A single KafkaTemplate bean could not be found in the context; a single instance must exist, or one specifically named defaultRetryTopicKafkaTemplate
    at org.springframework.util.Assert.state(Assert.java:97)
    at org.springframework.kafka.annotation.RetryableTopicAnnotationProcessor.getKafkaTemplate(RetryableTopicAnnotationProcessor.java:220)
    at org.springframework.kafka.annotation.RetryableTopicAnnotationProcessor.processAnnotation(RetryableTopicAnnotationProcessor.java:149)
    at org.springframework.kafka.annotation.RetryTopicConfigurationProvider.findRetryConfigurationFor(RetryTopicConfigurationProvider.java:94)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processMainAndRetryListeners(KafkaListenerAnnotationBeanPostProcessor.java:509)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processKafkaListener(KafkaListenerAnnotationBeanPostProcessor.java:488)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(KafkaListenerAnnotationBeanPostProcessor.java:389)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:455)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1808)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:620)
    ... 20 common frames omitted
Am I missing something?
Thank you!
See documentation: https://docs.spring.io/spring-kafka/reference/retrytopic/retry-config.html#using-the-retryabletopic-annotation
If you don’t specify a kafkaTemplate name a bean with name defaultRetryTopicKafkaTemplate will be looked up. If no bean is found an exception is thrown.
Spring Boot auto-configures a KafkaTemplate bean named kafkaTemplate for us.
So, if you don't create a defaultRetryTopicKafkaTemplate bean, you need to specify the template name explicitly, matching the Spring Boot auto-configuration:
@RetryableTopic(kafkaTemplate = "kafkaTemplate")
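The other route implied by the documentation is to register a bean under the default name yourself. Below is a minimal sketch of that, not a definitive setup: the class name RetryTemplateConfig is hypothetical, and it assumes a ProducerFactory<Object, Object> bean is available in the context (Spring Boot normally provides one unless you define your own with different generics).

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

// Hypothetical configuration class; the name is for illustration only.
@Configuration
public class RetryTemplateConfig {

    // The bean name must be exactly "defaultRetryTopicKafkaTemplate" so that
    // @RetryableTopic can find it when no kafkaTemplate attribute is given.
    // Assumption: a ProducerFactory<Object, Object> bean exists in the context.
    @Bean(name = "defaultRetryTopicKafkaTemplate")
    public KafkaTemplate<Object, Object> defaultRetryTopicKafkaTemplate(
            ProducerFactory<Object, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
```

Note that Spring Boot's own kafkaTemplate bean is only created when no other KafkaTemplate bean is defined, so declaring this bean will likely make the auto-configured one back off. If other code injects the template by the name kafkaTemplate, the annotation attribute shown above may be the less intrusive choice.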