I upgraded from spring-integration-java-dsl 1.2.3 to spring-integration-core 6.3.4, and since then I have been facing a data type issue. Whenever I try to read the message payload, it arrives as a byte[] instead of an Order object, so naturally I get a class cast exception. I suspect I am missing a configuration parameter or something similar, because with the old version of the dependency I could get the Order object directly.
Here is the simplified demo project: repo
Cloud configuration in application.yml file
cloud:
  stream:
    default:
      content-type: application/json
    defaultBinder: rabbit
    function:
      definition: processOrderRequestListener;
    bindings:
      processOrderRequestListener-in-0:
        destination: orderQueue
        group: OrderRequest
        content-type: application/json
        consumer:
          concurrency: 10
          max-attempts: 1
      processOrderRequestListener-out-0:
        destination: processOrderRequestChannel
        content-type: application/json
Publishing
@Bean
public Function<Message<Order>, Message<Order>> processOrderRequestListener() {
    return message -> {
        Message<Order> order = MessageBuilder
                .withPayload(message.getPayload())
                .copyHeaders(message.getHeaders())
                .setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
                .setHeader("spring.cloud.stream.message.contentType", "application/json")
                .build();
        return order;
    };
}
Consuming
@Bean
public IntegrationFlow processOrderRequestFlow() {
    return IntegrationFlow.from(PROCESS_ORDER_REQUEST_CHANNEL)
            .log(Level.TRACE, this.getClass().getName() + ".processOrderRequestFlow")
            .enrichHeaders(h -> h.header("source", "processOrderRequestFlow"))
            .routeToRecipients(retriesRouter -> retriesRouter
                    .recipientMessageSelector(RETRIES_CHANNEL,
                            // payload is byte[] instead of an Order object
                            message -> (Order) message.getPayload()
                            ...
Thank you for sharing your sample!
Unfortunately, Spring Boot 1.5.x is very old and has been out of support for a long time. We have to adapt our projects to whatever we can upgrade to right now.
So, I see in your old project version you had this:
@StreamListener(OrderProcessor.ORDER_REQUEST_INPUT_CHANNEL)
@Output(PROCESS_ORDER_REQUEST_CHANNEL)
public Message<OrderMessage> processOrderRequestListener(final Message<OrderMessage> orderMessage) throws IOException, SQLException {
    return orderMessage;
}
This essentially lets Spring Cloud Stream perform the conversion on the input, after which you send the converted message into a channel for the IntegrationFlow. So it is no surprise that the flow used to receive an OrderMessage instead of a byte[].
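The failure mode from the question can be reproduced without Spring at all. The sketch below is plain Java with hypothetical names (PayloadCastDemo, a stand-in Order class, unsafeSelector, guardedSelector). It only illustrates that a blind cast throws once the payload arrives as serialized bytes, and that checking the runtime type first makes the mismatch visible instead of failing.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the Order payload class from the question.
final class Order {
    final String id;

    Order(String id) {
        this.id = id;
    }
}

final class PayloadCastDemo {

    // Mirrors the selector in the question: a blind cast of the payload.
    // Throws ClassCastException when the payload is a byte[].
    static boolean unsafeSelector(Object payload) {
        Order order = (Order) payload;
        return order.id != null;
    }

    // Checks the runtime type first, so an unexpected byte[] yields false
    // instead of an exception.
    static boolean guardedSelector(Object payload) {
        return payload instanceof Order && ((Order) payload).id != null;
    }

    public static void main(String[] args) {
        Object converted = new Order("42");
        Object serialized = "{\"id\":\"42\"}".getBytes(StandardCharsets.UTF_8);

        System.out.println(guardedSelector(converted));
        System.out.println(guardedSelector(serialized));
        try {
            unsafeSelector(serialized);
        } catch (ClassCastException e) {
            System.out.println("cast failed: payload is " + serialized.getClass().getSimpleName());
        }
    }
}
```

A guard like this does not fix the conversion; it is only a quick way to confirm which payload type actually reaches the flow.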
I'm not sure how private static final String PROCESS_ORDER_REQUEST_CHANNEL = "processOrderRequestListener-out-0"; is supposed to be consumed in the new version, but I'd suggest not using that intermediate Function at all, and using something like this instead:
return IntegrationFlow.from(OrderMessageConsumer.class, gateway -> gateway.beanName("processOrderRequestListener"))
...
interface OrderMessageConsumer extends Consumer<OrderMessage> {}
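A note on the empty OrderMessageConsumer interface: a named interface that extends Consumer<OrderMessage> records the payload type in its class metadata, where it can be read back through reflection. The plain-Java sketch below shows only that mechanism. Whether and how the framework relies on it for payload conversion in this setup is an assumption on my part, and GenericTypeDemo, payloadType and the stand-in OrderMessage class are hypothetical names.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Consumer;

// Hypothetical stand-in for the OrderMessage payload class.
final class OrderMessage {
}

// Same shape as the marker interface suggested above.
interface OrderMessageConsumer extends Consumer<OrderMessage> {
}

final class GenericTypeDemo {

    // Returns the type argument that the given interface supplies to Consumer<T>,
    // or null if it does not directly extend a parameterized Consumer.
    static Type payloadType(Class<?> consumerInterface) {
        for (Type parent : consumerInterface.getGenericInterfaces()) {
            if (parent instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) parent;
                if (parameterized.getRawType() == Consumer.class) {
                    return parameterized.getActualTypeArguments()[0];
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(payloadType(OrderMessageConsumer.class));
        System.out.println(payloadType(Consumer.class));
    }
}
```

By contrast, a lambda assigned to a Consumer<OrderMessage> variable carries no such information at runtime, which is one reason to prefer a dedicated interface when a target type has to be discovered.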