0

What I am doing? I'm connecting to a remote server using TLSv1.2 and sending a max of 300 bytes of data and receive a response back also of the same size.

What is expected to deliver? During load testing we are expected to deliver 1000TPS. Use at max using 50 Persistent TLS Connections.


What is going wrong?

  1. During load testing, the max TPS we are receiving is 250TPS.
  2. During load testing, we observed that the requests are interfering causing one request's response to get into another request response.

Configurations:

@EnableIntegration
@IntegrationComponentScan
@Configuration
public class TcpClientConfig implements ApplicationEventPublisherAware {

  private ApplicationEventPublisher applicationEventPublisher;
  private final ConnectionProperty connectionProperty;

  @Override
  public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
    this.applicationEventPublisher = applicationEventPublisher;
  }

  TcpClientConfig(ConnectionProperty connectionProperty) {
    this.connectionProperty = connectionProperty;
  }

  @Bean
  public AbstractClientConnectionFactory clientConnectionFactory() {
    TcpNioClientConnectionFactory tcpNioClientConnectionFactory =
        getTcpNioClientConnectionFactoryOf(
            connectionProperty.getPrimaryHSMServerIpAddress(),
            connectionProperty.getPrimaryHSMServerPort());

    final List<AbstractClientConnectionFactory> fallBackConnections = getFallBackConnections();
    fallBackConnections.add(tcpNioClientConnectionFactory);

    final FailoverClientConnectionFactory failoverClientConnectionFactory =
        new FailoverClientConnectionFactory(fallBackConnections);

    return new CachingClientConnectionFactory(
        failoverClientConnectionFactory, connectionProperty.getConnectionPoolSize());
  }

  @Bean
  DefaultTcpNioSSLConnectionSupport connectionSupport() {

    final DefaultTcpSSLContextSupport defaultTcpSSLContextSupport =
        new DefaultTcpSSLContextSupport(
            connectionProperty.getKeystorePath(),
            connectionProperty.getTrustStorePath(),
            connectionProperty.getKeystorePassword(),
            connectionProperty.getTruststorePassword());

    final String protocol = "TLSv1.2";
    defaultTcpSSLContextSupport.setProtocol(protocol);
    return new DefaultTcpNioSSLConnectionSupport(defaultTcpSSLContextSupport, false);
  }

  @Bean
  public MessageChannel outboundChannel() {
    return new DirectChannel();
  }



  @Bean
  @ServiceActivator(inputChannel = "outboundChannel")
  public MessageHandler outboundGateway(AbstractClientConnectionFactory clientConnectionFactory) {
    TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
    tcpOutboundGateway.setConnectionFactory(clientConnectionFactory);
    return tcpOutboundGateway;
  }
  @ServiceActivator(inputChannel = "error-channel")
  public void handleError(ErrorMessage em)  {
    Throwable throwable = em.getPayload();
    if(ExceptionUtils.indexOfThrowable(throwable, IOException.class)!=-1){
      ExceptionHandler.throwHsmSystemTimeoutException();
    }
    throw new RuntimeException(throwable);
  }

  private List<AbstractClientConnectionFactory> getFallBackConnections() {
    final int size = connectionProperty.getAdditionalHSMServersConfig().size();
    List<AbstractClientConnectionFactory> collector = new ArrayList<>(size);
    for (final Map.Entry<String, Integer> server :
        connectionProperty.getAdditionalHSMServersConfig().entrySet()) {
      collector.add(getTcpNioClientConnectionFactoryOf(server.getKey(), server.getValue()));
    }
    return collector;
  }

  private TcpNioClientConnectionFactory getTcpNioClientConnectionFactoryOf(
      final String ipAddress, final int port) {
    TcpNioClientConnectionFactory tcpNioClientConnectionFactory =
        new TcpNioClientConnectionFactory(ipAddress, port);
    tcpNioClientConnectionFactory.setUsingDirectBuffers(true);
    tcpNioClientConnectionFactory.setDeserializer(new CustomDeserializer());
    tcpNioClientConnectionFactory.setApplicationEventPublisher(applicationEventPublisher);
    tcpNioClientConnectionFactory.setSoKeepAlive(true);
    tcpNioClientConnectionFactory.setConnectTimeout(connectionProperty.getConnectionTimeout());
    tcpNioClientConnectionFactory.setSoTcpNoDelay(true);
    tcpNioClientConnectionFactory.setTcpNioConnectionSupport(connectionSupport());
    return tcpNioClientConnectionFactory;
  }
}

Deserializer

@Component
class CustomDeserializer extends DefaultDeserializer {
  private static final int MAX_LENGTH = 80;

  @Override
  public Object deserialize(final InputStream inputStream) throws IOException {
    StringBuilder stringBuffer = new StringBuilder(MAX_LENGTH);

    int read = Integer.MIN_VALUE;
    for (int loop = 0; loop < 300 && read != ']'; loop++) {
      read = inputStream.read();
      stringBuffer.append((char) read);
    }
    String reply = stringBuffer.toString();

    return reply;
  }

Gateway:

@Component
@MessagingGateway(defaultRequestChannel = "outboundChannel",errorChannel ="error-channel" )
public interface TcpClientGateway {
    String send(String message);
}

Additional Information:

  1. Our service is deployed on the GKE having the 8 to 10 Pods.
  2. The target server is capable of handling more than 2000TPS we are licensed to use only 1000TPS.

Edit: Added startup logs:

{
    "logger": "org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor",
    "message": "No bean named \u0027errorChannel\u0027 has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.",
}
{
    "logger": "org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor",
    "message": "No bean named \u0027integrationHeaderChannelRegistry\u0027 has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.",
}
{
    "logger": "org.springframework.cloud.context.scope.GenericScope",
    "message": "BeanFactory id\u003da426df04-fa05-397e-a849-44d732e3faa4",
}
{
    "logger": "org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker",
    "message": "Bean \u0027org.springframework.integration.config.IntegrationManagementConfiguration\u0027 of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)",
}
{
    "logger": "org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker",
    "message": "Bean \u0027integrationChannelResolver\u0027 of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)",
}
{
    "logger": "org.apache.coyote.http11.Http11NioProtocol",
    "message": "Initializing ProtocolHandler [\"http-nio-8080\"]",
}
{
    "logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
    "message": "Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the \u0027errorChannel\u0027 channel",
}
{
    "logger": "org.springframework.integration.channel.PublishSubscribeChannel",
    "message": "Channel \u0027decryptor-api.errorChannel\u0027 has 1 subscriber(s).",
}
{
    "logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
    "message": "started bean \u0027_org.springframework.integration.errorLogger\u0027",
}
{
    "logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
    "message": "Adding {service-activator:tcpClientConfig.handleError.serviceActivator} as a subscriber to the \u0027error-channel\u0027 channel",
}
{
    "logger": "org.springframework.integration.channel.DirectChannel",
    "message": "Channel \u0027decryptor-api.error-channel\u0027 has 1 subscriber(s).",
}
{
    "logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
    "message": "started bean \u0027tcpClientConfig.handleError.serviceActivator\u0027",
}
{
    "logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
    "message": "Adding {ip:tcp-outbound-gateway:tcpClientConfig.outboundGateway.serviceActivator} as a subscriber to the \u0027outboundChannel\u0027 channel",
}
{
    "logger": "org.springframework.integration.channel.DirectChannel",
    "message": "Channel \u0027decryptor-api.outboundChannel\u0027 has 1 subscriber(s).",
}
{
    "logger": "org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory",
    "message": "started org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory@619b7436, host\u003d10.1.4.4, port\u003d9021",
}
{
    "logger": "org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory",
    "message": "started org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory@71c1ca1, host\u003d10.1.4.4, port\u003d9021",
}
{
    "logger": "org.springframework.integration.ip.tcp.connection.FailoverClientConnectionFactory",
    "message": "started org.springframework.integration.ip.tcp.connection.FailoverClientConnectionFactory@20cf3ab3, host\u003d, port\u003d0",
}
{
    "logger": "org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory",
    "message": "started bean \u0027clientConnectionFactory\u0027; defined in: \u0027class path resource [com/globalpay/enterprise/integrations/decrypter/configuration/TcpClientConfig.class]\u0027; from source: \u0027com.globalpay.enterprise.integrations.decrypter.configuration.TcpClientConfig.clientConnectionFactory()\u0027, host\u003d, port\u003d0",
}
{
    "logger": "org.springframework.integration.endpoint.EventDrivenConsumer",
    "message": "started bean \u0027tcpClientConfig.outboundGateway.serviceActivator\u0027",
}
{
    "logger": "org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway",
    "message": "started bean \u0027tcpClientGateway#send(String)\u0027",
}
{
    "logger": "org.springframework.integration.gateway.GatewayProxyFactoryBean",
    "message": "started bean \u0027tcpClientGateway\u0027",
}
{
    "logger": "org.apache.coyote.http11.Http11NioProtocol",
    "message": "Starting ProtocolHandler [\"http-nio-8080\"]",
}
Zahid Khan
  • 2,130
  • 2
  • 18
  • 31

1 Answers1

0

See a singleUse option of the connection factory:

/**
 * If true, sockets created by this factory will be used once.
 * @param singleUse The singleUse to set.
 */
public void setSingleUse(boolean singleUse) {

And then see JavaDocs of the TcpOutboundGateway:

 * TCP outbound gateway that uses a client connection factory. If the factory is configured
 * for single-use connections, each request is sent on a new connection; if the factory does not use
 * single use connections, each request is blocked until the previous response is received
 * (or times out). Asynchronous requests/responses over the same connection are not
 * supported - use a pair of outbound/inbound adapters for that use case.

I'm not sure what is that TPS, but it would be great to know what your server side is doing to be sure that correlation of the reply to the request is going to happen properly.

Artem Bilan
  • 113,505
  • 11
  • 91
  • 118
  • `TPS` here I refer to Transactions per second. – Zahid Khan Aug 05 '22 at 17:45
  • As I am using the `CachingClientFactory` and by default the `singleUse` is true. Docs of `CachingClientConnectionFactory` says ``` Connection factory that caches connections from the underlying target factory. The underlying factory will be reconfigured to have singleUse=true in order for the connection to be returned to the cache after use. Users should not subsequently set the underlying property to false, or cache starvation will result. ``` – Zahid Khan Aug 05 '22 at 17:48
  • That's a reason why I have not explicitly set `singleUse` to false – Zahid Khan Aug 05 '22 at 17:49
  • For each request, the server is replying on the same socket. Before using spring integration, I was doing socket programming I was using java.io in which I was blocking the thread till I was receiving the response from the server. I wasn't facing any issues. – Zahid Khan Aug 05 '22 at 18:04
  • Basically, the request contains the encrypted data and the server is decrypting that. The server is shared across multiple organizations. – Zahid Khan Aug 05 '22 at 18:06
  • Turn on a DEBUG logging level for the `org.springframework.integration.ip.tcp` category to observe what is going on. Sounds like the same `connnectionId` is reused, so the wrong reply is received. Not sure about the the number of TPS. Maybe `connectionProperty.getConnectionPoolSize()`. However it looks like we cannot do parallel async requests with NIO anyway: `Async replies are not supported with NIO; see the reference manual` – Artem Bilan Aug 05 '22 at 18:08
  • Hi Artem, thanks again for so much community support. 1. Do you want me to turn debug level ON during the load testing? 2. `Sounds like the same connnectionId is reused, so the wrong reply is received.` Is it possible as UUID is almost globally universal? 3. ` However it looks like we cannot do parallel async requests with NIO anyway: ` Can you please provide some reference I am unable to find resources. – Zahid Khan Aug 06 '22 at 19:17
  • 1. Added startup logs, 2. While debugging as you pointed out I found `connectionId` is getting reused however the `UUID` is different for each message. – Zahid Khan Aug 08 '22 at 17:36
  • What version are you using? Only 5.5.x is currently supported for OSS (https://spring.io/projects/spring-integration#support) - if you are using an older version, please see if you still have problems with the current version (5.5.14). – Gary Russell Aug 08 '22 at 18:08
  • Another point is the connection factories should be `@Bean`s. Otherwise they won'y be initialized properly (event publisher etc). They can be prototype beans using `getBean()` with different parameters. – Gary Russell Aug 08 '22 at 18:22
  • The `connectionId` can be reused, indeed, but that's cannot happen in parallel. Since you use a `CachingClientConnectionFactory`, then the item is taken from the cache exclusively and can be reused only when it is released back to the cache. – Artem Bilan Aug 08 '22 at 18:23