I'm trying to create aspect and process every record, that my app consumed through Kafka.
This method do not work
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
@Slf4j
@Aspect
@Component
public class Test{
@Around("execution(* org.springframework.kafka.listener.MessageListener.onMessage(..))")
public Object onInvoke(ProceedingJoinPoint joinPoint) throws Throwable {
log.debug("In aspect");
Object[] args = joinPoint.getArgs();
return joinPoint.proceed();
}
}
I've also tried get records by aspect with annotation @KafkaListener. It works as expected, but I also want to detect topic names for records (I use RetryTopicConfiguration, and @KafkaListener(topics = "mainTopic") ).
The @KafkaListener
does not use a MessageListener
, and even if it would, that one still i not managed as a bean in the application context, so making an aspect for that type is not the way to go.
You another solution for the @KafkaListener
method is correct. You can have a topic the record is consumed from via extra method argument against @Header(KafkaHeaders.RECEIVED_TOPIC)
.
There is also a RecordInterceptor
abstraction to be injected into the AbstractKafkaListenerContainerFactory
used for the mentioned @KafkaListener
infrastructure.