Tags: spring-boot, apache-kafka, spring-kafka, pcfdev

Kafka reply timeout when applications are auto-scaled in PCF


I am using ReplyingKafkaTemplate for synchronous Kafka request/reply, and I get a response when only one instance is running. But when the application is scaled up to more than one instance, I get a timeout error.

From the documentation:

When configuring with a single reply topic, each instance must use a different group.id. In this case, all instances receive each reply.

According to the documentation, each instance needs a different consumer group. Does this mean we have to start each instance manually with its own consumer group? How can we handle auto-scaling on a platform like PCF? Below is my Kafka configuration.

@Configuration
@EnableKafka
public class KafkaConfig {

    //My Properties


    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

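    @Bean
    public Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, commitInterval);
        // Deserializers are required unless instances are passed to the consumer factory
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }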

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        // Diamond operator avoids the raw-type (unchecked) warning
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(
            ProducerFactory<String, String> pf, KafkaMessageListenerContainer<String, String> container) {
        ReplyingKafkaTemplate<String, String, String> rkt = new ReplyingKafkaTemplate<>(pf, container);
        rkt.setDefaultReplyTimeout(Duration.ofMillis(slaTime));
        rkt.setSharedReplyTopic(true);
        return rkt;
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> replyContainer(ConsumerFactory<String, String> cf) {
        ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
        containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);

        return new KafkaMessageListenerContainer<>(cf, containerProperties);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(replyingKafkaTemplate(producerFactory(), replyContainer(consumerFactory())));
        return factory;
    }
}

Solution

  • In the replyContainer bean, add the following so that each instance consumes replies in its own consumer group:

    containerProperties.setGroupId(UUID.randomUUID().toString()); // unique
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    containerProperties.setKafkaConsumerProperties(props);
    

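    In the replyingKafkaTemplate bean, add the following (the configuration in the question already does this), so that replies meant for other instances are logged at DEBUG instead of ERROR: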

    rkt.setSharedReplyTopic(true);
    

    The request topic needs at least as many partitions as the maximum number of instances you scale out to. The reply topic can have any number of partitions (including 1).

    With PCF, you can construct the groupId using the instanceIndex instead of making it random.

    You could also use the instanceIndex as the REPLY_PARTITION header and use fixed reply partitions. In that case, because instance indexes start at 0, the reply topic needs at least one more partition than the maximum instanceIndex you expect to use.
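
The two instanceIndex options above could be sketched roughly as follows. This is a minimal, plain-Java illustration, not part of Spring Kafka: the class name InstanceReplyRouting, its method names, and the "reply-group" prefix are hypothetical. It assumes the platform exposes the index through the CF_INSTANCE_INDEX environment variable (as Cloud Foundry does) and that the reply-partition header carries the partition number as a four-byte big-endian int.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;

/** Hypothetical helper; names are illustrative and not part of Spring Kafka. */
public final class InstanceReplyRouting {

    private InstanceReplyRouting() {
    }

    /** Reads the instance index from the given environment; falls back to 0 when it is absent. */
    public static int instanceIndex(Map<String, String> env) {
        String raw = env.get("CF_INSTANCE_INDEX");
        return raw == null || raw.trim().isEmpty() ? 0 : Integer.parseInt(raw.trim());
    }

    /** Option 1: a stable, per-instance group id for the reply container. */
    public static String replyGroupId(String prefix, int instanceIndex) {
        return prefix + "-" + instanceIndex;
    }

    /** Option 2: the reply-partition header value, encoded as a 4-byte big-endian int. */
    public static byte[] replyPartitionHeaderValue(int instanceIndex) {
        // ByteBuffer writes in big-endian order by default
        return ByteBuffer.allocate(Integer.BYTES).putInt(instanceIndex).array();
    }

    public static void main(String[] args) {
        int index = instanceIndex(System.getenv());
        System.out.println("group.id = " + replyGroupId("reply-group", index));
        System.out.println("reply partition bytes = " + Arrays.toString(replyPartitionHeaderValue(index)));
    }
}
```

For option 1, the result of replyGroupId would be passed to containerProperties.setGroupId(...) in the replyContainer bean in place of the random UUID. For option 2, the byte array would be added to each outgoing ProducerRecord as a header named by KafkaHeaders.REPLY_PARTITION, and the reply container would presumably need to be assigned that one partition explicitly rather than relying on group management.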