I am working on two spring boot microservices deployed on aws-eks and uses kafka where MS1 uses spring kafka and MS2 uses cloud kafka. In both case, if I update the topic authorization policy(IAM-MSK policy), both will throw
org.springframework.kafka.listener.KafkaMessageListenerContainer : Authentication/Authorization Exception and no authExceptionRetryInterval set
org.springframework.kafka.listener.KafkaMessageListenerContainer : Fatal consumer exception; stopping container
errors. I wanted my microservices to be continue to run by retrying the kafka connection in certain interval rather than stopping the container. For MS1 when I added fix by adding custom bean for ConcurrentKafkaListenerContainerFactory
and using ConcurrentKafkaListenerContainerFactory.getContainerProperties().setAuthExceptionRetryInterval()
method in it, started working as expected. But for cloud kafka, the issue persists with above fix. I tried to set spring.cloud.stream.bindingRetryInterval
value with which it partially worked(If policy is wrong while pod start itself) but failed in other case(When pod is running and then policy is corrupted). What is the correct configuration I need to apply on my application yaml or what code I need to apply so that MS2 never stops the container but retry the the connection in certain interval like MS1 fix.
Add below bean and it will fix the above mentioned issue on cloud kafka.
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer() {
return (container, dest, group) -> container.getContainerProperties()
.setAuthExceptionRetryInterval(Duration.ofSeconds(30));
}
Read here.