I have a Spring Boot WebFlux application that calls 5 different downstream services through WebClient. Everything works well until I periodically read a file from S3 (triggered by a Spring scheduler) using the AWS SDK's S3 async client. While the file is loading, the downstream service calls somehow get slower. Because these downstream calls run in parallel, added latency on any one of them has a ripple effect on the overall latency of the service.
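To make the ripple effect concrete, here is a minimal sketch of the fan-out. It uses plain `CompletableFuture` instead of WebClient so it runs without any dependencies; the class name `FanOutLatency`, the helper names, and the delay values are all hypothetical and are not taken from the real service. It only illustrates that when all calls are awaited together, the total time is bounded below by the slowest call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

class FanOutLatency {

    // Simulates one downstream call that completes after delayMillis (hypothetical stand-in for a WebClient call).
    static CompletableFuture<String> simulatedCall(String name, long delayMillis) {
        Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(() -> name, delayed);
    }

    // Starts all calls at once and returns the wall-clock milliseconds until every one has completed.
    static long fanOutMillis(long[] delaysMillis) {
        long start = System.nanoTime();
        CompletableFuture<?>[] calls = new CompletableFuture<?>[delaysMillis.length];
        for (int i = 0; i < delaysMillis.length; i++) {
            calls[i] = simulatedCall("service-" + i, delaysMillis[i]);
        }
        // The aggregate completes only when the slowest call has completed.
        CompletableFuture.allOf(calls).join();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        long baseline = fanOutMillis(new long[] {20, 20, 20, 20, 20});
        long oneSlow = fanOutMillis(new long[] {20, 20, 200, 20, 20});
        System.out.println("all calls fast: " + baseline + " ms");
        System.out.println("one call slow:  " + oneSlow + " ms");
    }
}
```

Slowing down a single simulated call is enough to raise the total, which is the behavior I see in the real service whenever the S3 load runs.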

My WebClient settings look like this:

ConnectionProvider connectionProvider = ConnectionProvider
                .builder("webclient-conn-pool")
                .maxConnections(1000)
                .maxIdleTime(Duration.ofMillis(75000))
                .maxLifeTime(Duration.ofMillis(150000))
                .metrics(true)
                .pendingAcquireMaxCount(httpConnPoolProperties.getPendingAcquireMaxCount())
                //.evictInBackground(Duration.ofMillis(150000))
                .lifo()
                .build();
        HttpClient httpClient = HttpClient.create(connectionProvider)
                .metrics(true, Function.identity())
                // TcpClient is immutable: each option(...) call returns a new instance,
                // so the calls must be chained for the options to take effect.
                .tcpConfiguration(tcpClient -> tcpClient
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.ALLOW_HALF_CLOSURE, false))
                .compress(true);

        final ExchangeStrategies strategies = ExchangeStrategies.builder()
            .codecs(codecs ->
                codecs.defaultCodecs().maxInMemorySize(
                    httpConnPoolProperties.getCodecMaxBytesToBuffer()))
            .build();

        return webClientBuilder
            .exchangeStrategies(strategies)
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .build();

AWS S3 async client config:

SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
            .connectionMaxIdleTime(Duration.ofMillis(75000))
            .tcpKeepAlive(true)
            .build();
        S3Configuration serviceConfiguration = S3Configuration.builder()
            .checksumValidationEnabled(false)
            .chunkedEncodingEnabled(false)
            .build();
        S3AsyncClientBuilder b = S3AsyncClient.builder()
            .httpClient(httpClient)
            .region(Region.of(cuePointBucketS3Region))
            .serviceConfiguration(serviceConfiguration);

        return b.build();

The scheduled task that reads the file from S3:

downStreamService.get(tenantId)
                .publishOn(Schedulers.boundedElastic())
                .map(res -> parseResponse(res))
                // loadAsync returns a CompletableFuture; adapt it to a Mono in a single step
                .flatMap(data -> Mono.fromFuture(cuePointFileLoader.loadAsync(data.getData())))
                .map(BytesWrapper::asByteArray)
                // run the protobuf parsing on boundedElastic rather than on the thread that completed the future
                .publishOn(Schedulers.boundedElastic())
                .subscribe(d -> {
                    try {
                        CuePoints cuePoints = CuePoints.parseFrom(d);
                        CUE_POINT_ATOMIC_REF.lazySet(cuePoints);
                        metricClient.timer(Metric.CUE_POINT_S3_LOAD_TIME,
                            stopWatch.getTime(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
                    } catch (Exception ex) {
                        // The Consumer passed to subscribe() is void, so nothing can be returned here;
                        // the previously loaded CuePoints stay in place when parsing fails.
                    } finally {
                        stopWatch.stop();
                    }
                });
Akhil