0

I am trying to monitor a folder in an S3 bucket using a Spring Integration IntegrationFlow, but after a certain time I get a timeout while waiting for a connection from the connection pool.

My code looks like this:

/**
 * Spring Integration wiring that polls an S3 location and hands each
 * streamed object to a handler bean.
 */
@Configuration
@EnableIntegration
@IntegrationComponentScan
@EnableAsync
public class AmazonS3IntegrationConfig {

    private final S3Config amazonS3Config;

    /**
     * @param amazonS3Config project S3 settings used to build the session factory
     *                       and to look up the remote directory
     */
    public AmazonS3IntegrationConfig(S3Config amazonS3Config) {
        this.amazonS3Config = amazonS3Config;
    }

    /**
     * Builds the flow: poll the streaming S3 source with a fixed delay of
     * 5 seconds and pass each message to the {@code handle} method of the
     * bean named {@code myHandlerClass}.
     *
     * <p>NOTE(review): the source's payload type is {@link InputStream}; presumably
     * the handler is responsible for closing that stream once it is done -
     * confirm against the Spring Integration AWS documentation, since streams
     * left open would hold on to pooled HTTP connections.
     */
    @Bean
    public IntegrationFlow processFileFlow() {
        MessageSource<InputStream> source = s3InboundStreamingMessageSource();
        return IntegrationFlows
                .from(source, endpoint -> endpoint.poller(poller -> poller.fixedDelay(5, TimeUnit.SECONDS)))
                .handle("myHandlerClass", "handle")
                .get();
    }

    /**
     * Remote file template backed by a {@link CustomS3SessionFactory} built
     * from the injected S3 configuration.
     */
    @Bean
    public S3RemoteFileTemplate template() {
        CustomS3SessionFactory sessionFactory = new CustomS3SessionFactory(amazonS3Config);
        return new S3RemoteFileTemplate(sessionFactory);
    }

    /**
     * Streaming message source that reads from the remote directory returned by
     * {@code amazonS3Config.getOriginEncriptedBucket()}.
     */
    @Bean
    public MessageSource<InputStream> s3InboundStreamingMessageSource() {
        S3StreamingMessageSource streamingSource = new S3StreamingMessageSource(template());
        String remoteDirectory = amazonS3Config.getOriginEncriptedBucket();
        streamingSource.setRemoteDirectory(remoteDirectory);
        return streamingSource;
    }

}
/**
 * {@link S3SessionFactory} that always hands out one pre-built
 * {@link CustomS3Session} created from the supplied S3 configuration.
 */
public class CustomS3SessionFactory extends S3SessionFactory {

    /** Single session instance created at construction time and returned by every {@link #getSession()} call. */
    private final CustomS3Session s3Session;

    /**
     * @param amazonS3 project S3 configuration; supplies the client passed to the
     *                 superclass and is used to build the custom session
     * @throws IllegalArgumentException if {@code amazonS3} is {@code null}
     */
    public CustomS3SessionFactory(S3Config amazonS3) {
        // The null check has to happen inside the super(...) argument: a check placed
        // after super(...) can never fire, because the argument is dereferenced first.
        super(requireConfig(amazonS3).getS3client());
        this.s3Session = new CustomS3Session(amazonS3);
    }

    /**
     * @return the session instance created in the constructor (the same object on every call)
     */
    @Override
    public S3Session getSession() {
        return s3Session;
    }

    /** Validates that the configuration is non-null and returns it unchanged. */
    private static S3Config requireConfig(S3Config amazonS3) {
        Assert.notNull(amazonS3, "'amazonS3' must not be null.");
        return amazonS3;
    }
}
/**
 * {@link S3Session} that reports a configured endpoint, with a fixed port,
 * as its host/port string.
 */
public class CustomS3Session extends S3Session {

    /** Port appended to the endpoint in {@link #getHostPort()}. */
    private static final String PORT = "443";

    private final String customEndpoint;

    /**
     * @param amazonS3 project S3 configuration; supplies the client passed to the
     *                 superclass and the endpoint reported by {@link #getHostPort()}
     */
    public CustomS3Session(S3Config amazonS3) {
        super(amazonS3.getS3client());
        this.customEndpoint = amazonS3.getEndpoint();
    }

    /**
     * @return the configured endpoint followed by {@code :443}
     */
    @Override
    public String getHostPort() {
        return customEndpoint + ":" + PORT;
    }
}

It works when the container starts, but after a certain time it starts throwing an exception:

 ERROR  o.s.core.log.LogAccessor org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:448)
    at org.springframework.integration.file.remote.RemoteFileTemplate.list(RemoteFileTemplate.java:409)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.listFiles(AbstractRemoteFileStreamingMessageSource.java:265)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.poll(AbstractRemoteFileStreamingMessageSource.java:242)
    at org.springframework.integration.aws.inbound.S3StreamingMessageSource.poll(S3StreamingMessageSource.java:67)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:194)
    at org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:45)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:444)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:832)

When I increased the maximum number of connections that my S3 client allows, the error was postponed, but it still happened. I think the connections aren't being closed by the IntegrationFlow process. Is there a way I can ensure that those connections get closed after processing is done?

Ermiya Eskandary
  • 15,323
  • 3
  • 31
  • 44

0 Answers0