What I am doing? I'm connecting to a remote server using TLSv1.2 and sending a max of 300 bytes of data and receive a response back also of the same size.
What is expected to deliver? During load testing we are expected to deliver 1000TPS. Use at max using 50 Persistent TLS Connections.
What is going wrong?
@EnableIntegration
@IntegrationComponentScan
@Configuration
public class TcpClientConfig implements ApplicationEventPublisherAware {
private ApplicationEventPublisher applicationEventPublisher;
private final ConnectionProperty connectionProperty;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
TcpClientConfig(ConnectionProperty connectionProperty) {
this.connectionProperty = connectionProperty;
}
@Bean
public AbstractClientConnectionFactory clientConnectionFactory() {
TcpNioClientConnectionFactory tcpNioClientConnectionFactory =
getTcpNioClientConnectionFactoryOf(
connectionProperty.getPrimaryHSMServerIpAddress(),
connectionProperty.getPrimaryHSMServerPort());
final List<AbstractClientConnectionFactory> fallBackConnections = getFallBackConnections();
fallBackConnections.add(tcpNioClientConnectionFactory);
final FailoverClientConnectionFactory failoverClientConnectionFactory =
new FailoverClientConnectionFactory(fallBackConnections);
return new CachingClientConnectionFactory(
failoverClientConnectionFactory, connectionProperty.getConnectionPoolSize());
}
@Bean
DefaultTcpNioSSLConnectionSupport connectionSupport() {
final DefaultTcpSSLContextSupport defaultTcpSSLContextSupport =
new DefaultTcpSSLContextSupport(
connectionProperty.getKeystorePath(),
connectionProperty.getTrustStorePath(),
connectionProperty.getKeystorePassword(),
connectionProperty.getTruststorePassword());
final String protocol = "TLSv1.2";
defaultTcpSSLContextSupport.setProtocol(protocol);
return new DefaultTcpNioSSLConnectionSupport(defaultTcpSSLContextSupport, false);
}
@Bean
public MessageChannel outboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler outboundGateway(AbstractClientConnectionFactory clientConnectionFactory) {
TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
tcpOutboundGateway.setConnectionFactory(clientConnectionFactory);
return tcpOutboundGateway;
}
@ServiceActivator(inputChannel = "error-channel")
public void handleError(ErrorMessage em) {
Throwable throwable = em.getPayload();
if(ExceptionUtils.indexOfThrowable(throwable, IOException.class)!=-1){
ExceptionHandler.throwHsmSystemTimeoutException();
}
throw new RuntimeException(throwable);
}
private List<AbstractClientConnectionFactory> getFallBackConnections() {
final int size = connectionProperty.getAdditionalHSMServersConfig().size();
List<AbstractClientConnectionFactory> collector = new ArrayList<>(size);
for (final Map.Entry<String, Integer> server :
connectionProperty.getAdditionalHSMServersConfig().entrySet()) {
collector.add(getTcpNioClientConnectionFactoryOf(server.getKey(), server.getValue()));
}
return collector;
}
private TcpNioClientConnectionFactory getTcpNioClientConnectionFactoryOf(
final String ipAddress, final int port) {
TcpNioClientConnectionFactory tcpNioClientConnectionFactory =
new TcpNioClientConnectionFactory(ipAddress, port);
tcpNioClientConnectionFactory.setUsingDirectBuffers(true);
tcpNioClientConnectionFactory.setDeserializer(new CustomDeserializer());
tcpNioClientConnectionFactory.setApplicationEventPublisher(applicationEventPublisher);
tcpNioClientConnectionFactory.setSoKeepAlive(true);
tcpNioClientConnectionFactory.setConnectTimeout(connectionProperty.getConnectionTimeout());
tcpNioClientConnectionFactory.setSoTcpNoDelay(true);
tcpNioClientConnectionFactory.setTcpNioConnectionSupport(connectionSupport());
return tcpNioClientConnectionFactory;
}
}
@Component
class CustomDeserializer extends DefaultDeserializer {
private static final int MAX_LENGTH = 80;
@Override
public Object deserialize(final InputStream inputStream) throws IOException {
StringBuilder stringBuffer = new StringBuilder(MAX_LENGTH);
int read = Integer.MIN_VALUE;
for (int loop = 0; loop < 300 && read != ']'; loop++) {
read = inputStream.read();
stringBuffer.append((char) read);
}
String reply = stringBuffer.toString();
return reply;
}
@Component
@MessagingGateway(defaultRequestChannel = "outboundChannel",errorChannel ="error-channel" )
public interface TcpClientGateway {
String send(String message);
}
Additional Information:
{
"logger": "org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor",
"message": "No bean named \u0027errorChannel\u0027 has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.",
}
{
"logger": "org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor",
"message": "No bean named \u0027integrationHeaderChannelRegistry\u0027 has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.",
}
{
"logger": "org.springframework.cloud.context.scope.GenericScope",
"message": "BeanFactory id\u003da426df04-fa05-397e-a849-44d732e3faa4",
}
{
"logger": "org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker",
"message": "Bean \u0027org.springframework.integration.config.IntegrationManagementConfiguration\u0027 of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)",
}
{
"logger": "org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker",
"message": "Bean \u0027integrationChannelResolver\u0027 of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)",
}
{
"logger": "org.apache.coyote.http11.Http11NioProtocol",
"message": "Initializing ProtocolHandler [\"http-nio-8080\"]",
}
{
"logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
"message": "Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the \u0027errorChannel\u0027 channel",
}
{
"logger": "org.springframework.integration.channel.PublishSubscribeChannel",
"message": "Channel \u0027decryptor-api.errorChannel\u0027 has 1 subscriber(s).",
}
{
"logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
"message": "started bean \u0027_org.springframework.integration.errorLogger\u0027",
}
{
"logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
"message": "Adding {service-activator:tcpClientConfig.handleError.serviceActivator} as a subscriber to the \u0027error-channel\u0027 channel",
}
{
"logger": "org.springframework.integration.channel.DirectChannel",
"message": "Channel \u0027decryptor-api.error-channel\u0027 has 1 subscriber(s).",
}
{
"logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
"message": "started bean \u0027tcpClientConfig.handleError.serviceActivator\u0027",
}
{
"logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
"message": "Adding {ip:tcp-outbound-gateway:tcpClientConfig.outboundGateway.serviceActivator} as a subscriber to the \u0027outboundChannel\u0027 channel",
}
{
"logger": "org.springframework.integration.channel.DirectChannel",
"message": "Channel \u0027decryptor-api.outboundChannel\u0027 has 1 subscriber(s).",
}
{
"logger": "org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory",
"message": "started org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory@619b7436, host\u003d10.1.4.4, port\u003d9021",
}
{
"logger": "org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory",
"message": "started org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory@71c1ca1, host\u003d10.1.4.4, port\u003d9021",
}
{
"logger": "org.springframework.integration.ip.tcp.connection.FailoverClientConnectionFactory",
"message": "started org.springframework.integration.ip.tcp.connection.FailoverClientConnectionFactory@20cf3ab3, host\u003d, port\u003d0",
}
{
"logger": "org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory",
"message": "started bean \u0027clientConnectionFactory\u0027; defined in: \u0027class path resource [com/globalpay/enterprise/integrations/decrypter/configuration/TcpClientConfig.class]\u0027; from source: \u0027com.globalpay.enterprise.integrations.decrypter.configuration.TcpClientConfig.clientConnectionFactory()\u0027, host\u003d, port\u003d0",
}
{
"logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
"message": "started bean \u0027tcpClientConfig.outboundGateway.serviceActivator\u0027",
}
{
"logger": "org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway",
"message": "started bean \u0027tcpClientGateway#send(String)\u0027",
}
{
"logger": "org.springframework.integration.gateway.GatewayProxyFactoryBean",
"message": "started bean \u0027tcpClientGateway\u0027",
}
{
"logger": "org.apache.coyote.http11.Http11NioProtocol",
"message": "Starting ProtocolHandler [\"http-nio-8080\"]",
}
Solution: I modified my Custom Deserializer and it fixed all my issues:
@Component
class CustomDeserializer extends DefaultDeserializer {
private static final int MAX_LENGTH = 300;
@Override
public Object deserialize(final InputStream inputStream) throws IOException {
StringBuilder sb = new StringBuilder(MAX_LENGTH);
int read = Integer.MIN_VALUE;
int loop = 0;
while(read != ']') {
read = inputStream.read();
sb.append((char) read);
if(loop++ >300) { //I know my max value could be upto 300 bytes. This fixed my issues.
throw new RuntimeException("Illegal State Exception");
}
}
return sb.toString();
}