Tags: apache-kafka, kafka-consumer-api, spring-kafka, dead-letter, kafka-dlt

Kafka messages are sent to a single topic instead of n retry topics and the DLT


I am trying to implement n retry topics with a DLT, but all the messages are being pushed to a single topic, test-topic-retry-0. There are 3 duplicate records in test-topic-retry-0, whereas the records are supposed to be distributed like this:

It seems like Kafka is pushing all the messages to the same topic.

Kafka configuration:

/**
 * Builds the listener container factory used by the Kafka listeners.
 *
 * <p>The factory is set up for single-record (non-batch) listeners, a
 * concurrency of 1, and {@code AckMode.RECORD}. Its consumer factory is
 * created from {@link #consumerConfigs()}.
 *
 * @return the configured listener container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory =
      new ConcurrentKafkaListenerContainerFactory<>();

  // Listener shape: one record per invocation, one consumer.
  containerFactory.setConcurrency(1);
  containerFactory.setBatchListener(false);
  containerFactory.getContainerProperties().setAckMode(AckMode.RECORD);

  // Consumer side is driven entirely by the properties from consumerConfigs().
  containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
  return containerFactory;
}
/**
 * Assembles the consumer properties handed to the consumer factory.
 *
 * @return a new mutable map of consumer configuration entries
 */
private Map<String, Object> consumerConfigs() {
  Map<String, Object> config = new HashMap<>();

  // Connection and deserialization: keys and values are both read as strings.
  config.put(BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapAddress);
  config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

  // Offset handling: auto-commit is off; start from the earliest offset on reset.
  config.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
  config.put(ENABLE_AUTO_COMMIT_CONFIG, false);

  // Polling and group-membership tuning.
  config.put(MAX_POLL_RECORDS_CONFIG, 100);
  config.put(SESSION_TIMEOUT_MS_CONFIG, 10000);
  config.put(HEARTBEAT_INTERVAL_MS_CONFIG, 2000);

  return config;
}
/**
 * Creates the {@code KafkaAdmin} bean, configured only with the bootstrap
 * server address.
 *
 * @return the admin bean
 */
@Bean
public KafkaAdmin kadmin() {
  Map<String, Object> adminProps = new HashMap<>();
  adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapAddress);

  return new KafkaAdmin(adminProps);
}

/**
 * Creates the producer factory bean; keys and values are both written with
 * {@code StringSerializer}.
 *
 * @return the producer factory
 */
@Bean
public ProducerFactory<String, Object> producerFactory() {
  Map<String, Object> producerProps = new HashMap<>();

  // Serializers first, then the broker address; insertion order is irrelevant for a HashMap.
  producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapAddress);

  return new DefaultKafkaProducerFactory<>(producerProps);
}

/**
 * Creates the {@code KafkaTemplate} bean backed by this configuration's
 * producer factory.
 *
 * @return the Kafka template
 */
@Bean
public KafkaTemplate<String, Object> prodTemplate() {
  // The configuration declares producerFactory(); no producerFactoryString()
  // method exists here, so the template must be built from producerFactory().
  return new KafkaTemplate<>(producerFactory());
}

@Bean
public RetryTopicConfiguration retryTopicConfig(KafkaTemplate<String, Object> template,
    ConcurrentKafkaListenerContainerFactory<String, Object> factory) {
      template.getProducerFactory().getConfigurationProperties());

  return RetryTopicConfigurationBuilder
      .newInstance()
      .exponentialBackoff(2000, 5, Long.MAX_VALUE)
      .maxAttempts(3)
      .timeoutAfter(-1)
      .autoCreateTopicsWith(3, (short) 3)
      .dltProcessingFailureStrategy(DltStrategy.FAIL_ON_ERROR)
      .setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
      .retryTopicSuffix("-retry")
      .dltSuffix("-dlt")
      .listenerFactory(factory)
      .create(template);
}

Listener:

@KafkaListener(topics=“test-topic”)
public void onMessage(ConsumerRecord<String, String> r) {
    throw new RuntimeException(“test”);
}

@DltHandler
public void handleDlt(ConsumerRecord<String, String> r) {
    log.error(“test dlt”);
}

Solution

  • I just copied your code pretty much exactly and it works as expected for me.

    However, I had to add the .dltHandlerMethod:

    // Builder chain from the question with two differences: the second
    // argument of autoCreateTopicsWith is 1 instead of 3, and the DLT handler
    // method is registered explicitly via dltHandlerMethod.
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(2000, 5, Long.MAX_VALUE)
            .maxAttempts(3)
            .timeoutAfter(-1)
            .autoCreateTopicsWith(3, (short) 1)
            .dltProcessingFailureStrategy(DltStrategy.FAIL_ON_ERROR)
            .setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
            .retryTopicSuffix("-retry")
            .dltSuffix("-dlt")
            .listenerFactory(factory)
            // Points the configuration at Listener.handleDlt by class and method name.
            .dltHandlerMethod(Listener.class, "handleDlt")
            .create(template);
    
    /**
     * Test listener for {@code test-topic}: every delivery is logged and then
     * failed, and dead-lettered records are reported by {@link #handleDlt}.
     */
    @Component
    class Listener {

        private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);

        /**
         * Logs the topic the record was read from, then fails unconditionally.
         *
         * @param r the consumed record
         * @throws RuntimeException always, with the message {@code "test"}
         */
        @KafkaListener(id = "so69453282", topics = "test-topic")
        public void onMessage(ConsumerRecord<String, String> r) {
            final String sourceTopic = r.topic();
            LOGGER.info(sourceTopic);
            throw new RuntimeException("test");
        }

        /**
         * Logs an error for each record delivered to the dead-letter handler.
         *
         * @param r the dead-lettered record (unused)
         */
        @DltHandler
        public void handleDlt(ConsumerRecord<String, String> r) {
            LOGGER.error("test dlt");
        }

    }
    
    2021-10-05 13:03:12,451 [so69453282-0-C-1] test-topic
    2021-10-05 13:03:14,485 [so69453282-retry-0-0-C-1] test-topic-retry-0
    2021-10-05 13:03:24,523 [so69453282-retry-1-0-C-1] test-topic-retry-1
    2021-10-05 13:03:25,031 [so69453282-dlt-0-C-1] test dlt