java spring-boot apache-kafka spring-kafka

Kafka topic creation uses wrong configuration


I am using the latest Spring Boot 3.4.1 and Spring Kafka 3.3.1.

I would like to create a new Kafka topic at startup, as described in the official documentation, but the TopicBuilder seems to use the wrong configuration, with default values such as bootstrap.servers = [localhost:9092], and the topic creation fails.

After startup, when I send a message to the topic, everything works and the KafkaTemplate uses my configuration.

Why does TopicBuilder use a different Kafka configuration than the KafkaTemplate? What is wrong with my code?

I see there is a configs method on TopicBuilder, but it expects a Map<String, String>, so I cannot reuse the Map<String, Object> that I pass to DefaultKafkaProducerFactory.

configuration:

@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaProducerConfiguration {

    public static final String KAFKA_TOPIC = "topic1";

    @Value("${KAFKA_SERVERS}")
    private String bootstrapAddresses;

    @Bean
    public ProducerFactory<String, Event> producerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configs);
    }

    @Bean
    public KafkaTemplate<String, Event> kafkaTemplate() {
        var factory = producerFactory();
        log.debug("initializing a KafkaTemplate using the following setting: {{}}", configurationToString(factory));
        return new KafkaTemplate<>(factory);
    }

    @Bean
    public NewTopic topic() {
        log.debug("creating a new topic kafka topic: {}", KAFKA_TOPIC);
        return TopicBuilder.name(KAFKA_TOPIC)
                .partitions(1)
                .replicas(1)
                .build();
    }

    private String configurationToString(ProducerFactory<String, Event> producerFactory) {
        var sb = new StringBuilder();
        producerFactory.
                getConfigurationProperties().
                forEach((key, value) -> sb.append(String.format("\"%s\": \"%s\"", key, value)));
        return sb.toString();
    }
}

sender:

@Slf4j
@RestController
@RequestMapping("/api/kafka")
public class KafkaProducerController {

    private final KafkaTemplate<String, Event> kafkaTemplate;

    public KafkaProducerController(KafkaTemplate<String, Event> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @GetMapping("/send")
    @MethodStatistics
    public String sendMessageToKafka() {
        var event = Event.builder().....build();
        kafkaTemplate.send(KafkaProducerConfiguration.KAFKA_TOPIC, event);
        return "Message has been sent to Kafka topic.";
    }
}

log:

2025-01-16T15:29:04.448Z  INFO 206 --- [my-kafka-producer] [           main] c.r.g.s.message.producer.Application     : Starting Application v0.1.0 using Java 21.0.5 with PID 206 (/jar-to-run/remal-my-kafka-producer-0.1.0.jar started by root in /jar-to-run)
2025-01-16T15:29:04.450Z DEBUG 206 --- [my-kafka-producer] [           main] c.r.g.s.message.producer.Application     : Running with Spring Boot v3.4.1, Spring v6.2.1
2025-01-16T15:29:04.450Z  INFO 206 --- [my-kafka-producer] [           main] c.r.g.s.message.producer.Application     : No active profile set, falling back to 1 default profile: "default"
2025-01-16T15:29:06.204Z  INFO 206 --- [my-kafka-producer] [           main] o.s.cloud.context.scope.GenericScope     : BeanFactory id=bc87c8e9-04ef-3afc-981b-5d032d24e596
2025-01-16T15:29:07.358Z  INFO 206 --- [my-kafka-producer] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port 8443 (https)
2025-01-16T15:29:07.378Z  INFO 206 --- [my-kafka-producer] [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2025-01-16T15:29:07.378Z  INFO 206 --- [my-kafka-producer] [           main] o.apache.catalina.core.StandardEngine    : Starting Servlet engine: [Apache Tomcat/10.1.34]
2025-01-16T15:29:07.416Z  INFO 206 --- [my-kafka-producer] [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2025-01-16T15:29:07.416Z  INFO 206 --- [my-kafka-producer] [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 2918 ms

>>>>> 2025-01-16T15:29:08.062Z DEBUG 206 --- [my-kafka-producer] [           main] c.r.g.s.m.p.c.KafkaProducerConfiguration : initializing a KafkaTemplate using the following setting: {"bootstrap.servers": "kafka-1.hello.com:9092, kafka-2.hello.com:9092""key.serializer": "class org.apache.kafka.common.serialization.StringSerializer""value.serializer": "class org.springframework.kafka.support.serializer.JsonSerializer"}

>>>>> 2025-01-16T15:29:08.252Z DEBUG 206 --- [my-kafka-producer] [           main] c.r.g.s.m.p.c.KafkaProducerConfiguration : creating a new topic kafka topic: topic1

2025-01-16T15:29:10.119Z  WARN 206 --- [my-kafka-producer] [           main] iguration$LoadBalancerCaffeineWarnLogger : Spring Cloud LoadBalancer is currently working with the default cache. While this cache implementation is useful for development and tests, it's recommended to use Caffeine cache in production.You can switch to using Caffeine cache, by adding it and org.springframework.cache.caffeine.CaffeineCacheManager to the classpath.
2025-01-16T15:29:10.128Z  INFO 206 --- [my-kafka-producer] [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 3 endpoints beneath base path '/actuator'
2025-01-16T15:29:10.225Z  INFO 206 --- [my-kafka-producer] [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
    auto.include.jmx.reporter = true
    bootstrap.controllers = []

>>>>> bootstrap.servers = [localhost:9092]
    
    client.dns.lookup = use_all_dns_ips
    client.id = my-kafka-producer-admin-0
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    enable.metrics.push = true
    metadata.max.age.ms = 300000
    metadata.recovery.strategy = none
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.max.ms = 1000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = GSSAPI
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

2025-01-16T15:29:10.426Z  INFO 206 --- [my-kafka-producer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.8.1
2025-01-16T15:29:10.427Z  INFO 206 --- [my-kafka-producer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 70d6ff42debf7e17
2025-01-16T15:29:10.427Z  INFO 206 --- [my-kafka-producer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1737041350424
2025-01-16T15:29:10.460Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=my-kafka-producer-admin-0] Node -1 disconnected.
2025-01-16T15:29:10.463Z  WARN 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=my-kafka-producer-admin-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
2025-01-16T15:29:10.566Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=my-kafka-producer-admin-0] Node -1 disconnected.
2025-01-16T15:29:10.566Z  WARN 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=my-kafka-producer-admin-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
...
2025-01-16T15:29:40.314Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=my-kafka-producer-admin-0] Node -1 disconnected.
2025-01-16T15:29:40.315Z  WARN 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=my-kafka-producer-admin-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
2025-01-16T15:29:40.432Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.a.k.c.a.i.AdminMetadataManager         : [AdminClient clientId=my-kafka-producer-admin-0] Metadata update failed

>>>>> 2025-01-16T15:29:40.448Z ERROR 206 --- [my-kafka-producer] [           main] o.springframework.kafka.core.KafkaAdmin  : Could not configure topics

org.springframework.kafka.KafkaException: Timed out waiting to get existing topics
    at org.springframework.kafka.core.KafkaAdmin.lambda$checkPartitions$13(KafkaAdmin.java:571) ~[spring-kafka-3.3.1.jar!/:3.3.1]
    at java.base/java.util.HashMap.forEach(HashMap.java:1429) ~[na:na]
    at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1707) ~[na:na]
    at org.springframework.kafka.core.KafkaAdmin.checkPartitions(KafkaAdmin.java:550) ~[spring-kafka-3.3.1.jar!/:3.3.1]
    at org.springframework.kafka.core.KafkaAdmin.addOrModifyTopicsIfNeeded(KafkaAdmin.java:443) ~[spring-kafka-3.3.1.jar!/:3.3.1]
    at org.springframework.kafka.core.KafkaAdmin.initialize(KafkaAdmin.java:276) ~[spring-kafka-3.3.1.jar!/:3.3.1]
    at org.springframework.kafka.core.KafkaAdmin.afterSingletonsInstantiated(KafkaAdmin.java:245) ~[spring-kafka-3.3.1.jar!/:3.3.1]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:1057) ~[spring-beans-6.2.1.jar!/:6.2.1]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:987) ~[spring-context-6.2.1.jar!/:6.2.1]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:627) ~[spring-context-6.2.1.jar!/:6.2.1]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.4.1.jar!/:3.4.1]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752) ~[spring-boot-3.4.1.jar!/:3.4.1]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:439) ~[spring-boot-3.4.1.jar!/:3.4.1]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:318) ~[spring-boot-3.4.1.jar!/:3.4.1]
    at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:149) ~[spring-boot-3.4.1.jar!/:3.4.1]
    at com.remal.my.service.message.producer.Application.main(Application.java:20) ~[!/:0.1.0]
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
    at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:102) ~[remal-my-kafka-producer-0.1.0.jar:0.1.0]
    at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:64) ~[remal-my-kafka-producer-0.1.0.jar:0.1.0]
    at org.springframework.boot.loader.launch.JarLauncher.main(JarLauncher.java:40) ~[remal-my-kafka-producer-0.1.0.jar:0.1.0]
Caused by: java.util.concurrent.TimeoutException: null
    at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[na:na]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) ~[kafka-clients-3.8.1.jar!/:na]
    at org.springframework.kafka.core.KafkaAdmin.lambda$checkPartitions$13(KafkaAdmin.java:553) ~[spring-kafka-3.3.1.jar!/:3.3.1]
    ... 20 common frames omitted

2025-01-16T15:29:41.253Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=my-kafka-producer-admin-0] Node -1 disconnected.
2025-01-16T15:29:41.254Z  WARN 206 --- [my-kafka-producer] [roducer-admin-0] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=my-kafka-producer-admin-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
...
2025-01-16T15:29:50.450Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.a.k.clients.admin.KafkaAdminClient     : [AdminClient clientId=my-kafka-producer-admin-0] Forcing a hard I/O thread shutdown. Requests in progress will be aborted.
2025-01-16T15:29:50.451Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.a.kafka.common.utils.AppInfoParser     : App info kafka.admin.client for my-kafka-producer-admin-0 unregistered
2025-01-16T15:29:50.452Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.a.k.c.a.i.AdminMetadataManager         : [AdminClient clientId=my-kafka-producer-admin-0] Metadata update failed

org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: fetchMetadata

2025-01-16T15:29:50.452Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.a.k.clients.admin.KafkaAdminClient     : [AdminClient clientId=my-kafka-producer-admin-0] Timed out 2 remaining operation(s) during close.
2025-01-16T15:29:50.456Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.apache.kafka.common.metrics.Metrics    : Metrics scheduler closed
2025-01-16T15:29:50.456Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.apache.kafka.common.metrics.Metrics    : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2025-01-16T15:29:50.456Z  INFO 206 --- [my-kafka-producer] [roducer-admin-0] o.apache.kafka.common.metrics.Metrics    : Metrics reporters closed
2025-01-16T15:29:50.523Z  INFO 206 --- [my-kafka-producer] [           main] o.a.t.util.net.NioEndpoint.certificate   : Connector [https-jsse-nio-8443], TLS virtual host [_default_], certificate type [UNDEFINED] configured from keystore [/root/.keystore] using alias [kafka-producer-service-1.hello.com] with trust store [null]
2025-01-16T15:29:50.532Z  INFO 206 --- [my-kafka-producer] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port 8443 (https) with context path '/'
2025-01-16T15:29:50.539Z  INFO 206 --- [my-kafka-producer] [           main] o.s.c.c.s.ConsulServiceRegistry          : Registering service with consul: NewService{id='my-kafka-producer', name='my-kafka-producer', tags=[service, 0.1.0], address='kafka-producer-service-1.hello.com', meta={secure=true}, port=8443, enableTagOverride=null, check=Check{script='null', dockerContainerID='null', shell='null', interval='2s', ttl='null', http='https://kafka-producer-service-1.hello.com:8443/actuator/health', method='null', header={}, tcp='null', timeout='2s', deregisterCriticalServiceAfter='10s', tlsSkipVerify=null, status='null', grpc='null', grpcUseTLS=null}, checks=null}
2025-01-16T15:29:50.580Z  INFO 206 --- [my-kafka-producer] [           main] c.r.g.s.message.producer.Application     : Started Application in 51.927 seconds (process running for 52.723)
2025-01-16T15:29:50.636Z  INFO 206 --- [my-kafka-producer] [nio-8443-exec-6] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
{"status":"UP"}

>>>>>  2025-01-16T15:32:14.748Z DEBUG 206 --- [my-kafka-producer] [nio-8443-exec-7] c.r.g.c.m.MethodStatisticsAspect         : > calling the KafkaProducerController.sendMessageToKafka()...
2025-01-16T15:32:14.769Z  INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = -1
    auto.include.jmx.reporter = true
    batch.size = 16384

>>>>> bootstrap.servers = [kafka-1.hello.com:9092, kafka-2.hello.com:9092]
    
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = my-kafka-producer-producer-1
    compression.gzip.level = -1
    compression.lz4.level = 9
    compression.type = none
    compression.zstd.level = 3
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = true
    enable.metrics.push = true
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metadata.recovery.strategy = none
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.adaptive.partitioning.enable = true
    partitioner.availability.timeout.ms = 0
    partitioner.class = null
    partitioner.ignore.keys = false
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.max.ms = 1000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = GSSAPI
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

2025-01-16T15:32:14.826Z  INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.k.c.t.i.KafkaMetricsCollector        : initializing Kafka metrics collector
2025-01-16T15:32:14.852Z  INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=my-kafka-producer-producer-1] Instantiated an idempotent producer.
2025-01-16T15:32:14.884Z  INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.8.1
2025-01-16T15:32:14.884Z  INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 70d6ff42debf7e17
2025-01-16T15:32:14.885Z  INFO 206 --- [my-kafka-producer] [nio-8443-exec-7] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1737041534884
2025-01-16T15:32:15.448Z  WARN 206 --- [my-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=my-kafka-producer-producer-1] Error while fetching metadata with correlation id 2 : {topic1=LEADER_NOT_AVAILABLE}
2025-01-16T15:32:15.450Z  INFO 206 --- [my-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=my-kafka-producer-producer-1] Cluster ID: M_1qzbN3Q4e2C_SPkTcrhQ
2025-01-16T15:32:15.450Z  INFO 206 --- [my-kafka-producer] [ucer-producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=my-kafka-producer-producer-1] ProducerId set to 0 with epoch 0
>>>>> 2025-01-16T15:32:15.598Z DEBUG 206 --- [my-kafka-producer] [nio-8443-exec-7] c.r.g.c.m.MethodStatisticsAspect         : < call ended: {name: KafkaProducerController.sendMessageToKafka, arguments: , return: "Message has been sent to Kafka topic.", execution-in-ms: 851}

Solution

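  • The NewTopic bean built by TopicBuilder only describes the topic. The bean that actually connects to the broker and creates it is KafkaAdmin. Because your configuration does not define one, Spring Boot auto-configures a KafkaAdmin from spring.kafka.bootstrap-servers, which defaults to localhost:9092, and your producer configuration is never consulted. TopicBuilder.configs(...) does not help here either: it sets topic-level properties (for example retention.ms), not client connection settings. If you define a KafkaAdmin bean in your application context, it automatically adds a topic to the broker for each NewTopic @Bean. Add the following bean to KafkaProducerConfiguration so that topic creation uses the same bootstrap servers as the KafkaTemplate. Alternatively, setting spring.kafka.bootstrap-servers to the same value should let the auto-configured KafkaAdmin reach the broker.

    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        // use the same broker list as producerFactory(), not a hard-coded localhost:9092
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
        return new KafkaAdmin(configs);
    }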
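
One way to keep the admin client and the producer pointed at the same brokers is to build the shared connection settings in a single place. The sketch below is plain Java with no Kafka dependency. SharedKafkaConfig and its method names are hypothetical helpers, not part of Spring Kafka, and the string keys are the literal values behind AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG and the ProducerConfig serializer constants.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Spring Kafka): builds the connection
// settings once so the admin client and the producer cannot drift apart.
final class SharedKafkaConfig {

    private SharedKafkaConfig() {
    }

    // Settings every Kafka client (admin, producer, consumer) needs.
    static Map<String, Object> commonConfigs(String bootstrapAddresses) {
        Map<String, Object> configs = new HashMap<>();
        // "bootstrap.servers" is the key used by both the admin and producer clients.
        configs.put("bootstrap.servers", bootstrapAddresses);
        return configs;
    }

    // Map intended for new KafkaAdmin(...): only the common settings.
    static Map<String, Object> adminConfigs(String bootstrapAddresses) {
        return commonConfigs(bootstrapAddresses);
    }

    // Map intended for new DefaultKafkaProducerFactory<>(...): common settings plus serializers.
    static Map<String, Object> producerConfigs(String bootstrapAddresses) {
        Map<String, Object> configs = commonConfigs(bootstrapAddresses);
        configs.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer",
                "org.springframework.kafka.support.serializer.JsonSerializer");
        return configs;
    }
}
```

In the configuration class, the admin bean would then return new KafkaAdmin(SharedKafkaConfig.adminConfigs(bootstrapAddresses)), and producerFactory() would pass SharedKafkaConfig.producerConfigs(bootstrapAddresses) to DefaultKafkaProducerFactory. Kafka accepts serializer classes either as Class objects or as class-name strings, so the strings here stand in for StringSerializer.class and JsonSerializer.class.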