spring-boot, apache-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Spring Cloud Stream Kafka Binder - Retries not working when using DLQ in Batch Mode


I am using Spring Cloud version 2023.0.1 (Spring Cloud Stream 4.1.1) and have written a simple Kafka consumer in batch mode to simulate an error scenario.

    @Bean
    Consumer<Message<List<String>>> consumer1() {
        return message -> {
            final List<String> payload = message.getPayload();
            final MessageHeaders messageHeaders = message.getHeaders();
            payload.forEach(System.out::println);
            // Simulate a processing failure for any record whose value starts with "a"
            payload.forEach(p -> {
                if (p.startsWith("a")) {
                    throw new RuntimeException("Intentional Exception");
                }
            });
            System.out.println(messageHeaders);
            System.out.println("Done");
        };
    }

My application.yml file looks like this:

    spring:
      cloud:
        function:
          definition: consumer1;
        stream:
          bindings:
            consumer1-in-0:
              destination: topic1
              group: consumer1-in-0-v0.1
              consumer:
                batch-mode: true
                use-native-decoding: true
                max-attempts: 3
          kafka:
            binder:
              brokers:
                - localhost:9092
            default:
              consumer:
                configuration:
                  max.poll.records: 1000
                  max.partition.fetch.bytes: 31457280
                  fetch.max.wait.ms: 200
            bindings:
              consumer1-in-0:
                consumer:
                  enableDlq: true
                  dlqName: dlq-topic
                  dlqProducerProperties:
                    configuration:
                      value.serializer: org.apache.kafka.common.serialization.StringSerializer
                      key.serializer: org.apache.kafka.common.serialization.StringSerializer
                  configuration:
                    key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                    value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

I have also specified a ListenerContainerWithDlqAndRetryCustomizer to customize the retries:

    @Bean
    ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
        return new ListenerContainerWithDlqAndRetryCustomizer() {

            @Override
            public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                                  String group,
                                  @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
                                  @Nullable BackOff backOff) {
                // Publish failed records to the DLQ resolved by the binder once the back-off is exhausted
                ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
                        dlqDestinationResolver);
                container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
            }

            @Override
            public boolean retryAndDlqInBinding(String destinationName, String group) {
                // Tell the binder not to configure retries/DLQ in the binding itself
                return false;
            }

        };
    }

The issue: when an error happens, the message batch goes straight to the DLQ and no retries are attempted.

The problem is that the failure can be caused by a transient error, so I want the batch to be retried a few times before it is sent to the DLQ, but I cannot get that to work.

What am I doing wrong?


Solution

  • In case anyone stumbles upon this in the future, I figured out what was wrong: I had to remove enableDlq, dlqName and dlqProducerProperties from the application.yml file.

    Then it worked.
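
    For reference, a minimal sketch of the trimmed Kafka binding section after removing those three properties (same topic, group and deserializer settings as in the question; the rest of the application.yml stays unchanged):

        spring:
          cloud:
            stream:
              kafka:
                bindings:
                  consumer1-in-0:
                    consumer:
                      configuration:
                        key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                        value.deserializer: org.apache.kafka.common.serialization.StringDeserializer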

    In the Java code, I also removed the ListenerContainerWithDlqAndRetryCustomizer and just used a ListenerContainerCustomizer. The code looked something like this:

        @Bean
        public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
            // Attach the error handler to every listener container the binder creates
            return (container, dest, group) -> container.setCommonErrorHandler(errorHandler);
        }
    
        @Bean
        public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
            // FixedBackOff(0, 4): no delay between attempts, 4 retries before the recoverer publishes to the DLQ
            return new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0, 4));
        }
    
        @Bean
        public DeadLetterPublishingRecoverer publisher(KafkaOperations<?, ?> stringTemplate,
                                                       KafkaOperations<?, ?> bytesTemplate,
                                                       KafkaOperations<?, ?> longTemplate) {
            // The recoverer picks the template whose key matches the failed record's value type
            Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
            templates.put(String.class, stringTemplate);
            templates.put(byte[].class, bytesTemplate);
            templates.put(Long.class, longTemplate);
            return new DeadLetterPublishingRecoverer(templates);
        }
    
        @Bean
        public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
            return new KafkaTemplate<>(pf,
                    Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
        }
    
        @Bean
        public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
            // Override the serializers so byte[] values can actually be published to the DLQ
            return new KafkaTemplate<>(pf,
                    Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
        }
    
        @Bean
        public KafkaTemplate<String, Long> longTemplate(ProducerFactory<String, Long> pf) {
            return new KafkaTemplate<>(pf,
                    Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
        }
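
    A couple of notes on the choices above, assuming stock Spring for Apache Kafka behaviour: FixedBackOff(0, 4) means no delay between attempts and four retries, so a failed batch is delivered up to five times before the recoverer publishes it to the DLQ. Because the consumer runs in batch mode and the listener does not throw BatchListenerFailedException, the DefaultErrorHandler retries the whole batch rather than a single record. The multi-template DeadLetterPublishingRecoverer selects the template whose map key matches the failed record's value type, which is why separate String, byte[] and Long templates are registered here.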