spring-bootapache-kafkaspring-kafka

Does spring always require KafkaTemplate?


Does Spring Boot always require creating a bean of type KafkaTemplate? Details/stacktrace/codebase below, please tell me if what I am doing is incorrect.

  1. I have been posting messages to a topic from a spring boot project
  2. In order to create callback mechanisms, I have used org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord<K, V>, Callback) in order to send the message and also create a callback
  3. The reason I did this way is because Listenablefuture when using KafkaTemplate only provides exception on failures( and i wanted to register callbacks as a separate reusable class across all my usecases)
  4. However, spring fails to start up when I don't define a bean of type KafkaTemplate with the following error:
Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaTemplate' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]: Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory<java.lang.Object, java.lang.Object>' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:800) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:541) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1352) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1195) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:582) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1380) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory$DependencyObjectProvider.getIfUnique(DefaultListableBeanFactory.java:2063) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration.<init>(KafkaAnnotationDrivenConfiguration.java:90) ~[spring-boot-autoconfigure-2.4.12.jar:2.4.12]
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:na]
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) ~[na:na]
    at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:211) ~[spring-beans-5.3.12.jar:5.3.12]
    ... 22 common frames omitted
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory<java.lang.Object, java.lang.Object>' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1790) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1346) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1300) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:887) ~[spring-beans-5.3.12.jar:5.3.12]
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:791) ~[spring-beans-5.3.12.jar:5.3.12]
    ... 40 common frames omitted

My Kafka config is below

@Configuration
public class KafkaEventConfig {

    private final KafkaProperties kafkaProperties;

    @Value("${client.id}")
    private String clientId;


    @Value("${topic.movie.name}")
    private String movieTopicName;
    
    @Value("${retry.backoff.ms}")
    private int retryBackoffMilliseconds;

    @Value("${request.timeout.ms}")
    private int requestTimeoutMilliseconds;

    public KafkaEventConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public ProducerFactory<String, Movie> producerFactory() {
        Map<String, Object> props = kafkaProperties.buildProducerProperties();
        populateCommonProperties(props);
        return new DefaultKafkaProducerFactory<>(props);
    }

    private void populateCommonProperties(Map<String, Object> props) {
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMilliseconds);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMilliseconds);
    }
    
    @Bean
    public KafkaProducer<String, Movie> movieKafkaProducer() {
        return new KafkaProducer<String, Movie>(producerFactory().getConfigurationProperties());
    }

    @Bean
    public KafkaProducerMonitor kafkaProducerMonitor(KafkaProducer<String, Movie> kafkaProducer,
            MeterRegistry registry) {
        return new KafkaProducerMonitor(kafkaProducer, registry, Tags.of("topic", movieTopicName));
    }

My Kafka Callback is below

@Slf4j 
public class KafkaProducerCallBack<K, V> implements Callback {

    private ProducerRecord<K, V> producerRecord;

    public KafkaProducerCallBack(ProducerRecord<K, V> producerRecord) {

        this.producerRecord = producerRecord;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        String topicName= metadata.topic();
        long offset= metadata.offset();
        
        if (exception != null) {

            log.error("Failed to produce message [{}] to topic {} with exception {}", producerRecord, topicName, exception);
        }

        else {

            log.info("Sucessfully published message [{}] to topic {} to offset {}", producerRecord, topicName , offset);
            
        }

    }

}

I publish messages like so

movieKafkaProducer.send(message, new KafkaProducerCallBack<String, Movie>(message));

Please note the moment i add the below lines in KafkaEventConfig everything works fine

@Bean
public KafkaTemplate<String, Movie> movieKafkaTemplate() {
    return new KafkaTemplate<String, Movie>(producerFactory());
}

Solution

  • Additional to the latter that @M.Deinum mentioned:

    Take a look at the KafkaAutoConfiguration class:

        @Bean
        @ConditionalOnMissingBean(KafkaTemplate.class)
        public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
                ProducerListener<Object, Object> kafkaProducerListener,
                ObjectProvider<RecordMessageConverter> messageConverter) {
            KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
            messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
            kafkaTemplate.setProducerListener(kafkaProducerListener);
            kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
            return kafkaTemplate;
        }
    
    

    Springboot will create a KafkaTemplate bean for you if you don't create your own. This auto-configured bean depends on ProducerFactory<Object, Object> bean, and because you declared a ProducerFactory<String, Movie>. As you could see the type wasn't fit, that's why you got an error.


    the reason i did this way is because listenablefuture when using KafkaTemplate only provides exception on failures( and i wanted to register callbacks as a separate reusable class across all my usecases)

    Your case, you can still get the advantages of using KafkaTemplate. Instead of implementing a Callback, you can implement your own ProducerListener<K, V> and bind it into your KafkaTemple. E.g:

    FullLoggingProducerListener.class

    public class FullLoggingProducerListener<K, V> implements ProducerListener<K, V> {
        @Override
        public void onSuccess(ProducerRecord<K, V> record, RecordMetadata recordMetadata) {
            log.info("Successful!");
        }
    
        @Override
        public void onError(ProducerRecord<K, V> record, @Nullable RecordMetadata recordMetadata, Exception exception) {
            log.error("Error!");
        }
    }
    

    YourConfigration.class

        @Bean
        public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<String, Movie> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener) {
            KafkaTemplate<String, Movie> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
            kafkaTemplate.setProducerListener(kafkaProducerListener);
            return kafkaTemplate;
        }
    

    Now, everytime you use KafkaTemplate to send a record, you'll see the log.