spring-boot spring-kafka spring-retry

Inject values into custom DeadLetterPublishingRecoverer


I have a custom dead letter recoverer, which I implemented so that I could override the createProducerRecord method:

protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
            TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) 

This is needed because my DLT uses a different schema from the source topic. To build the new schema and produce to the DLT, I need to inject Spring @Value properties from my environment-specific YAML files (dev, review, test, prod).

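import java.util.function.BiFunction;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

public class MyDeadLetterPublishingRecoverer extends DeadLetterPublishingRecoverer {

    // Only resolved when Spring manages this instance; otherwise it stays null.
    @Value("${spring.kafka.custom.myValue}")
    private String myValue;

    public MyDeadLetterPublishingRecoverer(
            KafkaOperations<?, ?> template,
            BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
        super(template, destinationResolver);
    }
}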

However, myValue is always null because MyDeadLetterPublishingRecoverer is not a component or similar stereotype, so there is no Spring context to resolve the property. If I add one of the stereotype annotations to MyDeadLetterPublishingRecoverer, the constructor complains that it needs a bean so that the destination resolver can be autowired (which I don't want anyway): "Could not autowire. No beans of 'BiFunction<ConsumerRecord, Exception, TopicPartition>' type found"

So I'm looking for either a way to get the @Value annotation to work in the custom DeadLetterPublishingRecoverer, or an alternative way to pull values from application.yml for use in it.


Solution

  • Just define the recoverer as a @Bean, or annotate it as a @Component, and Spring will take care of all the wiring.

    If you are using Spring Boot, just the presence of the bean is enough; Boot will wire it into the container factory (if you are using Boot's auto-configured factory).

    If you are not using Boot, you will have to set it on the container factory yourself.

    Correction: the bean that Boot wires into the container factory is a CommonErrorHandler that contains the recoverer, not the recoverer itself. So pass the recoverer to an error handler (for example a DefaultErrorHandler) and expose that error handler as a bean.

    If you don't need a destination resolver, use the simpler constructor instead (the one that takes only a KafkaOperations template).
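
The wiring described above might look roughly like the following. This is a minimal sketch, not a definitive implementation: it assumes Spring Boot's auto-configured KafkaTemplate is available as a KafkaOperations bean, and the DeadLetterConfig class name, the "x-my-value" header, and the back-off settings are hypothetical placeholders. The header line only marks the spot where the real DLT schema conversion would go.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class DeadLetterConfig { // hypothetical class name

    static class MyDeadLetterPublishingRecoverer extends DeadLetterPublishingRecoverer {

        private final String myValue;

        MyDeadLetterPublishingRecoverer(KafkaOperations<?, ?> template, String myValue) {
            super(template); // simpler constructor, no destination resolver
            this.myValue = myValue;
        }

        @Override
        protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
                TopicPartition topicPartition, Headers headers, byte[] key, byte[] value) {
            // Placeholder for the real schema conversion: here the injected value
            // is only attached as a (hypothetical) header before delegating.
            headers.add("x-my-value", myValue.getBytes(StandardCharsets.UTF_8));
            return super.createProducerRecord(record, topicPartition, headers, key, value);
        }
    }

    // Spring resolves the property and hands it to the constructor.
    @Bean
    public MyDeadLetterPublishingRecoverer recoverer(KafkaOperations<Object, Object> template,
            @Value("${spring.kafka.custom.myValue}") String myValue) {
        return new MyDeadLetterPublishingRecoverer(template, myValue);
    }

    // Boot picks up a CommonErrorHandler bean for its auto-configured factory.
    // The back-off (1 s interval, 2 retries) is an arbitrary example value.
    @Bean
    public CommonErrorHandler errorHandler(MyDeadLetterPublishingRecoverer recoverer) {
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
    }
}
```

Passing the value through the constructor is a design choice that keeps the field final. A @Value-annotated field should also be populated once the instance is returned from a @Bean method, since Spring then manages it.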