I am trying to implement n retry topics with a DLT, but all the messages are being pushed to a single topic, test-topic-retry-0. There are 3 duplicate records in test-topic-retry-0, whereas I expected the retries to be spread across separate retry topics.
It seems like Kafka is pushing all the messages to the same topic.
Kafka configuration:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    factory.setBatchListener(false);
    factory.setConcurrency(1);
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    return factory;
}
private Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapAddress);
    props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(MAX_POLL_RECORDS_CONFIG, 100);
    props.put(HEARTBEAT_INTERVAL_MS_CONFIG, 2000);
    props.put(SESSION_TIMEOUT_MS_CONFIG, 10000);
    return props;
}
@Bean
public KafkaAdmin kadmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapAddress);
    return new KafkaAdmin(configs);
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> prodTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
@Bean
public RetryTopicConfiguration retryTopicConfig(KafkaTemplate<String, Object> template,
        ConcurrentKafkaListenerContainerFactory<String, Object> factory) {
    template.getProducerFactory().getConfigurationProperties();
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(2000, 5, Long.MAX_VALUE)
            .maxAttempts(3)
            .timeoutAfter(-1)
            .autoCreateTopicsWith(3, (short) 3)
            .dltProcessingFailureStrategy(DltStrategy.FAIL_ON_ERROR)
            .setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
            .retryTopicSuffix("-retry")
            .dltSuffix("-dlt")
            .listenerFactory(factory)
            .create(template);
}
Listener:
@KafkaListener(topics = "test-topic")
public void onMessage(ConsumerRecord<String, String> r) {
    throw new RuntimeException("test");
}
@DltHandler
public void handleDlt(ConsumerRecord<String, String> r) {
    log.error("test dlt");
}
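For reference, the topic layout expected from this configuration can be sketched as follows. With maxAttempts(3), the first attempt happens on the main topic, which leaves two retry topics and one DLT. This is only an illustration of the naming scheme that SUFFIX_WITH_INDEX_VALUE is expected to produce with the suffixes above; the class and method names are hypothetical and this is not Spring Kafka's own code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring Kafka: it only mimics the naming
// scheme expected from SUFFIX_WITH_INDEX_VALUE with the configured suffixes.
class RetryTopicNames {

    // maxAttempts counts the first delivery on the main topic, so there are
    // (maxAttempts - 1) retry topics, followed by a single DLT.
    static List<String> expectedTopics(String mainTopic, int maxAttempts,
                                       String retrySuffix, String dltSuffix) {
        List<String> topics = new ArrayList<>();
        topics.add(mainTopic);
        for (int i = 0; i < maxAttempts - 1; i++) {
            topics.add(mainTopic + retrySuffix + "-" + i);
        }
        topics.add(mainTopic + dltSuffix);
        return topics;
    }

    public static void main(String[] args) {
        // Prints one topic name per line, in the order a failing record visits them.
        expectedTopics("test-topic", 3, "-retry", "-dlt").forEach(System.out::println);
    }
}
```

A record that fails on every attempt should therefore appear once in each of test-topic-retry-0 and test-topic-retry-1 before reaching test-topic-dlt, rather than three times in test-topic-retry-0.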
I just copied your code pretty much exactly and it works as expected for me.
However, I had to add the .dltHandlerMethod:
return RetryTopicConfigurationBuilder
        .newInstance()
        .exponentialBackoff(2000, 5, Long.MAX_VALUE)
        .maxAttempts(3)
        .timeoutAfter(-1)
        .autoCreateTopicsWith(3, (short) 1)
        .dltProcessingFailureStrategy(DltStrategy.FAIL_ON_ERROR)
        .setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
        .retryTopicSuffix("-retry")
        .dltSuffix("-dlt")
        .listenerFactory(factory)
        .dltHandlerMethod(Listener.class, "handleDlt")
        .create(template);
@Component
class Listener {
    private static final Logger log = LoggerFactory.getLogger(Listener.class);

    @KafkaListener(id = "so69453282", topics = "test-topic")
    public void onMessage(ConsumerRecord<String, String> r) {
        log.info(r.topic());
        throw new RuntimeException("test");
    }

    @DltHandler
    public void handleDlt(ConsumerRecord<String, String> r) {
        log.error("test dlt");
    }
}
2021-10-05 13:03:12,451 [so69453282-0-C-1] test-topic
2021-10-05 13:03:14,485 [so69453282-retry-0-0-C-1] test-topic-retry-0
2021-10-05 13:03:24,523 [so69453282-retry-1-0-C-1] test-topic-retry-1
2021-10-05 13:03:25,031 [so69453282-dlt-0-C-1] test dlt
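The gaps between these log lines are consistent with exponentialBackoff(2000, 5, Long.MAX_VALUE): roughly 2 s before the attempt on test-topic-retry-0 and roughly 10 s before the attempt on test-topic-retry-1, after which the record reaches the DLT handler with no further backoff. Here is a minimal sketch of that arithmetic, assuming each delay is the previous one multiplied by the multiplier and capped at the maximum interval. The class and method names are hypothetical, and this is not Spring Kafka's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; it reproduces the delay
// arithmetic, not Spring Kafka's actual back-off classes.
class BackoffDelays {

    // Returns the delay (in ms) applied before each retry attempt.
    // maxAttempts includes the first delivery, so there are maxAttempts - 1 delays.
    static List<Long> delays(long initialMs, double multiplier, long maxMs, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        double next = initialMs;
        for (int i = 0; i < maxAttempts - 1; i++) {
            // Cap each delay at the configured maximum interval.
            result.add((long) Math.min(next, (double) maxMs));
            next *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Same values as exponentialBackoff(2000, 5, Long.MAX_VALUE) with maxAttempts(3).
        System.out.println(delays(2000, 5, Long.MAX_VALUE, 3)); // prints [2000, 10000]
    }
}
```

The computed delays of 2000 ms and 10000 ms line up with the timestamps 13:03:12 to 13:03:14 and 13:03:14 to 13:03:24 in the log.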