spring-boottcpspring-integrationspring-integration-dsloutbound

How to resolve NO OUTPUT or CHANNEL REPLY HEADER


For a threeway communication between two external applications built with something different and my springboot app

I assume that the TcpInboundGateway created in the first instance is a request-reply, so it should wait for the response which is what external client 2 will send to it through my app.

In the service that gets the response from the TcpOutboundGateway, I assumed that just by setting the outputChannel on this service as the same name with the replyChannel in the tcpInboundGateway, then the connection will be made. But this does not work. So why the no output or reply header

 //The inbound gateway
  @Bean
     public TcpInboundGateway 
     tcpInboundGateway(AbstractServerConnectionFactory 
     serverConnectionFactory) {
    TcpInboundGateway gateway = new TcpInboundGateway();
    gateway.setConnectionFactory(serverConnectionFactory);
    gateway.setRequestChannel(requestChannel());
    gateway.setLoggingEnabled(true);
    gateway.setReplyChannel(backToPos());
    gateway.setReplyTimeout(60000);
    return gateway;
}

// The outbound gateway
@Bean
@ServiceActivator(inputChannel = "tcpUpslOutgateChannel")
public MessageHandler tcpOutboundGateway() {
    TcpOutboundGateway gateway = new TcpOutboundGateway();
    gateway.setConnectionFactory(upslClientConnectionFactory());
    gateway.setUnsolicitedMessageChannel(unsolicitedChannel());
    gateway.setOutputChannelName("forUpslReplyChannel");
    return gateway;
}

//Service that handles the response
  @ServiceActivator(inputChannel = "forUpslReplyChannel", 
 outputChannel = "backToPos")
public String 
 processResponseMessageFromUpslForPurchase(Message<String> 
 messageFull) {
   return processResponseMessageForPurchase(messageFull);
}

The other configuration include channels and service handlers... Below are the logs

024-07-12T08:47:07.307+01:00  INFO 23544 --- [terminal-management] [pool-2-thread-2] c.r.t.s.t.ResponseService                : still in the response service
2024-07-12T08:47:07.311+01:00 DEBUG 23544 --- [terminal-management] [pool-2-thread-2] o.s.i.ip.tcp.TcpInboundGateway           : failure occurred in gateway sendAndReceive: error occurred in message handler [org.springframework.integration.handler.BridgeHandler@25207619]
2024-07-12T08:47:07.311+01:00 ERROR 23544 --- [terminal-management] [pool-2-thread-2] o.s.i.i.tcp.connection.TcpNetConnection  : Exception sending message: GenericMessage [payload=byte[809], headers={ip_tcp_remotePort=55560, ip_connectionId=127.0.0.1:55560:30002:6a5442c5-0c0d-4217-aab2-fe49776b6d6c, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=c841c491-79dc-5e17-4e95-6f7ac6052b79, ip_hostname=127.0.0.1, timestamp=1720770425566}]

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.handler.BridgeHandler@25207619]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-6.3.0.jar:6.3.0]
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108) 
        at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:206) ~[spring-integration-ip-6.3.0.jar:6.3.0]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:504) ~[spring-integration-core-6.3.0.jar:6.3.0]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:357) ~[spring-integration-core-6.3.0.jar:6.3.0]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:286) ~[spring-integration-core-6.3.0.jar:6.3.0]

Solution

  • The request-reply pattern in Spring Integration is implemented via Return Address EIP: https://www.enterpriseintegrationpatterns.com/patterns/messaging/ReturnAddress.html.

    So, when we produce message downstream from the Inbound Gateway, the replyChannel header is populated.

    That AbstractMessageProducingHandler in your stack trace is able to extract this header from the request message to deliver reply properly to waiting gateway session.

    So, essentially we don't need gateway.setReplyChannel(backToPos()); on the Inbound Gateway at all. I even have an opened issue to remove its support altogether: https://github.com/spring-projects/spring-integration/issues/3985.

    So, that your service activator could be simpler:

    @ServiceActivator(inputChannel = "forUpslReplyChannel")
    

    No any outputChannel at all. This would optimize not only your code, but runtime behavior since there won't be necessary in extra bridge between real reply channel and respective header value.

    However it turns out that request message for this forUpslReplyChannel does not have replyChannel header any more. Apparently you have some other steps in between tcpOutboundGateway and processResponseMessageFromUpslForPurchase. So, you create a new Message but lose headers from request message.

    The TcpOutboundGateway does not do that by itself because its logic like:

    /**
     * Subclasses may override this. True by default.
     * @return true if the request headers should be copied.
     */
    protected boolean shouldCopyRequestHeaders() {
        return true;
    }
    

    I believe a debug logging level for the org.springframework.integration category (or WireTap with LoggingHandler subscriber) is a good way to track what is going on with your message when it travels through the flow: https://docs.spring.io/spring-integration/reference/channel/configuration.html#channel-wiretap