I have problem with producer of Kafka with Spring cloud with Spring boot. When I try to create configuration in application.yml, It allways send messages to wrong topic. I use placeholder for my topic "kafka_demo_topic_out_0" and it sends message not in right destination kafka_demo_topic but in kafka_demo_topic_out_0. Here is my code for producer :
package org.heller.kafka.demo.producer;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.heller.kafka.demo.producer.pojo.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
@Component
@EnableScheduling
public class KafkaProducer {
private AtomicLong idGenerator = new AtomicLong();
@Autowired
StreamBridge streamBridge;
@Scheduled(fixedDelay = 1000)
public void scheduleFixedRateTask() throws Exception {
Message message = constructMessage();
System.out.println("sending message" + message);
streamBridge.send("kafka_demo_topic_out_0", new ObjectMapper().writeValueAsString(message));
}
private Message constructMessage() {
Message message = new Message();
message.setId(idGenerator.getAndIncrement());
message.setUuid( UUID.randomUUID().toString());
LocalDateTime now = LocalDateTime.now();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
message.setDate(now.format(formatter));
return message;
}
}
Here is my application.yml :
spring:
cloud:
stream:
kafka:
binder:
autoAddPartitions: true
brokers: localhost:9092
auto-create-topics: false
bindings:
kafka_demo_topic_out_0:
producer:
headerMode: raw
destination: kafka_demo_topic
content-type: application/json
binder: kafka
I use Spring starter Kafka :
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>4.1.0</version>
</dependency>
Thank you for any help.
Update: Spring cloud stream bridge somehow ignore configuration in application.yml. I try this configuration, but It still create new topic named kafkaDemoTopic.
spring:
cloud:
stream:
function:
definition: kafkaDemoTopic
kafka:
binder:
autoAddPartitions: true
brokers: localhost:9092
auto-create-topics: false
bindings:
kafkaDemoTopic-out-0:
headerMode: raw
destination: kafka_demo_topic
content-type: application/json
binder: kafka
I find solution, my application.yml :
spring:
cloud:
stream:
bindings:
kafkaDemoTopic:
destination: kafka_demo_topic
kafka:
binder:
autoAddPartitions: true
brokers: localhost:9092
autoCreateTopics: false
My Kafka producer:
package org.heller.kafka.demo.producer;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.heller.kafka.demo.producer.pojo.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
@Component
@EnableScheduling
public class KafkaProducer {
private AtomicLong idGenerator = new AtomicLong();
@Autowired
StreamBridge streamBridge;
@Scheduled(fixedDelay = 1000)
public void scheduleFixedRateTask() throws Exception {
Message message = constructMessage();
System.out.println("sending message" + message);
streamBridge.send("kafkaDemoTopic", new ObjectMapper().writeValueAsString(message));
}
private Message constructMessage() {
Message message = new Message();
message.setId(idGenerator.getAndIncrement());
message.setUuid( UUID.randomUUID().toString());
LocalDateTime now = LocalDateTime.now();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
message.setDate(now.format(formatter));
return message;
}
}
My pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>org.heller.kafka.demo</groupId>
<artifactId>kafka_demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka_demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-commons</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.16.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>