spring-integrationspring-integration-dslspring-integration-ip

Why do requests interfere with one another requests and slow performance with Spring Integration during load testing?


What I am doing? I'm connecting to a remote server using TLSv1.2 and sending a max of 300 bytes of data and receive a response back also of the same size.

What is expected to deliver? During load testing we are expected to deliver 1000TPS. Use at max using 50 Persistent TLS Connections.


What is going wrong?

  1. During load testing, the max TPS we are receiving is 250TPS.
  2. During load testing, we observed that the requests are interfering causing one request's response to get into another request response.

Configurations:

@EnableIntegration
@IntegrationComponentScan
@Configuration
public class TcpClientConfig implements ApplicationEventPublisherAware {

  private ApplicationEventPublisher applicationEventPublisher;
  private final ConnectionProperty connectionProperty;

  @Override
  public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
    this.applicationEventPublisher = applicationEventPublisher;
  }

  TcpClientConfig(ConnectionProperty connectionProperty) {
    this.connectionProperty = connectionProperty;
  }

  @Bean
  public AbstractClientConnectionFactory clientConnectionFactory() {
    TcpNioClientConnectionFactory tcpNioClientConnectionFactory =
        getTcpNioClientConnectionFactoryOf(
            connectionProperty.getPrimaryHSMServerIpAddress(),
            connectionProperty.getPrimaryHSMServerPort());

    final List<AbstractClientConnectionFactory> fallBackConnections = getFallBackConnections();
    fallBackConnections.add(tcpNioClientConnectionFactory);

    final FailoverClientConnectionFactory failoverClientConnectionFactory =
        new FailoverClientConnectionFactory(fallBackConnections);

    return new CachingClientConnectionFactory(
        failoverClientConnectionFactory, connectionProperty.getConnectionPoolSize());
  }

  @Bean
  DefaultTcpNioSSLConnectionSupport connectionSupport() {

    final DefaultTcpSSLContextSupport defaultTcpSSLContextSupport =
        new DefaultTcpSSLContextSupport(
            connectionProperty.getKeystorePath(),
            connectionProperty.getTrustStorePath(),
            connectionProperty.getKeystorePassword(),
            connectionProperty.getTruststorePassword());

    final String protocol = "TLSv1.2";
    defaultTcpSSLContextSupport.setProtocol(protocol);
    return new DefaultTcpNioSSLConnectionSupport(defaultTcpSSLContextSupport, false);
  }

  @Bean
  public MessageChannel outboundChannel() {
    return new DirectChannel();
  }



  @Bean
  @ServiceActivator(inputChannel = "outboundChannel")
  public MessageHandler outboundGateway(AbstractClientConnectionFactory clientConnectionFactory) {
    TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
    tcpOutboundGateway.setConnectionFactory(clientConnectionFactory);
    return tcpOutboundGateway;
  }
  @ServiceActivator(inputChannel = "error-channel")
  public void handleError(ErrorMessage em)  {
    Throwable throwable = em.getPayload();
    if(ExceptionUtils.indexOfThrowable(throwable, IOException.class)!=-1){
      ExceptionHandler.throwHsmSystemTimeoutException();
    }
    throw new RuntimeException(throwable);
  }

  private List<AbstractClientConnectionFactory> getFallBackConnections() {
    final int size = connectionProperty.getAdditionalHSMServersConfig().size();
    List<AbstractClientConnectionFactory> collector = new ArrayList<>(size);
    for (final Map.Entry<String, Integer> server :
        connectionProperty.getAdditionalHSMServersConfig().entrySet()) {
      collector.add(getTcpNioClientConnectionFactoryOf(server.getKey(), server.getValue()));
    }
    return collector;
  }

  private TcpNioClientConnectionFactory getTcpNioClientConnectionFactoryOf(
      final String ipAddress, final int port) {
    TcpNioClientConnectionFactory tcpNioClientConnectionFactory =
        new TcpNioClientConnectionFactory(ipAddress, port);
    tcpNioClientConnectionFactory.setUsingDirectBuffers(true);
    tcpNioClientConnectionFactory.setDeserializer(new CustomDeserializer());
    tcpNioClientConnectionFactory.setApplicationEventPublisher(applicationEventPublisher);
    tcpNioClientConnectionFactory.setSoKeepAlive(true);
    tcpNioClientConnectionFactory.setConnectTimeout(connectionProperty.getConnectionTimeout());
    tcpNioClientConnectionFactory.setSoTcpNoDelay(true);
    tcpNioClientConnectionFactory.setTcpNioConnectionSupport(connectionSupport());
    return tcpNioClientConnectionFactory;
  }
}

Deserializer

@Component
class CustomDeserializer extends DefaultDeserializer {
  private static final int MAX_LENGTH = 80;

  @Override
  public Object deserialize(final InputStream inputStream) throws IOException {
    StringBuilder stringBuffer = new StringBuilder(MAX_LENGTH);

    int read = Integer.MIN_VALUE;
    for (int loop = 0; loop < 300 && read != ']'; loop++) {
      read = inputStream.read();
      stringBuffer.append((char) read);
    }
    String reply = stringBuffer.toString();

    return reply;
  }

Gateway:

@Component
@MessagingGateway(defaultRequestChannel = "outboundChannel",errorChannel ="error-channel" )
public interface TcpClientGateway {
    String send(String message);
}

Additional Information:

  1. Our service is deployed on the GKE having the 8 to 10 Pods.
  2. The target server is capable of handling more than 2000TPS we are licensed to use only 1000TPS.

Edit: Added startup logs:

{
    "logger": "org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor",
    "message": "No bean named \u0027errorChannel\u0027 has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.",
}
{
    "logger": "org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor",
    "message": "No bean named \u0027integrationHeaderChannelRegistry\u0027 has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.",
}
{
    "logger": "org.springframework.cloud.context.scope.GenericScope",
    "message": "BeanFactory id\u003da426df04-fa05-397e-a849-44d732e3faa4",
}
{
    "logger": "org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker",
    "message": "Bean \u0027org.springframework.integration.config.IntegrationManagementConfiguration\u0027 of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)",
}
{
    "logger": "org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker",
    "message": "Bean \u0027integrationChannelResolver\u0027 of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)",
}
{
    "logger": "org.apache.coyote.http11.Http11NioProtocol",
    "message": "Initializing ProtocolHandler [\"http-nio-8080\"]",
}
{
    "logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
    "message": "Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the \u0027errorChannel\u0027 channel",
}
{
    "logger": "org.springframework.integration.channel.PublishSubscribeChannel",
    "message": "Channel \u0027decryptor-api.errorChannel\u0027 has 1 subscriber(s).",
}
{
    "logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
    "message": "started bean \u0027_org.springframework.integration.errorLogger\u0027",
}
{
    "logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
    "message": "Adding {service-activator:tcpClientConfig.handleError.serviceActivator} as a subscriber to the \u0027error-channel\u0027 channel",
}
{
    "logger": "org.springframework.integration.channel.DirectChannel",
    "message": "Channel \u0027decryptor-api.error-channel\u0027 has 1 subscriber(s).",
}
{
    "logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
    "message": "started bean \u0027tcpClientConfig.handleError.serviceActivator\u0027",
}
{
    "logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
    "message": "Adding {ip:tcp-outbound-gateway:tcpClientConfig.outboundGateway.serviceActivator} as a subscriber to the \u0027outboundChannel\u0027 channel",
}
{
    "logger": "org.springframework.integration.channel.DirectChannel",
    "message": "Channel \u0027decryptor-api.outboundChannel\u0027 has 1 subscriber(s).",
}
{
    "logger": "org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory",
    "message": "started org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory@619b7436, host\u003d10.1.4.4, port\u003d9021",
}
{
    "logger": "org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory",
    "message": "started org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory@71c1ca1, host\u003d10.1.4.4, port\u003d9021",
}
{
    "logger": "org.springframework.integration.ip.tcp.connection.FailoverClientConnectionFactory",
    "message": "started org.springframework.integration.ip.tcp.connection.FailoverClientConnectionFactory@20cf3ab3, host\u003d, port\u003d0",
}
{
    "logger": "org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory",
    "message": "started bean \u0027clientConnectionFactory\u0027; defined in: \u0027class path resource [com/globalpay/enterprise/integrations/decrypter/configuration/TcpClientConfig.class]\u0027; from source: \u0027com.globalpay.enterprise.integrations.decrypter.configuration.TcpClientConfig.clientConnectionFactory()\u0027, host\u003d, port\u003d0",
}
{
    "logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
    "message": "started bean \u0027tcpClientConfig.outboundGateway.serviceActivator\u0027",
}
{
    "logger": "org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway",
    "message": "started bean \u0027tcpClientGateway#send(String)\u0027",
}
{
    "logger": "org.springframework.integration.gateway.GatewayProxyFactoryBean",
    "message": "started bean \u0027tcpClientGateway\u0027",
}
{
    "logger": "org.apache.coyote.http11.Http11NioProtocol",
    "message": "Starting ProtocolHandler [\"http-nio-8080\"]",
}

Solution

  • Solution: I modified my Custom Deserializer and it fixed all my issues:

    @Component
    class CustomDeserializer extends DefaultDeserializer {
      private static final int MAX_LENGTH = 300;
    
      @Override
      public Object deserialize(final InputStream inputStream) throws IOException {
        StringBuilder sb = new StringBuilder(MAX_LENGTH);
       
        int read = Integer.MIN_VALUE;
        int loop = 0;
        while(read != ']') {
          read = inputStream.read();
          sb.append((char) read);
          if(loop++ >300) { //I know my max value could be upto 300 bytes. This fixed my issues.
             throw new RuntimeException("Illegal State Exception");
          }
        }
        return sb.toString();
      }