java, spring, spring-boot, apache-kafka

Kafka is not consuming/receiving the messages from my KafkaProducer


I have a Spring Boot application that connects to a Kafka broker and produces messages to a configured Kafka topic. First of all, I have a SenderComponent that schedules tasks based on a cron expression.

Business-wise, this component reads scheduled jobs from a repository and schedules tasks based on the stored cron expressions.

package com.sporting.scheduler.sender.application;

import com.sporting.scheduler.registry.domain.OutgoingRouting;
import com.sporting.scheduler.registry.domain.ScheduledJobRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;

import java.util.HashSet;
import java.util.Set;

@Component
@RequiredArgsConstructor
@EnableScheduling
@Slf4j
public class SenderComponent {

    static final String SENDER_PREFIX = "[SENDER] - ";
    private final Set<String> scheduledRoutings = new HashSet<>();

    @Value("${sporting.kafka.url}")
    private String url;

    @Value("${sporting.kafka.sender-topic}")
    private String topicName;

    private final ScheduledJobRepository repository;

    private final TaskScheduler scheduler;

    private final SenderKafkaProducer senderKafkaProducer;

    @Bean
    public CommandLineRunner start() {
        return args -> log.info("{}The scheduler sender started, listening for scheduled tasks to trigger.", SENDER_PREFIX);
    }

    @Scheduled(fixedRate = 1000 * 60, initialDelay = 1000) // Every minute, wait a second to boot
    public void senderTriggers() {
            Set<OutgoingRouting> routings = repository.fetchOutgoingRoutings();
            if (routings.isEmpty()) {
                log.info("{}No outgoing routings found in the job registry.", SENDER_PREFIX);
                return;
            }
            routings.forEach(this::scheduleTask);
    }

    private void scheduleTask(OutgoingRouting outgoingRouting) {
        if (!scheduledRoutings.contains(outgoingRouting.getCode())) {
            scheduler.schedule(() -> senderKafkaProducer.send(topicName, outgoingRouting.getCode()), new CronTrigger(outgoingRouting.getCronExpression()));
            scheduledRoutings.add(outgoingRouting.getCode());
        }
    }
}

This is the code that I've written for the SenderKafkaProducer:

package com.sporting.scheduler.sender.application;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Component
@RequiredArgsConstructor
@Slf4j
public class SenderKafkaProducer {

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
        return props;
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    public void send(String topic, String message) {
        kafkaTemplate().send(topic, message);
        log.info("Sent message [{}] to topic [{}]", message, topic);

    }
}

What I see happening is that I receive the following log line:

2025-08-07T16:52:00.015+02:00  INFO 32399 --- [Sender] [   scheduling-1] c.t.s.d.a.SenderKafkaProducer        : Sent message [BINGO_MAN] to topic [SCH-list]

So I'm assuming that the Kafka broker received my message. Meanwhile, I'm running a Kafka consumer on that broker, but it's not receiving any messages at all.

I did a test by running kcat -b localhost:9092 -t SCH-list -L in my terminal to verify that the broker is up and running, and it works. It gives me the following output:

Metadata for SCH-list (from broker 1: localhost:9092/1):
 1 brokers:
  broker 1 at localhost:9092 (controller)
 1 topics:
  topic "SCH-list" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1

But my message BINGO_MAN isn't received. I have the impression that my Spring Boot application is connecting correctly to the Kafka broker, but that something goes wrong in the producer. Can anyone point out what I'm doing wrong, and also give tips on how I can verify in my code that the message was actually received by the Kafka broker? Thanks in advance.


Solution

  • As suggested in the comments, you are declaring and calling your KafkaTemplate @Bean methods within the same @Component class as your producer. So your producer is not using a Spring-managed bean; instead, every call to send() invokes kafkaTemplate() directly and instantiates a brand-new KafkaTemplate, because @Bean methods in a plain @Component run in "lite" mode and inter-bean calls are not intercepted by Spring's proxies.

    So try it with something like:

    @Configuration
    public class MyKafkaConfiguration {
        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
            return props;
        }
    
        @Bean
        public KafkaTemplate<Integer, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    
    }
    

    Then, in your SenderComponent, just autowire the KafkaTemplate bean:

    @Component
    @RequiredArgsConstructor
    @EnableScheduling
    @Slf4j
    public class SenderComponent {
        private final KafkaTemplate<Integer, String> kafkaTemplate;
        ....
    }
    

    Also move the

    @Bean
    public CommandLineRunner start() {
        return args -> log.info("{}The scheduler sender started, listening for scheduled tasks to trigger.", SENDER_PREFIX);
    }
    

    to a configuration class as well.
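    To answer the second part of the question (verifying in code that the broker received the message): in recent Spring Kafka versions (3.x), KafkaTemplate.send returns a CompletableFuture<SendResult<K, V>> (older 2.x versions return a ListenableFuture with a similar callback API), so you can log the broker's acknowledgement, including the assigned partition and offset, or the failure. A sketch, assuming the Spring-managed template is injected as shown above:

    ```java
    import java.util.concurrent.CompletableFuture;

    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Component;

    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;

    @Component
    @RequiredArgsConstructor
    @Slf4j
    public class SenderKafkaProducer {

        // Injected Spring-managed template, configured in a @Configuration class
        private final KafkaTemplate<Integer, String> kafkaTemplate;

        public void send(String topic, String message) {
            CompletableFuture<SendResult<Integer, String>> future =
                    kafkaTemplate.send(topic, message);
            future.whenComplete((result, ex) -> {
                if (ex != null) {
                    // The broker rejected the record or the request timed out
                    log.error("Failed to send [{}] to topic [{}]", message, topic, ex);
                } else {
                    // RecordMetadata confirms the partition/offset the broker assigned
                    log.info("Sent [{}] to {}-{} at offset {}",
                            message,
                            result.getRecordMetadata().topic(),
                            result.getRecordMetadata().partition(),
                            result.getRecordMetadata().offset());
                }
            });
        }
    }
    ```

    Your current "Sent message" log line only proves that send() was called, not that the broker acknowledged anything; the whenComplete callback fires once the acks="all" acknowledgement (or an error) actually comes back.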

    Side note: in your SenderComponent, the url field is never used.