java spring-boot spring-kafka stateful dead-letter

How to configure a DLQ in a stateful application with Spring Boot?


I need to create an application using stateful retry that listens to a Kafka topic, makes calls to some APIs, and then commits the message. If an error occurs in one of these calls, for example a timeout, the application must retry 4 times with an interval of 4 seconds between attempts. If it still hasn't succeeded after those four attempts, the application should send the message to a DLQ topic.

The part I'm not able to get working is sending to the DLQ topic: when I tried to configure the DLQ, the retries don't stop, and the message is never sent to the DLQ either.

@KafkaListener(topics = "${topic.name}", concurrency = "1")
public void listen(ConsumerRecord<String, AberturaContaLimiteCreditoCalculado> mensagem,
                    @Headers final MessageHeaders headers,
                    Acknowledgment ack) {
    AberturaContaLimiteCreditoCalculadoData dados;
    if (!validarMensagem(mensagem)) {
        dados = mensagem.value().getData();
        // This RuntimeException is thrown just to force a retry;
        // on success the message would be acknowledged instead:
        // ack.acknowledge();
        throw new RuntimeException();
    }
}

private boolean validarMensagem(ConsumerRecord<String, AberturaContaLimiteCreditoCalculado> mensagem) {
    return mensagem == null || mensagem.value() == null;
}

KafkaConfig:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(final ConsumerFactory<String, Object> consumerFactory) {
    final ConcurrentKafkaListenerContainerFactory<String, Object> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    factory.setCommonErrorHandler(new DefaultErrorHandler(
            publisherRetryDLQ(),
            new FixedBackOff(4000L, 4L)));
    return factory;
}

public DeadLetterPublishingRecoverer publisherRetryDLQ() {
    return new DeadLetterPublishingRecoverer(createKafkaTemplate(),
            (record, ex) -> new TopicPartition(topicoDLQ, 0));
}

public ProducerFactory<String, String> producerFactory() {
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
    return new DefaultKafkaProducerFactory<>(config);
}

public KafkaOperations<String, String> createKafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

Edit 2022-05-04:

From your tip about the RetryListener and setting logging.level to DEBUG, we managed to find the problem: the Producer was not being built.
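
For anyone hitting the same issue, here is a minimal sketch of that diagnosis and wiring. It assumes the root cause was that the recoverer, template, and producer factory were never registered as Spring beans (the final fix isn't shown in this post), and it attaches a RetryListener that logs each failed delivery attempt; log stands for any SLF4J logger:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
        final ConsumerFactory<String, Object> consumerFactory,
        final DeadLetterPublishingRecoverer publisherRetryDLQ) {
    final ConcurrentKafkaListenerContainerFactory<String, Object> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    final DefaultErrorHandler errorHandler
            = new DefaultErrorHandler(publisherRetryDLQ, new FixedBackOff(4000L, 4L));
    // Log every failed delivery attempt; a listener like this is what surfaced the problem.
    errorHandler.setRetryListeners((record, ex, attempt) ->
            log.warn("Delivery attempt {} for {} failed", attempt, record.topic(), ex));
    factory.setCommonErrorHandler(errorHandler);
    return factory;
}

@Bean
public DeadLetterPublishingRecoverer publisherRetryDLQ(KafkaOperations<String, String> dlqKafkaTemplate) {
    return new DeadLetterPublishingRecoverer(dlqKafkaTemplate,
            (record, ex) -> new TopicPartition(topicoDLQ, 0));
}

@Bean
public KafkaOperations<String, String> dlqKafkaTemplate(ProducerFactory<String, String> producerFactory) {
    // producerFactory() from the original config would likewise need @Bean.
    return new KafkaTemplate<>(producerFactory);
}

The container factory now receives the recoverer as an injected bean instead of calling publisherRetryDLQ() directly, so Spring actually builds (and manages) the producer.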

The problem now is that the consumed record's Avro schema differs from the DLQ's Avro schema: the DLQ schema has an extra field that must store the reason for the error.

2022/05/04 16:53:43.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [ERROR] o.s.k.l.DeadLetterPublishingRecoverer - Dead-letter publication to limites-abertura-conta-limite-credito-calculado-convivenciaaberturaconta-dlq failed for: limites-abertura-conta-limite-credito-calculado-0@6
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema{...}]}}]}

Is there a way to do this conversion?


Solution

  • If I understand the question properly, you want to create a ProducerRecord with a different value type.

    Simply subclass the DeadLetterPublishingRecoverer (DLPR) and override createProducerRecord(); a sketch follows the quoted Javadoc below.

        /**
         * Subclasses can override this method to customize the producer record to send to the
         * DLQ. The default implementation simply copies the key and value from the consumer
         * record and adds the headers. The timestamp is not set (the original timestamp is in
         * one of the headers). IMPORTANT: if the partition in the {@link TopicPartition} is
         * less than 0, it must be set to null in the {@link ProducerRecord}.
         * @param record the failed record
         * @param topicPartition the {@link TopicPartition} returned by the destination
         * resolver.
         * @param headers the headers - original record headers plus DLT headers.
         * @param key the key to use instead of the consumer record key.
         * @param value the value to use instead of the consumer record value.
         * @return the producer record to send.
         * @see KafkaHeaders
         */
        protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
                TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) {
    
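    For example, here is a minimal sketch, assuming a hypothetical generated DLQ Avro type AberturaContaLimiteCreditoCalculadoDlq whose builder mirrors the consumed type plus a motivoErro field (adjust to your generated classes):

        import java.nio.charset.StandardCharsets;
        import java.util.function.BiFunction;

        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.header.Header;
        import org.apache.kafka.common.header.Headers;
        import org.springframework.kafka.core.KafkaOperations;
        import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
        import org.springframework.kafka.support.KafkaHeaders;

        public class DlqConvertingRecoverer extends DeadLetterPublishingRecoverer {

            public DlqConvertingRecoverer(KafkaOperations<?, ?> template,
                    BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
                super(template, destinationResolver);
            }

            @Override
            protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
                    TopicPartition topicPartition, Headers headers, byte[] key, byte[] value) {

                // The recoverer has already added the DLT headers at this point;
                // the exception-message header can fill the DLQ schema's extra field.
                Header exceptionHeader = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE);
                String motivoErro = exceptionHeader == null
                        ? "unknown"
                        : new String(exceptionHeader.value(), StandardCharsets.UTF_8);

                // record.value() is the deserialized consumed value (the byte[] params
                // are only populated for deserialization failures).
                AberturaContaLimiteCreditoCalculado original =
                        (AberturaContaLimiteCreditoCalculado) record.value();

                // Hypothetical generated DLQ type: consumed fields plus motivoErro.
                AberturaContaLimiteCreditoCalculadoDlq dlqValue =
                        AberturaContaLimiteCreditoCalculadoDlq.newBuilder()
                                .setData(original.getData())
                                .setMotivoErro(motivoErro)
                                .build();

                // Per the Javadoc above: a negative partition must become null.
                Integer partition = topicPartition.partition() < 0 ? null : topicPartition.partition();
                return new ProducerRecord<>(topicPartition.topic(), partition, record.key(), dlqValue, headers);
            }
        }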

    You can examine the headers to determine the exception that caused the failure. If you need the actual exception, override accept() to capture it in a ThreadLocal, then call super.accept(); you can then use the ThreadLocal in createProducerRecord(), as sketched below.
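
    A minimal sketch of that capture, assuming Spring Kafka 2.8+, where the container invokes the three-argument accept():

        import java.util.function.BiFunction;

        import org.apache.kafka.clients.consumer.Consumer;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.header.Headers;
        import org.springframework.kafka.core.KafkaOperations;
        import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

        public class ExceptionCapturingRecoverer extends DeadLetterPublishingRecoverer {

            // Safe because publication happens on the consumer thread within accept().
            private static final ThreadLocal<Exception> LAST_EXCEPTION = new ThreadLocal<>();

            public ExceptionCapturingRecoverer(KafkaOperations<?, ?> template,
                    BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
                super(template, destinationResolver);
            }

            @Override
            public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Exception exception) {
                LAST_EXCEPTION.set(exception);
                try {
                    super.accept(record, consumer, exception);
                }
                finally {
                    LAST_EXCEPTION.remove(); // always clear to avoid leaking across records
                }
            }

            @Override
            protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
                    TopicPartition topicPartition, Headers headers, byte[] key, byte[] value) {
                Exception cause = LAST_EXCEPTION.get(); // the actual exception, not just header text
                // ... build the DLQ value from 'cause' as in the previous sketch ...
                return super.createProducerRecord(record, topicPartition, headers, key, value);
            }
        }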

    There are several ways to publish the different type with the same producer factory; one is to give the producer a serializer that picks a delegate based on the value's class.
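
    For instance, a sketch (not from the original answer) using Spring Kafka's DelegatingByTypeSerializer together with Confluent's KafkaAvroSerializer; the schemaRegistryUrl field and the DLQ class are assumed placeholders:

        @Bean
        public ProducerFactory<String, Object> delegatingProducerFactory() {
            Map<String, Object> config = new HashMap<>();
            config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

            // Configured by hand because it is passed as an instance,
            // not by class name in the producer configs.
            KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer();
            avroSerializer.configure(Map.of("schema.registry.url", schemaRegistryUrl), false);

            Map<Class<?>, Serializer<?>> delegates = new HashMap<>();
            delegates.put(String.class, new StringSerializer());
            delegates.put(AberturaContaLimiteCreditoCalculadoDlq.class, avroSerializer); // hypothetical DLQ type

            return new DefaultKafkaProducerFactory<>(config,
                    new StringSerializer(),
                    new DelegatingByTypeSerializer(delegates));
        }

    Note that if everything this factory publishes is Avro, the KafkaAvroSerializer alone already suffices, since it serializes any generated SpecificRecord.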