In the following code, I am trying to create a message by calling processOrder()
via REST endpoint. Then, I want to pass the result of processOrder()
to processShipping()
and processPayment
.
However, Whenever I call the rest endpoint http://localhost:8080/processOrder
, just the processOrder()
is called. What's wrong here?
package com.example.kafkademo.functions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.function.Consumer;
import java.util.function.Function;
@Configuration
public class MessageFunctions {
@Bean
public Function<String, String> processOrder(){
return orderId -> {
System.out.println("processOrder: " + orderId);
System.out.println(orderId);
return orderId + " : " + System.currentTimeMillis();
};
}
@Bean
public Consumer<String> processShipping(){
return orderId -> {
System.out.println("processShipping: " + orderId);
System.out.println(orderId);
};
}
@Bean
public Consumer<String> processPayment(){
return orderId -> {
System.out.println("processPayment: " + orderId);
System.out.println(orderId);
};
}
}
Here is application.yml
:
spring:
application:
name: kafka-demo
cloud:
function:
definition: processOrder;processPayment;processShipping
stream:
bindings:
processOrder-out-0:
destination: order_topic
processPayment-in-0:
destination: order_topic
processShipping-in-0:
destination: order_topic
kafka:
listener:
port: 9094
bootstrap-servers:
- localhost:9094
Just in case, here are the dependencies:
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.cloud:spring-cloud-starter-function-web'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
It seems like you trigger your first function - processOrder
via a REST endpoint, so the response from that function is returned to the HTTP call. My guess is that the data is not published to the Kafka topic.
You can try changing your function processOrder
to use StreamBridge
as below:
@Autowired StreamBridge streamBridge;
@Bean
public Function<String, String> processOrder(){
return orderId -> {
System.out.println("processOrder: " + orderId);
System.out.println(orderId);
streamBridge.send("order_topic", orderId + " : " + System.currentTimeMillis());
return orderId + " : " + System.currentTimeMillis();
};
}
}
This way, the function explicitly sends the data to the outbound. You can also use a Consumer
instead of Function
, but then again, you need to use the StreamBridge
.