I have implemented a TCP client for stream-oriented sockets (TCP/IP) using Spring Integration.
My expectation was to create a pool of 9 connections, with all traffic handled by those 9 TCP connections only. However, I noticed that the connections are getting closed after exactly 300 seconds (see the last three lines of the log). This leads to a new TCP connection being established for the next request. The customer (who runs the TCP server) says that they never close a connection, and reports that my application is trying to open more connections than the allotted number (i.e. 9).
Here is the configuration.
@Bean
public AbstractClientConnectionFactory emirClientConnectionFactory() {
TcpNetClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory("censored-host", 9999);
tcpNetClientConnectionFactory.setApplicationEventPublisher(applicationEventPublisher);
tcpNetClientConnectionFactory.setDeserializer(new EmirCustomStxHeaderLengthSerializer());
tcpNetClientConnectionFactory.setSerializer(new EmirCustomStxHeaderLengthSerializer());
tcpNetClientConnectionFactory.setSingleUse(false);
tcpNetClientConnectionFactory.setSoKeepAlive(true);
TcpSocketSupport emirTcpSocketSupport = new EmirTcpSocketSupport();
tcpNetClientConnectionFactory.setTcpSocketSupport(emirTcpSocketSupport);
return tcpNetClientConnectionFactory;
}
@Bean
public AbstractClientConnectionFactory emirTcpCachedClientConnectionFactory() {
CachingClientConnectionFactory cachingConnFactory = new CachingClientConnectionFactory(emirClientConnectionFactory(), 9);
cachingConnFactory.setSingleUse(false);
cachingConnFactory.setLeaveOpen(true);
cachingConnFactory.setSoKeepAlive(true);
this.cachingClientConnectionFactory = cachingConnFactory;
return cachingConnFactory;
}
@Bean
public MessageChannel emirOutboundChannel() {
return new DirectChannel();
}
@Bean
public RequestHandlerRetryAdvice retryAdvice() {
RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(50);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
retryAdvice.setRetryTemplate(retryTemplate);
return retryAdvice;
}
@Bean
@ServiceActivator(inputChannel = "emirOutboundChannel")
public MessageHandler emirOutbound(AbstractClientConnectionFactory emirTcpCachedClientConnectionFactory) {
TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
List<Advice> list = new ArrayList<>();
list.add(retryAdvice());
tcpOutboundGateway.setAdviceChain(list);
tcpOutboundGateway.setRemoteTimeout(16000);
tcpOutboundGateway.setRequestTimeout(16000);
tcpOutboundGateway.setSendTimeout(16000);
tcpOutboundGateway.setConnectionFactory(emirTcpCachedClientConnectionFactory);
return tcpOutboundGateway;
}
}
I also have keep-alive configured in EmirTcpSocketSupport.java. I suspected that the connections were being closed by the AWS NAT gateway (idle timeout of ~350 seconds), so I set TCP_KEEPIDLE to 290 seconds.
@Override
public void postProcessSocket(Socket socket) {
if (this.sslVerifyHost && socket instanceof SSLSocket) {
SSLSocket sslSocket = (SSLSocket) socket;
SSLParameters sslParameters = sslSocket.getSSLParameters();
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
sslSocket.setSSLParameters(sslParameters);
}
try {
socket.setOption(ExtendedSocketOptions.TCP_KEEPIDLE, 290);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
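For reference, TCP_KEEPIDLE only controls how long the socket must be idle before the first probe is sent; the probe interval and probe count are separate options, and none of them take effect unless SO_KEEPALIVE is enabled on the socket. Below is a minimal sketch on a plain java.net.Socket, assuming a JDK 11+ runtime on a platform that supports the jdk.net.ExtendedSocketOptions keep-alive options. The class name KeepAliveTuning and the interval/count values are hypothetical; only the 290-second idle time comes from the configuration above.

```java
import java.io.IOException;
import java.net.Socket;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.util.Set;
import jdk.net.ExtendedSocketOptions;

public class KeepAliveTuning {

    /** Enables keep-alive and, where the platform supports it, tunes the probe timing. */
    public static void configure(Socket socket, int idleSeconds, int intervalSeconds, int probeCount)
            throws IOException {
        // Without SO_KEEPALIVE the TCP_KEEP* options below have no effect.
        socket.setOption(StandardSocketOptions.SO_KEEPALIVE, true);

        Set<SocketOption<?>> supported = socket.supportedOptions();
        if (supported.contains(ExtendedSocketOptions.TCP_KEEPIDLE)) {
            // Idle time before the first keep-alive probe is sent.
            socket.setOption(ExtendedSocketOptions.TCP_KEEPIDLE, idleSeconds);
        }
        if (supported.contains(ExtendedSocketOptions.TCP_KEEPINTERVAL)) {
            // Delay between probes that were not acknowledged.
            socket.setOption(ExtendedSocketOptions.TCP_KEEPINTERVAL, intervalSeconds);
        }
        if (supported.contains(ExtendedSocketOptions.TCP_KEEPCOUNT)) {
            // Number of unacknowledged probes before the connection is dropped.
            socket.setOption(ExtendedSocketOptions.TCP_KEEPCOUNT, probeCount);
        }
    }

    public static void main(String[] args) throws IOException {
        try (Socket socket = new Socket()) {
            configure(socket, 290, 10, 3); // 10 and 3 are illustrative values
            System.out.println("SO_KEEPALIVE = " + socket.getOption(StandardSocketOptions.SO_KEEPALIVE));
        }
    }
}
```

Inside postProcessSocket the same calls could be applied to the socket passed in; whether the probes actually reach the server through the NAT gateway is something only a packet capture would confirm.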
Output console: TCP Socket Server running on host: censored-host & port: 9999
2023-08-16 12:53:44.150 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:158 - New connection censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.150 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory:313 - emirClientConnectionFactory: Added new connection: censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.152 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Added pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.152 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:193 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Reading...
2023-08-16 12:53:44.153 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:125 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Message sent GenericMessage [payload=byte[482], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1f32b135, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1f32b135, id=58330247-fe56-1322-c94e-248e1acf6338, timestamp=1692190424136}]
2023-08-16 12:53:44.461 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:217 - Message received GenericMessage [payload=byte[175], headers={ip_tcp_remotePort=9999, ip_connectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=c5ce21d9-cc1b-db8f-1a54-b6fe731d3281, ip_hostname=censored-host, timestamp=1692190424461}]
2023-08-16 12:53:44.461 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:348 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Waiting for listener registration
2023-08-16 12:53:44.461 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:352 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Listener registeredemirDoDeserialize message {}
2023-08-16 12:53:44.461 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Response GenericMessage [payload=byte[175], headers={ip_tcp_remotePort=9999, ip_connectionId=Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_actualConnectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=68f25612-5a89-d6ce-0898-8f44a193509d, ip_hostname=censored-host, timestamp=1692190424461}]
2023-08-16 12:53:44.461 my-project[Thread Id = 486] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Removed pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.461 my-project[Thread Id = 528] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Added pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.462 my-project[Thread Id = 528] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:125 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Message sent GenericMessage [payload=byte[482], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2255db4, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2255db4, id=918bfab3-bc85-7393-0c44-3d925fd2ef2f, timestamp=1692190424288}]ResponseDURSEG:null
2023-08-16 12:53:44.502 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:217 - Message received GenericMessage [payload=byte[130], headers={ip_tcp_remotePort=9999, ip_connectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=5aba1a68-3f8c-079e-64d7-b0c967f3804a, ip_hostname=censored-host, timestamp=1692190424502}]
2023-08-16 12:53:44.502 my-project[Thread Id = 528] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Response GenericMessage [payload=byte[130], headers={ip_tcp_remotePort=9999, ip_connectionId=Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_actualConnectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=018f8fe5-23c4-fe3a-246b-5d205e429898, ip_hostname=censored-host, timestamp=1692190424502}]
2023-08-16 12:53:44.502 my-project[Thread Id = 528] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Removed pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.502 my-project[Thread Id = 532] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Added pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:53:44.502 my-project[Thread Id = 532] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:125 - censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 Message sent GenericMessage [payload=byte[482], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6d58f0b5, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6d58f0b5, id=3024c123-a418-028b-ea59-99f1c2f3c572, timestamp=1692190424293}]
2023-08-16 12:53:44.541 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetConnection:217 - Message received GenericMessage [payload=byte[130], headers={ip_tcp_remotePort=9999, ip_connectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=2c8b1c8c-0ef3-c533-4f4a-47abad800aa8, ip_hostname=censored-host, timestamp=1692190424541}]emirDoDeserialize message {}
2023-08-16 12:53:44.541 my-project[Thread Id = 532] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Response GenericMessage [payload=byte[130], headers={ip_tcp_remotePort=9999, ip_connectionId=Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_actualConnectionId=censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4, ip_localInetAddress=/xx.x.x.xx, ip_address=xxx.xx.xxx.xx, id=21cfac2a-cd3b-b0db-5dff-8f70e7fa258c, ip_hostname=censored-host, timestamp=1692190424541}]
2023-08-16 12:53:44.541 my-project[Thread Id = 532] DEBUG org.springframework.integration.ip.tcp.TcpOutboundGateway:313 - Removed pending reply Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
2023-08-16 12:58:44.543 my-project[Thread Id = 540] DEBUG org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory$CachedConnection:413 - Connection Cached:censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4 has already been released
2023-08-16 13:04:00.661 my-project[Thread Id = 562] DEBUG org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory:313 - emirClientConnectionFactory: Removed closed connection: censored-host:9999:56570:07d49055-e897-4432-b235-032b390287a4
My question is: even though the keep-alive idle time is set to 290 seconds, why does the connection close at exactly 300 seconds? (Subsequently, the connection is removed from the cache, which makes my application open a new TCP connection for the following requests.)
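As a quick check of the timing, the intervals can be computed directly from the log timestamps above. This is only a small sketch using java.time; the class name LogGap is hypothetical, and the three timestamps are copied from the last three log lines.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class LogGap {

    // Matches the timestamp prefix used in the log lines above.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSS");

    /** Returns the number of milliseconds between two log timestamps. */
    public static long millisBetween(String from, String to) {
        LocalDateTime start = LocalDateTime.parse(from, FORMAT);
        LocalDateTime end = LocalDateTime.parse(to, FORMAT);
        return Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        String lastReply = "2023-08-16 12:53:44.541"; // "Removed pending reply"
        String released = "2023-08-16 12:58:44.543";  // "has already been released"
        String removed = "2023-08-16 13:04:00.661";   // "Removed closed connection"

        System.out.println(millisBetween(lastReply, released)); // 300002
        System.out.println(millisBetween(lastReply, removed));  // 616120
    }
}
```

So the "has already been released" line appears 300.002 seconds after the last reply, while the "Removed closed connection" line is only logged about 616 seconds after it.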
The "Removed closed connection" debug message is emitted from AbstractConnectionFactory.removeClosedConnectionsAndReturnOpenConnectionIds().
The call stack for this method looks like this:
harvestClosedConnections()
<- TcpNetClientConnectionFactory.buildNewConnection()
<- doObtain(boolean singleUse)
<- obtainNewConnection()
Where the logic is like this:
if (!singleUse) {
// Another write lock holder might have created a new one by now.
connection = obtainSharedConnection();
if (connection != null && connection.isOpen()) {
return connection;
}
}
return doObtain(singleUse);
So, even if you use singleUse = false, a new connection is built whenever that sharedConnection is no longer open, and here it has been closed for some reason, as reported by its socket:
public boolean isOpen() {
return !this.socket.isClosed();
}
I doubt that we can do anything from the client perspective if your server does close those sockets periodically.
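To illustrate the isOpen() check above: java.net.Socket.isClosed() only reports whether close() was called on the local socket object. When the remote side (or something in between) ends the connection, the local side normally finds out through a read that returns end-of-stream, and only after the local socket is closed in response does isClosed() return true. In the log, the "has already been released" line comes from thread 540, the same thread that logged "Reading...", which would be consistent with the reader hitting end-of-stream and closing the connection, although the log alone does not prove it. The following is a plain loopback sketch, not Spring Integration code; the class names PeerCloseDemo and Result are hypothetical.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class PeerCloseDemo {

    /** Simple holder for the three observations made in run(). */
    public static final class Result {
        public final boolean closedBeforeRead;
        public final int firstRead;
        public final boolean closedAfterLocalClose;

        Result(boolean closedBeforeRead, int firstRead, boolean closedAfterLocalClose) {
            this.closedBeforeRead = closedBeforeRead;
            this.firstRead = firstRead;
            this.closedAfterLocalClose = closedAfterLocalClose;
        }
    }

    public static Result run() throws IOException {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
            client.setSoTimeout(5000); // avoid blocking forever if something goes wrong

            // The "server" accepts the connection and immediately closes its end.
            server.accept().close();

            // The client socket object does not know about the remote close yet.
            boolean closedBeforeRead = client.isClosed();

            // The remote close shows up as end-of-stream on the next read.
            int firstRead = client.getInputStream().read();

            // Only a local close() flips isClosed() to true.
            client.close();
            return new Result(closedBeforeRead, firstRead, client.isClosed());
        }
    }

    public static void main(String[] args) throws IOException {
        Result result = run();
        System.out.println(result.closedBeforeRead);      // false
        System.out.println(result.firstRead);             // -1
        System.out.println(result.closedAfterLocalClose); // true
    }
}
```

A packet capture on the client host would show which side actually sends the FIN or RST at the 300-second mark.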