javaspringspring-integration

Spring Integration upgrade to 6.3.4 - errorChannel no longer consuming messages after exception


I recently upgraded from spring-integration (1.5.4) & spring-cloud-stream(1.3.3.RELEASE) version to spring-integration-core 6.3.4. & spring-cloud-stream(4.1.1) After the upgrade, I noticed that my errorChannel stops consuming messages when an exception occurs in the flow. Previously, in version 1.5.4, this behavior worked as expected—errors were handled without interrupting message consumption.

I have not explicitly defined an errorChannel in my configuration, so I’m relying on the default behavior. For testing, I'm intentionally throwing an exception to see how it's handled. Here’s a simplified version of my code and repo for demo project demo project:

Log about errorChannel creation

2024-11-10 00:16:05.036 INFO 18784 --- [ main] c.p.O.M.Application : No active profile set, falling back to default profiles: default 2024-11-10 00:16:05.046 INFO 18784 --- [ main] ationConfigEmbeddedWebApplicationContext : Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@525d79f0: startup date [Sun Nov 10 00:16:05 GMT 2024]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@1fa121e2 2024-11-10 00:16:05.337 INFO 18784 --- [ main] o.s.i.config.IntegrationRegistrar : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2024-11-10 00:16:05.756 INFO 18784 --- [ main] o.s.cloud.context.scope.GenericScope : BeanFactory id=e62e62b0-2a73-3813-9f0c-5895d78d6bf5 2024-11-10 00:16:05.762 INFO 18784 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.

Error Channel Flow

@Bean
public IntegrationFlow errorFlow() {
    return IntegrationFlows.from("errorChannel")
            .log(Level.TRACE, this.getClass().getName() + ".errorFlow")
            // Filter multiple messages for the same input messages so that the error is only handled once
            .filter(Message.class, message -> {
                Message<?> failedMessage = message;

                if (failedMessage instanceof ErrorMessage &&
                        ((ErrorMessage) failedMessage).getPayload() instanceof MessagingException) {
                    failedMessage = ((MessagingException) ((ErrorMessage) failedMessage).getPayload()).getFailedMessage();
                }

                AmqpUniqueId amqpUniqueId = new AmqpUniqueId(
                        (String) failedMessage.getHeaders().get("amqp_consumerTag"),
                        (Long) failedMessage.getHeaders().get("amqp_deliveryTag"));

                return this.messageIdStore.putIfAbsent(amqpUniqueId, ZonedDateTime.now(ZoneId.of("Z"))) == null;
            })
            .handle(message -> {
                Message<?> failedMessage = message;

                if (failedMessage instanceof ErrorMessage &&
                        ((ErrorMessage) failedMessage).getPayload() instanceof MessagingException) {
                    failedMessage = ((MessagingException) ((ErrorMessage) failedMessage).getPayload()).getFailedMessage();
                }
                Channel channel = (Channel) failedMessage.getHeaders().get(AmqpHeaders.CHANNEL);
                Long deliveryTag = (Long) failedMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
                OrderStatus orderStatus = OrderStatus.FAILED;

                if (channel != null) {
                    orderStatus = OrderStatus.RETRYING;

                    try {
                        channel.basicNack(deliveryTag, false, false);
                    } catch (IOException exception) {
                        log.error("channel operation failed", exception);
                    }
                } else {
                    log.error("can't access channel: " + message.toString());
                }

                Long orderId = getOrderIdFromFailedMessage(failedMessage);
                if (orderId != null) {
                    try {
                        this.database.setOrderStatus(orderId, orderStatus.getId(), null);
                    } catch (SQLException exception) {
                        log.error("An Error occurred while updating order status", exception);
                    }
                } else {
                    log.error("unable to obtain order ID: " + message.toString());
                }
            })
            .get();
}

My Flow

@Bean
public IntegrationFlow orderRequestFlow() {
    return IntegrationFlow.from("OrderRequestListener-out-0")
            .log(Level.TRACE, this.getClass().getName() + ".orderRequestFlow")
            .enrichHeaders(h -> h.header("source", "orderRequestFlow"))
            .<byte[]>handle((message, responseHeaders) -> {
                return jsonMessageConverter.convert(message, OrderMessage.class, responseHeaders);
            })
            .routeToRecipients(exhaustedRetriesRouter -> exhaustedRetriesRouter
                    .recipientMessageSelector(GenericFlow.EXHAUSTED_RETRIES_CHANNEL,
                            message -> isExhaustedMessage(message)
                    ).defaultSubFlowMapping(orderAttemptSubFlow -> orderAttemptSubFlow
                            .<OrderMessage, OrderRequest>transform(orderMessage -> {
                                try {
                                // Testing Exception Handling
                                   if(orderMessage != null)
                                      throw new SQLException("OrderMessage is not null");
                                                
                                    return this.orderRequestFactory.getOrderDetails(orderMessage);
                                } catch (final SQLException exception) {
                                    throw new MessagingException("Transform failed", exception);
                                }
                            })
                            .routeToRecipients(unableToOrderRouter -> unableToOrderRouter

Questions:

  1. Has the behavior or implementation of the errorChannel changed in Spring Integration 6.x?

  2. Do I need to explicitly define or configure errorChannel in the latest version?

  3. Are there any additional settings needed to ensure errorChannel continues to handle exceptions without stopping message consumption?

Any guidance on handling errors and ensuring consistent message processing behavior with Spring Integration 6.x would be greatly appreciated.


Solution

  • I won't repeat an End of Life message for Spring Boot 1.5.x, but similar discussion is here: Spring integration payload casting exception.

    The problem with the current version of your application is.

    Your processOrderRequestListener-out-0 is bound as an output of the function with a subscriber to publish data into RabbitMQ. At the same time you declare another subscriber for this channel in your project:

    @Bean
    public IntegrationFlow processOrderRequestFlow() {
        return IntegrationFlow.from(PROCESS_ORDER_REQUEST_CHANNEL)
    

    This subscriber is failing because of byte[] payload, but that new RuntimeException("Failed to process order") is not sent to the errorChannel because of failover logic in the UnicastingDispatcher:

                if (!isLast && failover) {
                    logExceptionBeforeFailOver(ex, handler, message);
                }
    
                if (isLast || !failover) {
                    handleExceptions(exceptions, message);
                }
    

    It is indeed not last because we also have a handler from the binder subscribed to this channel. And message is sent over there successfully. That's why there is no errorChannel interaction.

    If you still want to have this structure instead of what I've suggested in other our discussion, then you can do this:

        ExecutorChannel executorChannel = new ExecutorChannel(executor);
        executorChannel.setFailover(false);
        return executorChannel;
    

    And exception is going to be thrown up to the errorChannel handling:

    Processing order: [B@57f609b4
    2024-11-12T11:38:15.199-05:00 ERROR 5308 --- [pool-3-thread-3] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.LambdaMessageProcessor@3e4d40ea] (processOrderRequestFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0)], failedMessage=GenericMessage [payload=byte[9], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=orderQueue.OrderRequest, amqp_channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,9), conn: Proxy@3a72e2e8 Shared Rabbit Connection: SimpleConnection@4a4ff5ed [delegate=amqp://guest@127.0.0.1:5672/, localPort=55841], amqp_redelivered=true, target-protocol=amqp, amqp_receivedRoutingKey=orderQueue.OrderRequest, amqp_contentEncoding=UTF-8, id=1cd39258-3152-f8a6-bcd5-5239d5fb13b2, amqp_consumerTag=amq.ctag-4aXxnVqi8GgxqdSQIcJqOw, sourceData=(Body:'test data' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=orderQueue.OrderRequest, deliveryTag=1, consumerTag=amq.ctag-4aXxnVqi8GgxqdSQIcJqOw, consumerQueue=orderQueue.OrderRequest]), contentType=application/json, timestamp=1731429455456}]
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
        at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:160)
        at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:129)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:56)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
    Caused by: java.lang.RuntimeException: Failed to process order
        at com.cw.cloudstream.IntegrationConfig.lambda$processOrderRequestFlow$1(IntegrationConfig.java:57)
        at org.springframework.integration.handler.LambdaMessageProcessor.invokeMethod(LambdaMessageProcessor.java:208)
        at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:121)
        at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:145)
        at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    

    So, better to get rid of function intermediary and try do not abuse those binding channels.