java spring-boot kotlin apache-kafka sasl

How to handle AWS Secrets Manager rotation in a Spring Boot project for Kafka consumers with SASL authentication


I have a Spring Boot application with the following configuration for Kafka consumers:

@EnableKafka
@Configuration
class KafkaConsumerConfig(
    @Value("\${aws.secret-manager.sasl-auth.secret-name}") private val kafkaAuthSecretName: String,
    private val kafkaProperties: KafkaProperties,
    private val awsSecretManagerAdaptor: AwsSecretManagerAdaptor,
    private val applicationContext: ApplicationContext
) {

    private val logger = KotlinLogging.logger { }

    @Bean
    fun kafkaListenerContainerFactory():
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory()
        factory.containerProperties.isMissingTopicsFatal = false
        factory.setRetryTemplate(retryTemplate())
        factory.setErrorHandler { exception, data ->
            logger.error("Error in process with Exception {} and the record is {}", exception, data)
            SpringApplication.exit(applicationContext)
        }
        return factory
    }

    fun consumerFactory(): ConsumerFactory<String, String> {
        return DefaultKafkaConsumerFactory(consumerConfig())
    }

    fun consumerConfig(): Map<String, Any> {
        val props = kafkaProperties.buildConsumerProperties()
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaAvroDeserializer::class.java

        val secretValue = awsSecretManagerAdaptor.getSecretValue(kafkaAuthSecretName)
        val username = getUsername()
        val saslJaasConfig =
            """org.apache.kafka.common.security.scram.ScramLoginModule required username="$username" password="$secretValue";"""
        props[SaslConfigs.SASL_JAAS_CONFIG] = saslJaasConfig

        return props
    }

    private fun getUsername(): String? {
        val secretTags = awsSecretManagerAdaptor.getSecretTags(kafkaAuthSecretName)
        return secretTags.firstOrNull { it.key().equals("username") }?.value()
    }

    private fun retryTemplate(): RetryTemplate {
        val retryTemplate = RetryTemplate()
        retryTemplate.setBackOffPolicy(getFixedBackOffPolicy())
        retryTemplate.setRetryPolicy(getSimpleRetryPolicy())

        return retryTemplate
    }

    private fun getFixedBackOffPolicy(): BackOffPolicy {
        val fixedBackOffPolicy = FixedBackOffPolicy()
        fixedBackOffPolicy.backOffPeriod = 3000
        return fixedBackOffPolicy
    }

    private fun getSimpleRetryPolicy(): SimpleRetryPolicy {
        val simpleRetryPolicy = SimpleRetryPolicy()
        simpleRetryPolicy.maxAttempts = 3
        return simpleRetryPolicy
    }
}

The Kafka server uses SASL authentication with a username and password. As you can see, the username and password are fetched from AWS Secrets Manager through a service called AwsSecretManagerAdaptor. The configuration works like a charm; however, once the secret is rotated and the Kafka consumers are restarted, SASL authentication fails. To work around the issue, I am now restarting the Spring Boot application so that it reads the rotated secret correctly from AWS Secrets Manager.

The workaround works, but as you can see, restarting the application is ugly and error-prone. Do you have any suggestions for a better approach?


Solution

  • Kafka supports custom SASL callback handlers. You can override the default SASL client callback handler so that the name and password callbacks return the username and password read from AWS Secrets Manager, without reconstructing the consumer or producer.

    See https://cwiki.apache.org/confluence/display/KAFKA/KIP-86%3A+Configurable+SASL+callback+handlers

    You can extend the class 'org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler' and provide your own handling of NameCallback and PasswordCallback, then register it through the 'sasl.client.callback.handler.class' client property.
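
    A minimal Kotlin sketch of the callback handler idea, under a few assumptions: KafkaCredentials, SecretsManagerCallbackHandler and registerCallbackHandler are hypothetical names that do not come from the question, and the static holder exists only because Kafka instantiates the handler itself through a no-argument constructor, so Spring cannot inject AwsSecretManagerAdaptor into it. Treat it as a starting point rather than a tested implementation.

```kotlin
import javax.security.auth.callback.Callback
import javax.security.auth.callback.NameCallback
import javax.security.auth.callback.PasswordCallback
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler

// Hypothetical static holder (not part of Kafka or Spring). The application
// replaces `supplier` at startup with a lookup against AWS Secrets Manager.
object KafkaCredentials {
    // Returns the current (username, password) pair.
    @Volatile
    var supplier: () -> Pair<String, String> = { error("Kafka credentials supplier not set") }
}

// Answers name and password callbacks from the supplier. Any other callback
// (for example SCRAM extensions) is delegated to Kafka's default handler.
class SecretsManagerCallbackHandler : SaslClientCallbackHandler() {
    override fun handle(callbacks: Array<out Callback>) {
        for (callback in callbacks) {
            when (callback) {
                is NameCallback -> callback.name = KafkaCredentials.supplier().first
                is PasswordCallback -> callback.password = KafkaCredentials.supplier().second.toCharArray()
                else -> super.handle(arrayOf(callback))
            }
        }
    }
}

// Registers the handler on the consumer properties, e.g. inside consumerConfig().
fun registerCallbackHandler(props: MutableMap<String, Any>) {
    props[SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS] = SecretsManagerCallbackHandler::class.java
}
```

    In the configuration from the question, the supplier could be set once in KafkaConsumerConfig to a lambda that calls getUsername() and awsSecretManagerAdaptor.getSecretValue(kafkaAuthSecretName), assuming the latter returns the password as a string. The supplier is consulted each time the client authenticates, so it is probably worth caching the secret for a short period instead of calling Secrets Manager on every connection. The sasl.jaas.config entry still has to name ScramLoginModule; whether the username and password options can then be dropped from it should be verified against your Kafka client version.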