
Spring integration payload casting exception


I upgraded from spring-integration-java-dsl 1.2.3 to spring-integration-core 6.3.4, and since then I have been facing a data type issue. Whenever I access the message payload in the flow, it arrives as a byte[] instead of an Order object, so naturally I get a casting exception. I suspect I am missing a configuration parameter, because with the old version of the dependency I received the Order object directly.

Here is the simplified demo project: repo

Cloud configuration in application.yml file

  cloud:
    stream:
      default:
        content-type: application/json
      defaultBinder: rabbit
      function:
        definition: processOrderRequestListener;                        
      bindings:                         
        processOrderRequestListener-in-0:  
          destination: orderQueue 
          group: OrderRequest 
          content-type: application/json
          consumer:
            concurrency: 10
            max-attempts: 1
        processOrderRequestListener-out-0:
          destination: processOrderRequestChannel 
          content-type: application/json

Publishing

@Bean
public Function<Message<Order>, Message<Order>> processOrderRequestListener() {
    return message -> {
        Message<Order> order = MessageBuilder
                .withPayload(message.getPayload())
                .copyHeaders(message.getHeaders())
                .setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
                .setHeader("spring.cloud.stream.message.contentType", "application/json")
                .build();
        return order; 
    };
}

Consuming

@Bean
public IntegrationFlow processOrderRequestFlow() {
    return IntegrationFlow.from(PROCESS_ORDER_REQUEST_CHANNEL)
            .log(Level.TRACE, this.getClass().getName() + ".processOrderRequestFlow")
            .enrichHeaders(h -> h.header("source", "processOrderRequestFlow"))
            .routeToRecipients(retriesRouter -> retriesRouter
                    .recipientMessageSelector(RETRIES_CHANNEL,
                            // payload is byte[] instead of an Order object
                            message -> (Order) message.getPayload()
                            ...
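
To confirm what actually arrives in the flow, the bare cast can be swapped for a type check that reports the real payload type. The following is a minimal plain-Java sketch that runs without Spring. `Order` here is a hypothetical stand-in for the class in the question, `payloadAs` is an illustrative helper and not a Spring Integration API, and the JSON bytes are an assumed example of what the binding delivers.

```java
import java.nio.charset.StandardCharsets;

public class PayloadCastDemo {

    // Hypothetical stand-in for the Order class from the question.
    static class Order {
        final String id;

        Order(String id) {
            this.id = id;
        }
    }

    // Illustrative helper (not a Spring API): returns the payload as the
    // expected type, or fails with a message naming the type that arrived.
    static <T> T payloadAs(Object payload, Class<T> expected) {
        if (expected.isInstance(payload)) {
            return expected.cast(payload);
        }
        throw new IllegalStateException("Expected " + expected.getSimpleName()
                + " but got " + payload.getClass().getSimpleName());
    }

    public static void main(String[] args) {
        // An already converted object, as the flow saw it with the old setup.
        Object converted = new Order("42");
        // Raw serialized bytes (assumed JSON), as the flow sees them now.
        Object raw = "{\"id\":\"42\"}".getBytes(StandardCharsets.UTF_8);

        System.out.println(payloadAs(converted, Order.class).id);
        try {
            payloadAs(raw, Order.class);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A check like this only makes the failure easier to read. It does not convert the bytes, so the payload still has to be converted before it reaches the router.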

Solution

  • Thank you for sharing your sample!

    Unfortunately, Spring Boot 1.5.x is very old and has been out of support for a long time. We have to adapt our projects to whatever versions we can upgrade to right now.

    So, I see in your old project version you had this:

    @StreamListener(OrderProcessor.ORDER_REQUEST_INPUT_CHANNEL)
    @Output(PROCESS_ORDER_REQUEST_CHANNEL)
    public Message<OrderMessage> processOrderRequestListener(final Message<OrderMessage> orderMessage) throws IOException, SQLException {
        return orderMessage;
    }
    

    This essentially let Spring Cloud Stream perform the conversion on the input, and then you sent that message into a channel for the IntegrationFlow. It is no surprise that the flow received an OrderMessage there instead of a byte[].

    I'm not sure how private static final String PROCESS_ORDER_REQUEST_CHANNEL = "processOrderRequestListener-out-0"; is supposed to be consumed in the new version, but I'd suggest not using that intermediate Function at all, and using something like this instead:

    return IntegrationFlow.from(OrderMessageConsumer.class, gateway -> gateway.beanName("processOrderRequestListener"))

    ...
    interface OrderMessageConsumer extends Consumer<OrderMessage> {}
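
A plausible reason for declaring the named OrderMessageConsumer interface (an assumption; the answer does not say so) is that it keeps the OrderMessage type argument visible at runtime, which a framework needs in order to know what to convert an incoming byte[] into. The plain-Java sketch below only shows that the type argument can be recovered through reflection. `OrderMessage` is a hypothetical stand-in, `consumedType` is an illustrative helper and not a Spring API, and the sketch does not reproduce what Spring does internally.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Consumer;

public class ConsumerTypeDemo {

    // Hypothetical stand-in for the OrderMessage class from the answer.
    static class OrderMessage {
    }

    // Same shape as the interface suggested in the answer.
    interface OrderMessageConsumer extends Consumer<OrderMessage> {
    }

    // Illustrative helper: reads the T of Consumer<T> from a named
    // sub-interface by inspecting its generic super-interfaces.
    static Class<?> consumedType(Class<?> consumerInterface) {
        for (Type type : consumerInterface.getGenericInterfaces()) {
            if (type instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) type;
                Type argument = parameterized.getActualTypeArguments()[0];
                if (parameterized.getRawType() == Consumer.class && argument instanceof Class) {
                    return (Class<?>) argument;
                }
            }
        }
        throw new IllegalArgumentException(
                consumerInterface + " does not extend Consumer<T> with a concrete T");
    }

    public static void main(String[] args) {
        // The declared type argument survives erasure on the interface itself.
        System.out.println(consumedType(OrderMessageConsumer.class).getSimpleName());
    }
}
```

A lambda typed only as Consumer<OrderMessage> carries no such metadata, which is why a dedicated interface is the safer thing to hand to the gateway.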