java · apache-kafka · spring-kafka

Kafka Consumer Interceptor not being registered in a spring boot app


I have the following Kafka consumer interceptor class (ApiGatewayKafkaConsumerInterceptor):

package com.demo.api.gateway.kafka.interceptor;

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * Pass-through {@link ConsumerInterceptor} for the API gateway.
 *
 * <p>The interceptor does not alter records or react to commits; its only visible effect is a
 * banner written to standard output each time an instance is constructed, which makes it easy
 * to see in the console whether the interceptor class was actually instantiated.
 */
public class ApiGatewayKafkaConsumerInterceptor implements ConsumerInterceptor<String, Object> {

  /** Top and bottom edge of the start-up banner. */
  private static final String BANNER_EDGE =
      "$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$";

  /** Blank filler row of the start-up banner. */
  private static final String BANNER_FILLER =
      "$$$                                                   $$$";

  /** Banner rows, in the order they are printed. */
  private static final String[] BANNER_ROWS = {
    BANNER_EDGE,
    BANNER_FILLER,
    "$$$         GATEWAY INTERCEPTOR REGISTERED            $$$",
    BANNER_FILLER,
    BANNER_EDGE,
  };

  /** Creates the interceptor and prints the banner, one row per line, to standard output. */
  public ApiGatewayKafkaConsumerInterceptor() {
    for (String row : BANNER_ROWS) {
      System.out.println(row);
    }
  }

  /**
   * Accepts the configuration map and ignores it.
   *
   * @param configs configuration entries handed to the interceptor; unused
   */
  @Override
  public void configure(Map<String, ?> configs) {
    // Intentionally empty: this interceptor needs no configuration.
  }

  /**
   * Returns the polled batch unchanged.
   *
   * @param consumerRecords the records handed to the interceptor
   * @return the very same {@code consumerRecords} instance
   */
  @Override
  public ConsumerRecords<String, Object> onConsume(ConsumerRecords<String, Object> consumerRecords) {
    return consumerRecords;
  }

  /**
   * Commit callback; does nothing.
   *
   * @param offsets the offsets passed to the callback; unused
   */
  @Override
  public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    // Intentionally empty: commits are not observed.
  }

  /** Close callback; there are no resources to release. */
  @Override
  public void close() {
    // Intentionally empty.
  }

}

and the following KafkaConsumerConfig class:

package com.demo.api.gateway.kafka.config;

import com.demo.api.gateway.controller.switcher.TransactionController;
import com.demo.api.gateway.kafka.interceptor.ApiGatewayKafkaConsumerInterceptor;
import com.demo.api.gateway.kafka.model.ConsumerConfigType;
import com.demo.api.gateway.kafka.model.KafkaConsumerProperties;
import com.demo.common.kafka.interceptor.KafkaConsumerInterceptor;
import com.demo.common.service.ServiceLogger;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration for the gateway's Kafka consumers.
 *
 * <p>Exposes the consumer property maps, the listener container factory used for request
 * consumption, and a header mapper. All consumer property maps are built by
 * {@link #getProperties(ConsumerConfigType)} and always register
 * {@link ApiGatewayKafkaConsumerInterceptor} under {@code interceptor.classes}.
 *
 * <p>Note that {@link #requestConsumerFactory()} is a plain method, not a {@code @Bean}: the
 * factory it builds is only used by {@link #requestKafkaListenerContainerFactory()} and is not
 * available to injection points elsewhere in the application context.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

  /** Value stored under {@code session.timeout.ms} for every consumer built here. */
  private static final String SESSION_TIMEOUT_MS = "15000";

  private final ServiceLogger logger = new ServiceLogger(KafkaConsumerConfig.class);

  /** Listener container concurrency, read from the {@code kafka.concurrency} property. */
  @Value("${kafka.concurrency}")
  int concurrency;

  /** Source of the externally configured consumer settings. */
  @Autowired
  KafkaConsumerProperties kafkaProperties;

  /**
   * Consumer properties for the {@link ConsumerConfigType#DEFAULT} configuration type.
   *
   * @return a new mutable map of Kafka consumer properties
   */
  @Bean
  public Map<String, Object> consumerConfigs() {
    return getProperties(ConsumerConfigType.DEFAULT);
  }

  /**
   * Builds the Kafka consumer property map for the given configuration type.
   *
   * <p>Group id and max poll records are looked up per {@code type}; every other value is
   * shared across types.
   *
   * @param type configuration type used to select the group id and max poll records
   * @return a new mutable map of Kafka consumer properties
   */
  private Map<String, Object> getProperties(ConsumerConfigType type) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootStarpServers());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.isEnableAutoCommit());
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaProperties.getAutoCommitIntervalMs());
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, SESSION_TIMEOUT_MS);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, kafkaProperties.getMaxPartitionFetchBytes());
    // NOTE(review): the original author questioned which group id should be used here — confirm.
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getGroupId(type));
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaProperties.getMaxPollRecords(type));
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaProperties.getRequestTimeoutMS());
    // NOTE(review): "*" lets the JSON deserializer instantiate classes from any package;
    // narrow this if records can come from untrusted producers.
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    System.out.println("Registering ApiGatewayKafkaConsumerInterceptor....");
    props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ApiGatewayKafkaConsumerInterceptor.class.getName());
    return props;
  }

  /**
   * Consumer properties for an arbitrary configuration type.
   *
   * <p>Registered as the lazy, prototype-scoped bean {@code consumerConfig}, so each lookup
   * produces a fresh map.
   *
   * @param configType configuration type to build the properties for
   * @return a new mutable map of Kafka consumer properties
   */
  @Bean(name = "consumerConfig")
  @Scope(value = "prototype")
  @Lazy(value = true)
  public Map<String, Object> consumerConfigs(ConsumerConfigType configType) {
    return getProperties(configType);
  }

  /**
   * Creates the consumer factory for the {@link ConsumerConfigType#REQUEST} configuration.
   *
   * <p>The factory is built from the property map alone. The map already names
   * {@link StringDeserializer} and {@link JsonDeserializer} as key/value deserializers, so the
   * Kafka client instantiates them itself and configures them from the same map. Passing
   * pre-built deserializer instances instead would bypass that configuration step in the Kafka
   * client, and {@code spring.json.trusted.packages} could then be silently ignored.
   *
   * <p>This method is deliberately left without {@code @Bean}; a condition annotation such as
   * {@code @ConditionalOnMissingBean} has no effect on a non-bean method and was removed.
   *
   * @return a consumer factory backed by the REQUEST consumer properties
   */
  public ConsumerFactory<String, Object> requestConsumerFactory() {
    Map<String, Object> props = consumerConfigs(ConsumerConfigType.REQUEST);
    System.out.println("Creating a consumer factory with props = " + props);
    return new DefaultKafkaConsumerFactory<>(props);
  }

  /**
   * Listener container factory for request consumption: batch listeners, with the configured
   * concurrency, on top of {@link #requestConsumerFactory()}.
   *
   * @return the configured listener container factory
   */
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Object> requestKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(requestConsumerFactory());
    factory.setBatchListener(true);
    factory.setConcurrency(concurrency);
    return factory;
  }

  /**
   * Header mapper bean with default settings.
   *
   * @return a new {@link DefaultKafkaHeaderMapper}
   */
  @Bean
  public DefaultKafkaHeaderMapper headerMapper() {
    return new DefaultKafkaHeaderMapper();
  }
}

Although the interceptor.classes property is successfully passed to the consumer factory, it still appears empty when Kafka displays the ConsumerConfig values, and the interceptor is not registered. Here is the output (relevant part) when I start the spring boot app:

Registering ApiGatewayKafkaConsumerInterceptor....
Creating a consumer factory with props = {key.deserializer=class org.apache.kafka.common.serialization.StringDeserializer, spring.json.trusted.packages=*, value.deserializer=class org.springframework.kafka.support.serializer.JsonDeserializer, enable.auto.commit=true, max.poll.records=10, request.timeout.ms=60000, group.id=gateway-group, max.partition.fetch.bytes=100000000, bootstrap.servers=[127.0.0.1:9092], interceptor.classes=com.demo.api.gateway.kafka.interceptor.ApiGatewayKafkaConsumerInterceptor, auto.commit.interval.ms=1000, session.timeout.ms=15000}
2024-11-10 09:57:55.359  INFO 16024 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 1000
    auto.offset.reset = latest
    bootstrap.servers = [127.0.0.1:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-gateway-group-1
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = gateway-group
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 10
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

Update: I think Kafka is overriding/ignoring the properties that I define in getProperties method. If you look at some properties such as session.timeout.ms=15000, you will notice that the value I pass is 15000 whereas the value that Kafka displays in the ConsumerConfig values is 10000. In other words, compare this output line:

Creating a consumer factory with props = { ... session.timeout.ms=15000}

with this part:

2024-11-10 13:32:22.923  INFO 21072 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    session.timeout.ms = 10000
    ...

For this reason, it seems that the issue is not specific to the consumer interceptor but rather related to the consumer configs in general. How can I prevent Kafka from ignoring/overriding my consumer configs and make sure it uses the configs that I pass?


Update 2:

I don't plan to use the consumer interceptor for @KafkaListener methods (my microservice doesn't have any). I plan to use it when consuming the reply to kafkaTemplate.sendAndReceive(), i.e. for the receive part.


Solution

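  • The producer config class also uses a consumer factory: the reply container that handles the response of kafkaTemplate.sendAndReceive() is built from an injected ConsumerFactory. That injected factory was not built from the consumer configs defined in the consumer config class. KafkaConsumerConfig never exposes its factory as a bean (requestConsumerFactory() has no @Bean), so a different factory, presumably Spring Boot's auto-configured one, was injected instead, and it knew nothing about my properties or the interceptor.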

    // Listener Container to be set up in ReplyingKafkaTemplate
      /**
       * Listener container subscribed to {@code replyTopic}, built on the injected consumer
       * factory with otherwise default container properties.
       *
       * @param cf consumer factory used to create the container's consumer
       * @return a new message listener container for the reply topic
       */
      @Bean
      public KafkaMessageListenerContainer<String, Object> replyContainer(ConsumerFactory<String, Object> cf) {
        return new KafkaMessageListenerContainer<>(cf, new ContainerProperties(replyTopic));
      }
    

    I added the following code to the producer config class to solve the issue:

    // Injected consumer configuration class; its consumerConfigs() map supplies the
    // properties for the consumer factory defined in this class.
    @Autowired
    private KafkaConsumerConfig consumerConfigs;
    
    /**
     * Default consumer factory, backed by the property map returned from
     * {@code KafkaConsumerConfig#consumerConfigs()}.
     *
     * @return a new consumer factory using those properties
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
      ConsumerFactory<String, Object> defaultFactory =
          new DefaultKafkaConsumerFactory<>(consumerConfigs.consumerConfigs());
      return defaultFactory;
    }