springspring-kafkaaop

Spring aspect on kafka listener interface


I'm trying to create aspect and process every record, that my app consumed through Kafka.

This method do not work

import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;

@Slf4j
@Aspect
@Component
public class Test{

    @Around("execution(* org.springframework.kafka.listener.MessageListener.onMessage(..))")
    public Object onInvoke(ProceedingJoinPoint  joinPoint) throws Throwable {
        log.debug("In aspect");
        Object[] args = joinPoint.getArgs();
       return joinPoint.proceed();
    }
}

I've also tried get records by aspect with annotation @KafkaListener. It works as expected, but I also want to detect topic names for records (I use RetryTopicConfiguration, and @KafkaListener(topics = "mainTopic") ).


Solution

  • The @KafkaListener does not use a MessageListener, and even if it would, that one still i not managed as a bean in the application context, so making an aspect for that type is not the way to go.

    You another solution for the @KafkaListener method is correct. You can have a topic the record is consumed from via extra method argument against @Header(KafkaHeaders.RECEIVED_TOPIC).

    There is also a RecordInterceptor abstraction to be injected into the AbstractKafkaListenerContainerFactory used for the mentioned @KafkaListener infrastructure.