I have the following Kafka consumer interceptor class (ApiGatewayKafkaConsumerInterceptor):
package com.demo.api.gateway.kafka.interceptor;

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ApiGatewayKafkaConsumerInterceptor implements ConsumerInterceptor<String, Object> {

    public ApiGatewayKafkaConsumerInterceptor() {
        System.out.println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$");
        System.out.println("$$$                                                   $$$");
        System.out.println("$$$          GATEWAY INTERCEPTOR REGISTERED           $$$");
        System.out.println("$$$                                                   $$$");
        System.out.println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$");
    }

    @Override
    public ConsumerRecords<String, Object> onConsume(ConsumerRecords<String, Object> consumerRecords) {
        return consumerRecords;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
and the following KafkaConsumerConfig class:
package com.demo.api.gateway.kafka.config;

import com.demo.api.gateway.controller.switcher.TransactionController;
import com.demo.api.gateway.kafka.interceptor.ApiGatewayKafkaConsumerInterceptor;
import com.demo.api.gateway.kafka.model.ConsumerConfigType;
import com.demo.api.gateway.kafka.model.KafkaConsumerProperties;
import com.demo.common.kafka.interceptor.KafkaConsumerInterceptor;
import com.demo.common.service.ServiceLogger;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    private final ServiceLogger logger = new ServiceLogger(KafkaConsumerConfig.class);

    @Value("${kafka.concurrency}")
    int concurrency;

    @Autowired
    KafkaConsumerProperties kafkaProperties;

    // @Value("${spring.kafka.consumer.session.timeout.ms}")
    // private String sessionTimeout;

    @Bean
    public Map<String, Object> consumerConfigs() {
        return getProperties(ConsumerConfigType.DEFAULT);
    }

    private Map<String, Object> getProperties(ConsumerConfigType type) {
        System.out.print(kafkaProperties.getMaxPartitionFetchBytes());
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootStarpServers());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.isEnableAutoCommit());
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaProperties.getAutoCommitIntervalMs());
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, kafkaProperties.getMaxPartitionFetchBytes());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getGroupId(type)); // how or which group id??
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaProperties.getMaxPollRecords(type));
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaProperties.getRequestTimeoutMS());
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        System.out.println("Registering ApiGatewayKafkaConsumerInterceptor....");
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ApiGatewayKafkaConsumerInterceptor.class.getName());
        return props;
    }

    @Bean(name = "consumerConfig")
    @Scope(value = "prototype")
    @Lazy(value = true)
    public Map<String, Object> consumerConfigs(ConsumerConfigType configType) {
        return getProperties(configType);
    }

    @ConditionalOnMissingBean(name = "requestKafkaListenerContainerFactory")
    public ConsumerFactory<String, Object> requestConsumerFactory() {
        Map<String, Object> props = consumerConfigs(ConsumerConfigType.REQUEST);
        System.out.println("Creating a consumer factory with props = " + props);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> requestKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(requestConsumerFactory());
        factory.setBatchListener(true);
        factory.setConcurrency(concurrency);
        return factory;
    }

    @Bean
    public DefaultKafkaHeaderMapper headerMapper() {
        return new DefaultKafkaHeaderMapper();
    }
}
Although the interceptor.classes property is successfully passed to the consumer factory, it still appears empty when Kafka logs the ConsumerConfig values, and the interceptor is never registered. Here is the relevant part of the output when I start the Spring Boot app:
Registering ApiGatewayKafkaConsumerInterceptor....
Creating a consumer factory with props = {key.deserializer=class org.apache.kafka.common.serialization.StringDeserializer, spring.json.trusted.packages=*, value.deserializer=class org.springframework.kafka.support.serializer.JsonDeserializer, enable.auto.commit=true, max.poll.records=10, request.timeout.ms=60000, group.id=gateway-group, max.partition.fetch.bytes=100000000, bootstrap.servers=[127.0.0.1:9092], interceptor.classes=com.demo.api.gateway.kafka.interceptor.ApiGatewayKafkaConsumerInterceptor, auto.commit.interval.ms=1000, session.timeout.ms=15000}
2024-11-10 09:57:55.359 INFO 16024 --- [ restartedMain] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 1000
auto.offset.reset = latest
bootstrap.servers = [127.0.0.1:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-gateway-group-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = gateway-group
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 10
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
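Incidentally, the factory itself does hold the property. A throwaway check run from inside KafkaConsumerConfig (not part of my actual code; getConfigurationProperties() is Spring Kafka's accessor for the factory's configuration map) confirms it:

    // Throwaway check: ask the factory what it will pass to the consumers it creates.
    Map<String, Object> cfg = requestConsumerFactory().getConfigurationProperties();
    System.out.println(cfg.get(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG));
    // -> com.demo.api.gateway.kafka.interceptor.ApiGatewayKafkaConsumerInterceptor

So whichever consumer produced the ConsumerConfig values above was apparently not created from this factory.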
Update: I think Kafka is overriding/ignoring the properties that I define in the getProperties method. For example, I pass session.timeout.ms=15000, but the value Kafka displays in the ConsumerConfig values is 10000. In other words, compare this output line:
Creating a consumer factory with props = { ... session.timeout.ms=15000}
with this part:
2024-11-10 13:32:22.923 INFO 21072 --- [ restartedMain] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
session.timeout.ms = 10000
...
The same holds for other properties: I set max.partition.fetch.bytes=100000000 and a JsonDeserializer for values, while the logged consumer shows the defaults (1048576 and StringDeserializer). So the issue does not seem to be specific to the consumer interceptor; it affects my consumer configs in general. How can I prevent Kafka from ignoring/overriding my consumer configs and make sure it uses the configs that I pass?
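One possibility I am considering: the logged consumer may come from a different ConsumerFactory altogether. requestConsumerFactory() above has @ConditionalOnMissingBean but no @Bean, so it is never registered as a bean; in that case Spring Boot auto-configures its own factory from spring.kafka.consumer.* alone, roughly like this (a simplified sketch of Boot's KafkaAutoConfiguration written from memory, not my code):

    // Simplified sketch of what Spring Boot auto-configures when the application
    // context contains no ConsumerFactory bean. It reads only spring.kafka.consumer.*;
    // my getProperties() map is never consulted here.
    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<Object, Object> kafkaConsumerFactory(KafkaProperties properties) {
        return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties());
    }

A consumer built from such a factory would show exactly the defaults that appear in the log above.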
Update 2:
I don't plan to use the consumer interceptor with @KafkaListener methods (my microservice doesn't have any). I plan to use it while consuming the response of the kafkaTemplate.sendAndReceive() method (the receive part).
It seems that the producer config class is also using a consumer factory to handle the reply of the kafkaTemplate.sendAndReceive method; however, it is not using the consumer configs that I define in the consumer config class:
// Listener container to be set up in ReplyingKafkaTemplate
@Bean
public KafkaMessageListenerContainer<String, Object> replyContainer(ConsumerFactory<String, Object> cf) {
    ContainerProperties containerProperties = new ContainerProperties(replyTopic);
    return new KafkaMessageListenerContainer<>(cf, containerProperties);
}
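For context, that container feeds a ReplyingKafkaTemplate, wired roughly like this (a sketch of the surrounding code in my producer config class; the producer factory and replyTopic are defined elsewhere in it):

    // Request/reply template; the reply container above supplies the consumer side.
    @Bean
    public ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate(
            ProducerFactory<String, Object> pf,
            KafkaMessageListenerContainer<String, Object> replyContainer) {
        return new ReplyingKafkaTemplate<>(pf, replyContainer);
    }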
I added the following code to the producer config class to solve the issue:
@Autowired
private KafkaConsumerConfig consumerConfigs;

// Default consumer factory, built from my own consumer configs
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs.consumerConfigs());
}
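With this bean in place, replyContainer receives my factory (injected by type) instead of the auto-configured one, and the interceptor fires when the reply is consumed, i.e. on the receive part of calls like the following (illustrative usage only; replyingKafkaTemplate, the topic name, and payload stand in for my actual code):

    // Illustrative request/reply call. The reply is read by the reply container's
    // consumer, which is where the registered interceptor is invoked.
    ProducerRecord<String, Object> record = new ProducerRecord<>("request-topic", payload);
    RequestReplyFuture<String, Object, Object> future = replyingKafkaTemplate.sendAndReceive(record);
    ConsumerRecord<String, Object> reply = future.get(30, TimeUnit.SECONDS);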