I am trying to monitor a bucket folder by using IntegrationFlow. But after a certain time I get a timeout connecting to the connection pool.
My code looks like this:
@Configuration
@EnableIntegration
@IntegrationComponentScan
@EnableAsync
public class AmazonS3IntegrationConfig {
private final S3Config amazonS3Config;
public AmazonS3IntegrationConfig(S3Config amazonS3Config) {
this.amazonS3Config = amazonS3Config;
}
@Bean
public IntegrationFlow processFileFlow() {
return IntegrationFlows.from(s3InboundStreamingMessageSource(), e -> e.poller(p -> p.fixedDelay(5, TimeUnit.SECONDS)))
.handle("myHandlerClass", "handle")
.get();
}
@Bean
public S3RemoteFileTemplate template() {
return new S3RemoteFileTemplate(new CustomS3SessionFactory(amazonS3Config));
}
@Bean
public MessageSource<InputStream> s3InboundStreamingMessageSource() {
S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
messageSource.setRemoteDirectory(amazonS3Config.getOriginEncriptedBucket());
return messageSource;
}
}
public class CustomS3SessionFactory extends S3SessionFactory {
private CustomS3Session s3Session;
@Override
public S3Session getSession() {
return s3Session;
}
public CustomS3SessionFactory(S3Config amazonS3) {
super(amazonS3.getS3client());
Assert.notNull(amazonS3, "'amazonS3' must not be null.");
this.s3Session = new CustomS3Session(amazonS3);
}
}
public class CustomS3Session extends S3Session {
private final String customEndpoint;
public CustomS3Session(S3Config amazonS3) {
super(amazonS3.getS3client());
this.customEndpoint = amazonS3.getEndpoint();
}
@Override
public String getHostPort() {
return String.format("%s:%s", customEndpoint, "443");
}
}
It works when the container starts. But after a certain time, it starts getting an exception:
ERROR o.s.core.log.LogAccessor org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:448)
at org.springframework.integration.file.remote.RemoteFileTemplate.list(RemoteFileTemplate.java:409)
at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.listFiles(AbstractRemoteFileStreamingMessageSource.java:265)
at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.poll(AbstractRemoteFileStreamingMessageSource.java:242)
at org.springframework.integration.aws.inbound.S3StreamingMessageSource.poll(S3StreamingMessageSource.java:67)
at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:194)
at org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:45)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:444)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
When I increased the maximum connections that my s3client would allow the error got postponed, but it still happened. I'm thinking that the connections aren't getting closed by the IntegrationFlow process. Is there a way that I can assert that those connections get closed after processing is done?