javaspringspring-kafkadead-letter

How to test extended kafkaListener with custom element?


My dev env : Java 8 spring-kafka - 2.6.1 spring-kafka-test- 2.6.7 spring-boot - 2.3.10

Rest API to send message

@PostMapping(
        value = {"publish"})
@ExternalApi
public ResponseEntity<Response> endpoint() {

    Response response= new Response();
    User user= new User();
    user.setName("userfirstname");
    LOG.info("in controller ->" + event);
    kafkaTemplate.send("user.topic","sendign String message");
    return new ResponseEntity<>(response,
                                HttpStatus.OK);
}

Consumer

@myListener(topics = "user.topic", myattr="user.topic.deadletter")
public void consume(ConsumerRecord<?, User> consumerRecord) {
    LOG.info("consumer topic-> " + consumerRecord.topic());
    LOG.info("consumer value-> " + consumerRecord.value());
}

I have meta listener

@KafkaListener(
        containerFactory = "listenerContainerFactory",
        autoStartup = "false",
        properties = {
                }
)
public @interface myListener {
    @AliasFor(
            annotation = KafkaListener.class,
            attribute = "groupId")
    String groupId() default "";

    String myattr() default "";
}

Now I want to send message to new topic mentioned in myattr="user.topic.deadletter", incase of issues?

How to send message to the mentioned in myattr and how to get value from custom annotaion @myListener


Solution

  • You cannot access the annotation attributes at runtime; one solution would be to add a BeanPostProcessor to examine the attribute on the annotation and inject it into the bean as a property.

    Or, simply add code to the bean itself to examine the method annotation (e.g. in a constructor) and store the value in a field.

    A more advanced solution would be to create a proxy around the bean and add the attribute value as a header to the ConsumerRecord.