Tags: spring-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka, kafka-dlt

Spring Cloud Stream: handling poison pills with Kafka DLT


I'm following this recipe to handle deserialisation errors: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/main/recipes/recipe-3-handling-deserialization-errors-dlq-kafka.adoc

I created the beans mentioned in the recipe above as:

@Configuration
@Slf4j
public class ErrorHandlingConfig {

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
        return (container, dest, group) -> {
            container.setErrorHandler(errorHandler);
        };
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
    }

    @Bean
    public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
        return new DeadLetterPublishingRecoverer(bytesTemplate);
    }
}

Configuration file:

spring:
  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: true
        consumer:
          useNativeDecoding: true
      bindings:
        myInboundRoute:
          destination: some-destination.1
          group: a-custom-group
        myOutboundRoute:
          destination: some-destination.2
      kafka:
        binder:
          brokers: localhost
          defaultBrokerPort: 9092
          configuration:
            application:
              security: PLAINTEXT
        bindings:
          myInboundRoute:
            consumer:
              autoCommitOffset: true
              startOffset: latest
              enableDlq: true
              dlqName: my-dql.poison
              dlqProducerProperties:
                configuration:
                  value.serializer: myapp.serde.MyCustomSerializer
              configuration:
                  value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                  spring.deserializer.value.delegate.class: myapp.serde.MyCustomSerializer
          myOutboundRoute:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: myapp.serde.MyCustomSerializer

I was expecting the DLT to be called my-dql.poison. That topic is in fact created, but a second topic called some-destination.1.DLT is also auto-created. Why is it created in addition to the one I named in the config with dlqName?

What am I doing wrong? When I poll for messages, the failed message is in the auto-created some-destination.1.DLT and not in the topic named by dlqName.


Solution

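    1. You should not configure DLT processing in the binding (enableDlq, dlqName) if you configure the SeekToCurrentErrorHandler (STCEH) in the container; the two mechanisms are independent of each other. Also set maxAttempts=1 on the binding's consumer to disable the binder's own retries.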

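    2. You need to configure a destination resolver in the DeadLetterPublishingRecoverer (DLPR) to use a different name. By default it publishes to <original-topic>.DLT, which is why the record ends up in some-destination.1.DLT. The relevant constructor is: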

        /**
         * Create an instance with the provided template and destination resolving function,
         * that receives the failed consumer record and the exception and returns a
         * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
         * 0, no partition is set when publishing to the topic.
         * @param template the {@link KafkaOperations} to use for publishing.
         * @param destinationResolver the resolving function.
         */
        public DeadLetterPublishingRecoverer(KafkaOperations<? extends Object, ? extends Object> template,
                BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
            this(Collections.singletonMap(Object.class, template), destinationResolver);
        }
    

    See https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

    There is an open issue to configure the DLPR with the binding's DLT name.

    https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1031
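
As a rough illustration of point 2, the naming decision a destination resolver makes can be reduced to a small function from source topic to dead-letter topic. The sketch below uses only the JDK; the class DlqRouting and its lookup table are hypothetical names introduced here, not part of spring-kafka or the binder. The trailing comment shows how such a function might be handed to the two-argument DeadLetterPublishingRecoverer constructor quoted above, assuming spring-kafka is on the classpath.

```java
import java.util.Map;

/**
 * Hypothetical helper (not a spring-kafka class): decides which
 * dead-letter topic a failed record from a given source topic goes to.
 */
final class DlqRouting {

    // Assumption: explicit per-topic names, mirroring the dlqName from the question.
    private static final Map<String, String> DLQ_NAMES =
            Map.of("some-destination.1", "my-dql.poison");

    private DlqRouting() {
    }

    /**
     * Returns the explicitly mapped dead-letter topic if there is one,
     * otherwise falls back to the "<topic>.DLT" convention that the
     * recoverer uses when no resolver is supplied.
     */
    static String resolve(String sourceTopic) {
        return DLQ_NAMES.getOrDefault(sourceTopic, sourceTopic + ".DLT");
    }
}

// Possible wiring inside the publisher bean from the question
// (needs spring-kafka and kafka-clients; shown as a comment so the
// class above stays JDK-only):
//
//   return new DeadLetterPublishingRecoverer(bytesTemplate,
//           (record, ex) -> new TopicPartition(DlqRouting.resolve(record.topic()), -1));
//
// A partition below 0 means no partition is set when publishing,
// as the constructor's Javadoc states.
```

With a resolver like this in place, and enableDlq/dlqName removed from the binding as described in point 1, failed records from some-destination.1 would be published to my-dql.poison. Whether that topic exists or gets auto-created depends on the broker settings.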