javaspringapache-kafkaspring-cloud-stream

Unable to processing messages with spring cloud kafka binder


In the following code, I am trying to create a message by calling processOrder() via REST endpoint. Then, I want to pass the result of processOrder() to processShipping() and processPayment.

However, Whenever I call the rest endpoint http://localhost:8080/processOrder, just the processOrder() is called. What's wrong here?

package com.example.kafkademo.functions;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Consumer;
import java.util.function.Function;

@Configuration
public class MessageFunctions {

    @Bean
    public Function<String, String> processOrder(){
        return orderId -> {
            System.out.println("processOrder: " + orderId);
            System.out.println(orderId);
            return orderId + " : " + System.currentTimeMillis();
        };
    }

    @Bean
    public Consumer<String> processShipping(){
        return orderId -> {
            System.out.println("processShipping: " + orderId);
            System.out.println(orderId);
        };
    }

    @Bean
    public Consumer<String> processPayment(){
        return orderId -> {
            System.out.println("processPayment: " + orderId);
            System.out.println(orderId);
        };
    }
}

Here is application.yml:

spring:
  application:
      name: kafka-demo

  cloud:
    function:
      definition: processOrder;processPayment;processShipping
    stream:
      bindings:
        processOrder-out-0:
          destination: order_topic
        processPayment-in-0:
          destination: order_topic
        processShipping-in-0:
          destination: order_topic

  kafka:
    listener:
      port: 9094
    bootstrap-servers:
      - localhost:9094

Just in case, here are the dependencies:

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.cloud:spring-cloud-starter-function-web' 
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

Solution

  • It seems like you trigger your first function - processOrder via a REST endpoint, so the response from that function is returned to the HTTP call. My guess is that the data is not published to the Kafka topic.

    You can try changing your function processOrder to use StreamBridge as below:

    @Autowired StreamBridge streamBridge;
    
    @Bean
    public Function<String, String> processOrder(){
            return orderId -> {
                System.out.println("processOrder: " + orderId);
                System.out.println(orderId);
    
                streamBridge.send("order_topic",  orderId + " : " + System.currentTimeMillis());
    
                return orderId + " : " + System.currentTimeMillis();
            };
        }
    
    }
    

    This way, the function explicitly sends the data to the outbound. You can also use a Consumer instead of Function, but then again, you need to use the StreamBridge.