Tags: java, spring-boot, apache-kafka, spring-cloud-stream-binder-kafka

How to send a message to a Kafka topic with Spring Cloud Stream


I have a problem with a Kafka producer built with Spring Cloud Stream and Spring Boot. When I configure the binding in application.yml, it always sends messages to the wrong topic. I use the binding name "kafka_demo_topic_out_0" as a placeholder for my topic, but the messages are not sent to the intended destination kafka_demo_topic; they end up in a topic named kafka_demo_topic_out_0 instead. Here is my producer code:

package org.heller.kafka.demo.producer;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import org.heller.kafka.demo.producer.pojo.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Scheduled producer that builds a {@link Message}, serializes it to JSON and hands it to
 * {@link StreamBridge} under the binding name {@code kafka_demo_topic_out_0}.
 */
@Component
@EnableScheduling
public class KafkaProducer {

    /** Shared JSON serializer, created once instead of once per scheduled run. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Layout of the date string stored on each message; the formatter is immutable and reusable. */
    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Source of sequential message ids; the first message gets id 0. */
    private final AtomicLong idGenerator = new AtomicLong();

    @Autowired
    StreamBridge streamBridge;

    /**
     * Builds one message, prints it to standard output and sends its JSON representation
     * through the stream bridge. Scheduled with {@code fixedDelay = 1000}.
     *
     * @throws Exception if the message cannot be serialized to JSON
     */
    @Scheduled(fixedDelay  = 1000)
    public void scheduleFixedRateTask() throws Exception {
        Message message = constructMessage();
        System.out.println("sending message" + message);

        streamBridge.send("kafka_demo_topic_out_0", OBJECT_MAPPER.writeValueAsString(message));
    }

    /**
     * Creates a message carrying the next sequential id, a random UUID and the current
     * local date-time formatted as {@code yyyy-MM-dd HH:mm:ss}.
     *
     * @return the populated message
     */
    private Message constructMessage() {
        Message message = new Message();
        message.setId(idGenerator.getAndIncrement());
        message.setUuid(UUID.randomUUID().toString());
        message.setDate(LocalDateTime.now().format(DATE_FORMAT));
        return message;
    }

}

Here is my application.yml:

# Producer configuration as posted in the question.
spring:
    cloud:
      stream:
        kafka:
          binder:
             autoAddPartitions: true
             brokers: localhost:9092
             auto-create-topics: false
      # NOTE(review): `bindings` is indented as a child of `cloud`, so the resulting key is
      # spring.cloud.bindings. Spring Cloud Stream documents binding properties under
      # spring.cloud.stream.bindings.<bindingName>.* instead.
      bindings:
        kafka_demo_topic_out_0:
          producer:
              headerMode: raw
        # NOTE(review): destination, content-type and binder are siblings of the binding
        # name here, not children of it, so they are not properties of kafka_demo_topic_out_0.
        destination: kafka_demo_topic
        content-type: application/json
        binder: kafka

I use the Spring Cloud Stream Kafka starter:

<!-- Spring Cloud Stream starter for the Kafka binder, pinned to version 4.1.0. -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    <version>4.1.0</version>
</dependency>

Thank you for any help.

Update: StreamBridge somehow ignores the configuration in application.yml. I tried the following configuration, but it still creates a new topic named kafkaDemoTopic.

# Second configuration attempt from the question update.
spring:
    cloud:
      stream:
       function:
          definition: kafkaDemoTopic
       kafka:
          binder:
             autoAddPartitions: true
             brokers: localhost:9092
             auto-create-topics: false
      # NOTE(review): `bindings` is indented at the same level as `stream`, so the key is
      # spring.cloud.bindings rather than spring.cloud.stream.bindings, which is where
      # Spring Cloud Stream documents binding properties.
      bindings:
        kafkaDemoTopic-out-0:
              headerMode: raw
              destination: kafka_demo_topic
              content-type: application/json
      # NOTE(review): this `binder` key is a child of `cloud` (spring.cloud.binder), not of
      # the binding above.
      binder: kafka
  

Solution

  • I found a solution. My application.yml:

    spring:
      cloud:
        stream:
          bindings:
            # Binding name passed to StreamBridge.send(...) by the producer.
            kafkaDemoTopic:
              # Kafka topic this binding publishes to.
              destination: kafka_demo_topic
          # Kafka binder settings must be nested under `stream`
          # (spring.cloud.stream.kafka.binder.*) for the binder to pick them up.
          kafka:
            binder:
              autoAddPartitions: true
              brokers: localhost:9092
              # The destination topic is expected to exist already.
              autoCreateTopics: false
    

    My Kafka producer:

    package org.heller.kafka.demo.producer;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.UUID;
    import java.util.concurrent.atomic.AtomicLong;
    
    import org.heller.kafka.demo.producer.pojo.Message;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    
    /**
     * Scheduled producer that builds a {@link Message}, serializes it to JSON and hands it to
     * {@link StreamBridge} under the binding name {@code kafkaDemoTopic}.
     */
    @Component
    @EnableScheduling
    public class KafkaProducer {

        /** Shared JSON serializer, created once instead of once per scheduled run. */
        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        /** Layout of the date string stored on each message; the formatter is immutable and reusable. */
        private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        /** Source of sequential message ids; the first message gets id 0. */
        private final AtomicLong idGenerator = new AtomicLong();

        @Autowired
        StreamBridge streamBridge;

        /**
         * Builds one message, prints it to standard output and sends its JSON representation
         * through the stream bridge. Scheduled with {@code fixedDelay = 1000}.
         *
         * @throws Exception if the message cannot be serialized to JSON
         */
        @Scheduled(fixedDelay  = 1000)
        public void scheduleFixedRateTask() throws Exception {
            Message message = constructMessage();
            System.out.println("sending message" + message);

            streamBridge.send("kafkaDemoTopic", OBJECT_MAPPER.writeValueAsString(message));
        }

        /**
         * Creates a message carrying the next sequential id, a random UUID and the current
         * local date-time formatted as {@code yyyy-MM-dd HH:mm:ss}.
         *
         * @return the populated message
         */
        private Message constructMessage() {
            Message message = new Message();
            message.setId(idGenerator.getAndIncrement());
            message.setUuid(UUID.randomUUID().toString());
            message.setDate(LocalDateTime.now().format(DATE_FORMAT));
            return message;
        }

    }
    

    My pom.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Maven build for the kafka_demo producer application. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <!-- Inherits from the Spring Boot 3.2.0 starter parent. -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>3.2.0</version>
            <relativePath /> <!-- lookup parent from repository -->
        </parent>
        <groupId>org.heller.kafka.demo</groupId>
        <artifactId>kafka_demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>kafka_demo</name>
        <description>Demo project for Spring Boot</description>
        <properties>
            <java.version>17</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <!-- Spring Cloud Stream starter for the Kafka binder. -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-kafka</artifactId>
                <version>4.1.0</version>
            </dependency>
            <!-- NOTE(review): the next three Spring Cloud artifacts are declared explicitly at
                 4.1.0 alongside the starter; presumably the starter already brings them in
                 transitively - confirm before removing. -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream</artifactId>
                <version>4.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
                <version>4.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-commons</artifactId>
                <version>4.1.0</version>
            </dependency>
            <!-- Supplies the ObjectMapper imported by KafkaProducer.
                 NOTE(review): the explicit version takes precedence over any version managed
                 by the Spring Boot parent - confirm this is intended. -->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.16.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>