java spring-boot spring-integration tcpclient spring-integration-dsl

Spring Integration TCP Client unable to receive messages from external TCP Server


I have a use case where I need to send messages to an external TCP server that exposes 2 IP/port pairs, distributing the messages round-robin (one connection per IP/port pair, and each connection should be kept alive for message exchange).

I'm using Spring Integration. I don't have any information about the underlying implementation of the TCP server, but the message structure consists of a 4-byte header of ASCII characters that specifies the length of the message (excluding the header), followed by the message itself.

For example, if a message is 128 bytes long, the header value "0128" will be added to the beginning of the message. Therefore, the actual length of the data being sent is 132 bytes.

The example message will be like this: 0022thisisanexamplemessage

I followed this post to set up 2 TcpSendingMessageHandler instances with 2 FailoverClientConnectionFactory instances, using the same outboundChannel as the input channel to achieve round-robin distribution as shown below (and messages are sent to the TCP server successfully):

/**
 * Declares the {@code outboundChannel} bean that both TCP sending handlers in this
 * configuration name as their {@code inputChannel}.
 *
 * @return a newly created {@link DirectChannel}
 */
@Bean
public MessageChannel outboundChannel() {
    final MessageChannel channel = new DirectChannel();
    return channel;
}

/**
 * First of the two sending handlers attached to {@code outboundChannel}; it writes through
 * {@link #failoverClientConnectionFactoryOne()} and runs in client mode.
 *
 * @return the configured {@link TcpSendingMessageHandler}
 */
@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler tcpOutboundChannelOne() {
    final FailoverClientConnectionFactory connections = failoverClientConnectionFactoryOne();
    final TcpSendingMessageHandler sender = new TcpSendingMessageHandler();
    sender.setConnectionFactory(connections);
    sender.setClientMode(true);
    return sender;
}

/**
 * Second of the two sending handlers attached to {@code outboundChannel}; it writes through
 * {@link #failoverClientConnectionFactoryTwo()} and runs in client mode.
 *
 * @return the configured {@link TcpSendingMessageHandler}
 */
@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler tcpOutboundChannelTwo() {
    final FailoverClientConnectionFactory connections = failoverClientConnectionFactoryTwo();
    final TcpSendingMessageHandler sender = new TcpSendingMessageHandler();
    sender.setConnectionFactory(connections);
    sender.setClientMode(true);
    return sender;
}

/**
 * Builds a failover connection factory whose delegate list holds client factory One first
 * and client factory Two second.
 *
 * @return the configured {@link FailoverClientConnectionFactory}
 */
@Bean
public FailoverClientConnectionFactory failoverClientConnectionFactoryOne() {
    // Delegate order: One, then Two (the sibling bean uses the reverse order).
    final List<AbstractClientConnectionFactory> delegates = new ArrayList<>(
            List.of(tcpNetClientConnectionFactoryOne(), tcpNetClientConnectionFactoryTwo()));

    final FailoverClientConnectionFactory failover = new FailoverClientConnectionFactory(delegates);
    failover.setLeaveOpen(true);
    failover.setSingleUse(false);
    failover.setSoKeepAlive(true);
    failover.setCloseOnRefresh(true);
    failover.setRefreshSharedInterval(10_000L);
    return failover;
}

/**
 * Builds a failover connection factory whose delegate list holds client factory Two first
 * and client factory One second.
 *
 * @return the configured {@link FailoverClientConnectionFactory}
 */
@Bean
public FailoverClientConnectionFactory failoverClientConnectionFactoryTwo() {
    // Delegate order: Two, then One (the sibling bean uses the reverse order).
    final List<AbstractClientConnectionFactory> delegates = new ArrayList<>(
            List.of(tcpNetClientConnectionFactoryTwo(), tcpNetClientConnectionFactoryOne()));

    final FailoverClientConnectionFactory failover = new FailoverClientConnectionFactory(delegates);
    failover.setLeaveOpen(true);
    failover.setSingleUse(false);
    failover.setSoKeepAlive(true);
    failover.setCloseOnRefresh(true);
    failover.setRefreshSharedInterval(10_000L);
    return failover;
}

/**
 * Creates the client connection factory for {@code tcpServerHost}:{@code tcpServerPort1},
 * using a separate {@link #codec()} instance for serialization and for deserialization.
 *
 * <p>NOTE(review): the type name {@code TcpNetClientConnectionFactoryOne} may have been
 * intended as {@code TcpNetClientConnectionFactory} - confirm against the project's classes.
 *
 * @return the configured client connection factory
 */
@Bean
public TcpNetClientConnectionFactoryOne tcpNetClientConnectionFactoryOne() {
    final TcpNetClientConnectionFactoryOne client =
            new TcpNetClientConnectionFactoryOne(tcpServerHost, tcpServerPort1);
    client.setLeaveOpen(true);
    client.setSingleUse(false);
    client.setSoKeepAlive(true);
    // Two independent codec instances, one per direction.
    client.setSerializer(codec());
    client.setDeserializer(codec());
    client.setConnectionTimeout(connectionTimeout);
    return client;
}


/**
 * Creates the client connection factory for {@code tcpServerHost}:{@code tcpServerPort2},
 * using a separate {@link #codec()} instance for serialization and for deserialization.
 *
 * <p>NOTE(review): the type name {@code TcpNetClientConnectionFactoryOne} may have been
 * intended as {@code TcpNetClientConnectionFactory} - confirm against the project's classes.
 *
 * @return the configured client connection factory
 */
@Bean
public TcpNetClientConnectionFactoryOne tcpNetClientConnectionFactoryTwo() {
    final TcpNetClientConnectionFactoryOne client =
            new TcpNetClientConnectionFactoryOne(tcpServerHost, tcpServerPort2);
    client.setLeaveOpen(true);
    client.setSingleUse(false);
    client.setSoKeepAlive(true);
    // Two independent codec instances, one per direction.
    client.setSerializer(codec());
    client.setDeserializer(codec());
    client.setConnectionTimeout(connectionTimeout);
    return client;
}

/**
 * Creates a new length-header codec with an 8 KiB maximum message size and a
 * non-inclusive length header.
 *
 * <p>NOTE(review): the protocol description calls for a header of 4 ASCII digits; verify
 * that the header encoding produced by {@code ByteArrayLengthHeaderSerializer} matches it.
 *
 * @return a freshly configured {@link ByteArrayLengthHeaderSerializer}
 */
private ByteArrayLengthHeaderSerializer codec() {
    final int maxMessageBytes = 8 * 1024;
    final ByteArrayLengthHeaderSerializer lengthHeaderCodec = new ByteArrayLengthHeaderSerializer();
    lengthHeaderCodec.setMaxMessageSize(maxMessageBytes);
    lengthHeaderCodec.setInclusive(false);
    return lengthHeaderCodec;
}

For incoming messages from the TCP server, I've set up 2 TcpReceivingChannelAdapter instances using the same 2 FailoverClientConnectionFactory instances as above, with a single inboundChannel as the output channel:

/**
 * Declares the channel that both TCP receiving adapters in this configuration use as their
 * output channel and that {@code tcpInboundFlow()} starts from.
 *
 * @return a newly created {@link DirectChannel}
 */
@Bean
public MessageChannel inboundChannel() {
    final MessageChannel channel = new DirectChannel();
    return channel;
}

/**
 * Receiving adapter bound to {@link #failoverClientConnectionFactoryOne()}; whatever it
 * produces is sent to {@link #inboundChannel()}.
 *
 * @return the configured {@link TcpReceivingChannelAdapter}
 */
@Bean
public MessageProducer tcpInboundChannelOne() {
    final TcpReceivingChannelAdapter receiver = new TcpReceivingChannelAdapter();
    final FailoverClientConnectionFactory connections = failoverClientConnectionFactoryOne();
    receiver.setConnectionFactory(connections);
    final MessageChannel target = inboundChannel();
    receiver.setOutputChannel(target);
    return receiver;
}

/**
 * Receiving adapter bound to {@link #failoverClientConnectionFactoryTwo()}; whatever it
 * produces is sent to {@link #inboundChannel()}.
 *
 * @return the configured {@link TcpReceivingChannelAdapter}
 */
@Bean
public MessageProducer tcpInboundChannelTwo() {
    final TcpReceivingChannelAdapter receiver = new TcpReceivingChannelAdapter();
    final FailoverClientConnectionFactory connections = failoverClientConnectionFactoryTwo();
    receiver.setConnectionFactory(connections);
    final MessageChannel target = inboundChannel();
    receiver.setOutputChannel(target);
    return receiver;
}

@Bean
public IntegrationFlow tcpInboundFlow() {
    return IntegrationFlow.from(inboundChannel())
            .handle(message -> handleMessage(byte[] message.getPayload()))
            .get();
}

/**
 * Logs a received payload as text at INFO level.
 *
 * <p>NOTE(review): the bytes are decoded with the default charset of {@code new String(byte[])};
 * confirm that this matches the encoding the server uses.
 *
 * @param payload raw bytes of the received message
 */
private void handleMessage(byte[] payload) {
    // Process incoming message here
    final String text = new String(payload);
    log.info("Received: {}", text);
}

The problem is that I can send messages to the TCP server, but I am unable to receive any messages sent from the server. What have I done wrong? Please help.


Solution

  • Thanks to @ArtemBilan's suggestion, I created a custom deserializer that extends AbstractByteArraySerializer and set it on the TcpNetClientConnectionFactory; the inboundChannel is now able to receive incoming messages.

    public class CustomSerializer extends AbstractByteArraySerializer {
    
        @Override
        public byte[] deserialize(InputStream inputStream) throws IOException {
            byte[] messageHeaderBytes = new byte[4];
            int bytesRead = inputStream.read(messageHeaderBytes);
            if (bytesRead != 4) {
                log.error("Invalid message header length");
                throw new IOException("Invalid message header length");
            }
            int messageLength = Integer.parseInt(new String( messageHeaderBytes, StandardCharsets.US_ASCII));
            byte[] messageBytes = new byte[messageLength];
            bytesRead = inputStream.read(messageBytes);
            if (bytesRead != messageLength) {
                log.error("Invalid message length");
                throw new IOException("Invalid message length");
            }
            return messageBytes;
        }
    
        @Override
        public void serialize(byte[] bytes, OutputStream outputStream) throws IOException {
            outputStream.write(bytes);
        }
    }