I am using ReplyingKafkaTemplate for synchronous Kafka request/reply, and I get responses when only one instance is running. But when the application is scaled up to more than one instance, I get a timeout error.
From the documentation:

When configuring with a single reply topic, each instance must use a different group.id. In this case, all instances receive each reply.

As per the documentation, if each instance needs a different consumer group, does this mean we have to start the instances manually with different consumer groups? How can we handle auto-scaling with tools like PCF? Below is my Kafka configuration.
@Configuration
@EnableKafka
public class KafkaConfig {

    // My properties

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, commitInterval);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(
            ProducerFactory<String, String> pf, KafkaMessageListenerContainer<String, String> container) {
        ReplyingKafkaTemplate<String, String, String> rkt = new ReplyingKafkaTemplate<>(pf, container);
        rkt.setDefaultReplyTimeout(Duration.ofMillis(slaTime));
        rkt.setSharedReplyTopic(true);
        return rkt;
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> replyContainer(ConsumerFactory<String, String> cf) {
        ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
        containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
        return new KafkaMessageListenerContainer<>(cf, containerProperties);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(replyingKafkaTemplate(producerFactory(), replyContainer(consumerFactory())));
        return factory;
    }
}
In the replyContainer bean, add:

containerProperties.setGroupId(UUID.randomUUID().toString()); // unique per instance
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
containerProperties.setKafkaConsumerProperties(props);
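Putting those two changes together, the reply container bean could look like this (a sketch based on the configuration above; the group id is generated once per instance at startup):

```java
@Bean
public KafkaMessageListenerContainer<String, String> replyContainer(ConsumerFactory<String, String> cf) {
    ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
    containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
    // Unique group id per instance, so every instance receives every reply
    containerProperties.setGroupId(UUID.randomUUID().toString());
    Properties props = new Properties();
    // Start from the latest offset so a freshly created group doesn't replay old replies
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    containerProperties.setKafkaConsumerProperties(props);
    return new KafkaMessageListenerContainer<>(cf, containerProperties);
}
```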
In the replyingKafkaTemplate bean, add:

rkt.setSharedReplyTopic(true);
The request topic needs at least as many partitions as the maximum scale-out. The reply topic can have any number of partitions (including 1).
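For example, if you expect to scale out to at most four instances, the topics could be declared like this (a sketch using spring-kafka's TopicBuilder; the topic names and counts are placeholders):

```java
@Bean
public NewTopic requestTopic() {
    // At least as many partitions as the maximum number of instances
    return TopicBuilder.name("requests").partitions(4).replicas(1).build();
}

@Bean
public NewTopic replyTopic() {
    // Any partition count works for the shared reply topic, even 1
    return TopicBuilder.name("replies").partitions(1).replicas(1).build();
}
```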
With PCF, you can construct the groupId from the instanceIndex instead of making it random.
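A minimal sketch of deriving a stable group id from the instance index, assuming the CF_INSTANCE_INDEX environment variable that Cloud Foundry sets on each app instance (the "reply-group-" prefix is arbitrary):

```java
public class ReplyGroupId {

    // Builds a per-instance group id from an instance index, e.g. "reply-group-2";
    // falls back to index 0 when the variable is not set (e.g. running locally)
    static String fromIndex(String instanceIndex) {
        return "reply-group-" + (instanceIndex == null ? "0" : instanceIndex);
    }

    public static void main(String[] args) {
        // Cloud Foundry exposes the instance number as CF_INSTANCE_INDEX
        String groupId = fromIndex(System.getenv("CF_INSTANCE_INDEX"));
        System.out.println(groupId);
    }
}
```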
You could also use the instanceIndex as the REPLY_PARTITION header and use fixed reply partitions; in that case you would need at least as many reply-topic partitions as the maximum instanceIndex you expect to use.
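Sending with a fixed reply partition could be sketched like this (the instance index again comes from CF_INSTANCE_INDEX, which is an assumption; the header value is a 4-byte big-endian integer):

```java
// Pin this instance's replies to the partition matching its instance index
int replyPartition = Integer.parseInt(System.getenv("CF_INSTANCE_INDEX"));

ProducerRecord<String, String> record = new ProducerRecord<>(requestTopic, "some-request");
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION,
        ByteBuffer.allocate(4).putInt(replyPartition).array()));

RequestReplyFuture<String, String, String> future = replyingKafkaTemplate.sendAndReceive(record);

// The reply container then listens only on this instance's reply partition
ContainerProperties containerProperties = new ContainerProperties(
        new TopicPartitionOffset(requestReplyTopic, replyPartition));
```

With fixed partitions, no per-instance group id is needed, since each instance only ever consumes its own partition of the reply topic.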