I have a SpringBoot application that connects to a Kafka broker and produces messages to a configured Kafka topic. I have first of all a SenderComponent which schedules tasks based on a cron expression.
Businesswise this Component has scheduled jobs stored in a repository and it schedules tasks based on the stored cron expressions.
package com.sporting.scheduler.sender.application;
import com.sporting.scheduler.registry.domain.OutgoingRouting;
import com.sporting.scheduler.registry.domain.ScheduledJobRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;
@Component
@RequiredArgsConstructor
@EnableScheduling
@Slf4j
public class SenderComponent {
static final String SENDER_PREFIX = "[SENDER] - ";
private final Set<String> scheduledRoutings = new HashSet<>();
@Value("${sporting.kafka.url}")
private String url;
@Value("${sporting.kafka.sender-topic}")
private String topicName;
private final ScheduledJobRepository repository;
private final TaskScheduler scheduler;
private final SenderKafkaProducer senderKafkaProducer;
@Bean
public CommandLineRunner start() {
return args -> log.info("{}The scheduler sender started, listening for scheduled tasks to trigger.", SENDER_PREFIX);
}
@Scheduled(fixedRate = 1000 * 60, initialDelay = 1000) // Every minute, wait a second to boot
public void senderTriggers() {
Set<OutgoingRouting> routings = repository.fetchOutgoingRoutings();
if (routings.isEmpty()) {
log.info("{}No outgoing routings found in the job registry.", SENDER_PREFIX);
return;
}
routings.forEach(this::scheduleTask);
}
private void scheduleTask(OutgoingRouting outgoingRouting) {
if (!scheduledRoutings.contains(outgoingRouting.getCode())) {
scheduler.schedule(() -> senderKafkaProducer.send(topicName, outgoingRouting.getCode()), new CronTrigger(outgoingRouting.getCronExpression()));
scheduledRoutings.add(outgoingRouting.getCode());
}
}
}
This is the code that I've written to the SenderKafkaProducer
package com.sporting.scheduler.sender.application;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
@RequiredArgsConstructor
@Slf4j
public class SenderKafkaProducer {
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
public void send(String topic, String message) {
kafkaTemplate().send(topic, message);
log.info("Sent message [{}] to topic [{}]", message, topic);
}
}
What I see happening is that I receive following log lines
2025-08-07T16:52:00.015+02:00 INFO 32399 --- [Sender] [ scheduling-1] c.t.s.d.a.SenderKafkaProducer : Sent message [BINGO_MAN] to topic [SCH-list]
So I'm assuming that the Kafka broker received my message well. In the meanwhile I'm running a kafka consumer on that broker, but it's not receiving any message at all.
I did a test by using kcat kcat -b localhost:9092 -t SCH-list -L
in my terminal to verify if the broker is up and running and it works. It gives me the following output
Metadata for SCH-list (from broker 1: localhost:9092/1):
1 brokers:
broker 1 at localhost:9092 (controller)
1 topics:
topic "SCH-list" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
But my message BINGO_MAN isn't received. I have the idea that my SpringBoot application is connecting correctly to the Kafka broker, but something goes wrong with the producer. Can anyone point me out what I'm doing wrong and also give tips how I can verify in my code if the message got received well by my Kafka broker? Thanks in advance.
Like suggested on the comment, you are instantiating your KafkaTemplate
within the same Component
class for your producer. So your producer is not using Spring managed bean, rather, instantiating a new KafkaTemplate every time it runs which might not be properly using Spring proxies for Kafka.
So try it with something like:
@Configuration
public class MyKafkaConfiguration {
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Then in your SenderComponent
just autowire the KafkaTemplate
bean
@Component
@RequiredArgsConstructor
@EnableScheduling
@Slf4j
public class SenderComponent {
private final KafkaTemplate<Integer, String> kafkaTemplate;
....
}
Also move the
@Bean
public CommandLineRunner start() {
return args -> log.info("{}The scheduler sender started, listening for scheduled tasks to trigger.", SENDER_PREFIX);
}
To a configuration class as well.
Side note: in your SenderComponent
url
is not being used