spring-boot apache-kafka spring-kafka spring-retry

How to edit message data before sending to DLT in Spring-Kafka


I have read a lot of the spring-kafka documentation and many of its GitHub issues, and I still don't understand how to make Spring's DeadLetterPublishingRecovererFactory use my custom DeadLetterPublishingRecoverer subclass.

Long story short: I want to modify the record before sending it to the DLT (but only before sending it to the DLT, not between retries).

The spring-kafka documentation for DeadLetterPublishingRecoverer says: "createProducerRecord(org.apache.kafka.clients.consumer.ConsumerRecord record, org.apache.kafka.common.TopicPartition topicPartition, org.apache.kafka.common.header.internals.RecordHeaders headers) Subclasses can override this method to customize the producer record to send to the DLQ."

So I created my custom recoverer (the code was posted as an image).

Then I injected it into the DefaultErrorHandler (also posted as an image).

But this solution never works: the createProducerRecord method of MyDeadLetterRecoverer is never called. I have no idea how to make this work.

Later I also found an interesting comment.

Did I miss something, or does Spring really not allow DeadLetterPublishingRecovererFactory to create instances of subclasses? Why, then, did they write this in the documentation?

I also tried implementing a ProducerInterceptor, but that is not exactly what I want, because in that case I lose Spring's retry logic.


Solution

  • It's not ideal, but you can do it...

    1. Extend DeadLetterPublishingRecovererFactory and override create() to create the custom DLPR.
    2. Extend RetryTopicComponentFactory and override deadLetterPublishingRecovererFactory() to return your custom factory.
    3. Extend RetryTopicConfigurationSupport and override createComponentFactory() to return the component factory.

    Unfortunately, you will need to copy much of the code in super.create() (and capture the field values passed in to the setters).

    I opened an issue to make this easier: https://github.com/spring-projects/spring-kafka/issues/2711
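
To make the shape of those three overrides easier to see, here is a minimal plain-Java model of the chain. It deliberately does not use spring-kafka: every class below (Recoverer, RecovererFactory, ComponentFactory, ConfigurationSupport and their "My..." subclasses) is a hypothetical stand-in, and the real create() needs the copied setup code mentioned above. The model assumes the framework builds its recoverer through its own factory chain, which is why a subclass is only picked up once each link in that chain is overridden.

```java
import java.util.ArrayList;
import java.util.List;

public class DltOverrideSketch {

    // Stand-in for DeadLetterPublishingRecoverer (hypothetical, not the Spring class).
    static class Recoverer {
        final List<String> published = new ArrayList<>();

        // Stand-in for createProducerRecord(...): builds what goes to the DLT.
        protected String createRecord(String value) {
            return value;
        }

        // Runs once retries are exhausted, so edits here never affect retries.
        final void recover(String value) {
            published.add(createRecord(value));
        }
    }

    // The custom recoverer that edits the message just before dead-lettering.
    static class MyRecoverer extends Recoverer {
        @Override
        protected String createRecord(String value) {
            return "[edited] " + value;
        }
    }

    // Stand-in for DeadLetterPublishingRecovererFactory.
    static class RecovererFactory {
        Recoverer create() {
            return new Recoverer();
        }
    }

    // Step 1: override create() to build the custom recoverer.
    static class MyRecovererFactory extends RecovererFactory {
        @Override
        Recoverer create() {
            return new MyRecoverer();
        }
    }

    // Stand-in for RetryTopicComponentFactory.
    static class ComponentFactory {
        RecovererFactory recovererFactory() {
            return new RecovererFactory();
        }
    }

    // Step 2: return the custom recoverer factory.
    static class MyComponentFactory extends ComponentFactory {
        @Override
        RecovererFactory recovererFactory() {
            return new MyRecovererFactory();
        }
    }

    // Stand-in for RetryTopicConfigurationSupport.
    static class ConfigurationSupport {
        protected ComponentFactory createComponentFactory() {
            return new ComponentFactory();
        }

        // The framework side: it always goes through its own factory chain.
        final Recoverer buildRecoverer() {
            return createComponentFactory().recovererFactory().create();
        }
    }

    // Step 3: return the custom component factory.
    static class MyConfigurationSupport extends ConfigurationSupport {
        @Override
        protected ComponentFactory createComponentFactory() {
            return new MyComponentFactory();
        }
    }

    // Simulates a record that has exhausted its retries and is dead-lettered.
    static String sendToDlt(ConfigurationSupport support, String value) {
        Recoverer recoverer = support.buildRecoverer();
        recoverer.recover(value);
        return recoverer.published.get(0);
    }

    public static void main(String[] args) {
        System.out.println(sendToDlt(new ConfigurationSupport(), "payload"));   // payload
        System.out.println(sendToDlt(new MyConfigurationSupport(), "payload")); // [edited] payload
    }
}
```

With the default ConfigurationSupport the overridden createRecord is never reached, which mirrors the symptom in the question. In the real application the equivalent of MyConfigurationSupport would presumably be registered as the retry-topic configuration class.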