I'm using Spring Batch Integration to poll and process files from an SFTP server. We receive thousands of XML files (between 50 MB and 200 MB each).
Currently I'm running 6 instances of my app to process those files.
Spring: 2.3.9.RELEASE
Batch-Integration: 4.2.5.RELEASE
Integration-SFTP: 5.5.5
My SessionFactory is configured as follows:
@Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
    factory.setHost(integrationProperties.getSftpSourceHost());
    factory.setPort(integrationProperties.getSftpSourcePort());
    factory.setUser(integrationProperties.getSftpSourceUser());
    factory.setPrivateKey(new FileSystemResource(integrationProperties.getSftpSourcePrivateKeyLocation()));
    factory.setAllowUnknownKeys(true);
    // Cache up to 10 SFTP sessions so they are reused between polls instead of reconnecting
    return new CachingSessionFactory<>(factory, 10);
}
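With a single poller thread (every log line in this post runs on `scheduling-1`), sessions are obtained and released one at a time, so a cache of 10 mostly holds a single session that keeps being reused; the DEBUG logs in EDIT 2 show the same `SftpSession@7b33a9bd` coming back from the pool on each poll. The following is a minimal, hypothetical bounded pool in plain Java, not Spring's `SimplePool` or `CachingSessionFactory`, only to illustrate the obtain/release semantics: items are created lazily up to the capacity, reused after release, and a caller waits up to a timeout when all of them are in use.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical bounded pool for illustration only; NOT Spring's SimplePool.
class BoundedPool<T> {
    private final BlockingQueue<T> idle;
    private final Supplier<T> factory;
    private final int capacity;
    private int created = 0;

    BoundedPool(int capacity, Supplier<T> factory) {
        this.capacity = capacity;
        this.factory = factory;
        this.idle = new ArrayBlockingQueue<>(capacity);
    }

    /** Reuses an idle item, creates one if under capacity, otherwise waits for a release. */
    T obtain(long timeoutMs) {
        T item = idle.poll();
        if (item != null) {
            return item;
        }
        synchronized (this) {
            if (created < capacity) {
                created++;
                return factory.get();
            }
        }
        try {
            item = idle.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for an item", e);
        }
        if (item == null) {
            throw new IllegalStateException("timed out waiting for an item");
        }
        return item;
    }

    /** Returns an item to the pool so the next obtain() can reuse it. */
    void release(T item) {
        idle.offer(item);
    }

    synchronized int createdCount() {
        return created;
    }
}

class PoolDemo {
    public static void main(String[] args) {
        BoundedPool<Object> pool = new BoundedPool<>(10, Object::new);
        // One "poller thread": obtain, use, release, repeat.
        for (int poll = 0; poll < 5; poll++) {
            Object session = pool.obtain(1_000);
            pool.release(session);
        }
        System.out.println("sessions created: " + pool.createdCount()); // sessions created: 1
    }
}
```

With one caller that always releases before obtaining again, only one item is ever created, however large the capacity.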
My file filter:
private FileListFilter<ChannelSftp.LsEntry> sftpFileListFilter() {
    return new CompositeFileListFilter<ChannelSftp.LsEntry>()
            .addFilter(new SftpPersistentAcceptOnceFileListFilter(new RedisMetadataStore(redisConnectionFactory),
                    "prefix-"))
            .addFilter(new SftpRegexPatternFileListFilter("^HELLO.*\\.xml$"));
}
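To sanity-check which remote names the `^HELLO.*\\.xml$` pattern accepts, here is a plain `java.util.regex` sketch. I'm assuming the regex filter tests the bare file name; because the pattern is anchored with `^` and `$`, it makes no difference whether the check uses `matches()` or `find()`. `RegexFilterDemo` is a hypothetical name.

```java
import java.util.List;
import java.util.regex.Pattern;

class RegexFilterDemo {
    // Same pattern string as in the filter above ("\\." is a literal dot in Java source).
    static final Pattern HELLO_XML = Pattern.compile("^HELLO.*\\.xml$");

    // Assumption: the filter checks the bare file name against the whole pattern.
    static boolean accepts(String fileName) {
        return HELLO_XML.matcher(fileName).matches();
    }

    public static void main(String[] args) {
        List<String> names = List.of(
                "HELLO_20220708_99999.xml", // accepted
                "hello_20220708.xml",       // rejected: the match is case-sensitive
                "HELLO_20220708.xml.part",  // rejected: must end in ".xml"
                "OTHER_HELLO.xml");         // rejected: must start with "HELLO"
        for (String name : names) {
            System.out.println(name + " -> " + accepts(name));
        }
    }
}
```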
The integration flow:
@Bean
public IntegrationFlow myIntegrationFlow(
        SftpInboundFileSynchronizingMessageSource sftpInboundFileSynchronizingMessageSource) {
    return IntegrationFlows
            .from(sftpInboundFileSynchronizingMessageSource,
                    c -> c.poller(Pollers.fixedDelay(integrationProperties.getPollerDelay())
                            .maxMessagesPerPoll(integrationProperties.getMaxMessagePerPoll())))
            .transform(myFileMessageToJobRequest())
            .handle(myJobLaunchingGateway())
            .get();
}
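`Pollers.fixedDelay(...)` schedules the next poll a fixed delay after the previous poll completes, not at a fixed rate. The `SimpleJobLauncher` log lines in this post show jobs being launched on the poller thread (`scheduling-1`), so a poll that launches a long job also pushes back the next poll on that pod. The small timeline model below only illustrates that arithmetic; `FixedDelayTimeline` is a hypothetical name and the durations in `main` are made up.

```java
import java.util.ArrayList;
import java.util.List;

class FixedDelayTimeline {
    /**
     * Start time (ms, relative to the first poll) of each poll, given how long each poll took.
     * With a fixed delay, the next poll starts delayMs after the previous poll FINISHES,
     * so a job running on the poller thread delays every later poll.
     */
    static List<Long> pollStarts(long delayMs, long[] pollDurationsMs) {
        List<Long> starts = new ArrayList<>();
        long t = 0;
        for (long duration : pollDurationsMs) {
            starts.add(t);
            t += duration + delayMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // Hypothetical durations: a quick poll, a poll whose job runs 10 minutes, another quick poll.
        long[] durations = {1_000, 600_000, 500};
        System.out.println(pollStarts(10_000, durations)); // [0, 11000, 621000]
    }
}
```

Since up to `maxMessagesPerPoll` messages are handled inside one poll, each poll's duration is roughly the sum of the jobs it launches sequentially on that thread.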
The config is:
poller-delay: 10000
max-message-per-poll: 5
max-fetch-size: 3
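To see how these three values might interact on a single pod, here is a simplified model in plain Java. It rests on assumptions I have not verified against the Spring Integration source: that `max-fetch-size` is applied as the message source's `maxFetchSize`; that each `receive()` first takes a file already in the local directory and only synchronizes (downloading up to `maxFetchSize` remote files) when the local directory is empty; and that a poll stops at the first empty `receive()`, as the "Received no Message during the poll, returning 'false'" DEBUG lines suggest. `PollResult` and `PollModel` are hypothetical names, and other pods competing for the same files are not modeled.

```java
// Result of one simulated poll (hypothetical helper type).
final class PollResult {
    final int emitted;     // messages sent into the flow, i.e. job launches
    final int downloaded;  // files copied from the SFTP server during this poll
    final int remoteLeft;  // files still on the server
    final int localLeft;   // downloaded files not yet emitted

    PollResult(int emitted, int downloaded, int remoteLeft, int localLeft) {
        this.emitted = emitted;
        this.downloaded = downloaded;
        this.remoteLeft = remoteLeft;
        this.localLeft = localLeft;
    }
}

class PollModel {
    /** Simplified model of one poll under the assumptions stated above. */
    static PollResult simulatePoll(int remoteFiles, int localFiles, int maxMessagesPerPoll, int maxFetchSize) {
        int emitted = 0;
        int downloaded = 0;
        for (int i = 0; i < maxMessagesPerPoll; i++) {
            if (localFiles == 0) {
                // Nothing local: synchronize up to maxFetchSize files from the server.
                int batch = Math.min(maxFetchSize, remoteFiles);
                remoteFiles -= batch;
                localFiles += batch;
                downloaded += batch;
            }
            if (localFiles == 0) {
                break; // nothing local or remote: the poll ends early
            }
            localFiles--; // one message (one job launch) per file
            emitted++;
        }
        return new PollResult(emitted, downloaded, remoteFiles, localFiles);
    }

    public static void main(String[] args) {
        // Values from the config above: max-message-per-poll 5, max-fetch-size 3.
        PollResult r = simulatePoll(100, 0, 5, 3);
        System.out.printf("emitted=%d downloaded=%d remoteLeft=%d localLeft=%d%n",
                r.emitted, r.downloaded, r.remoteLeft, r.localLeft);
        // emitted=5 downloaded=6 remoteLeft=94 localLeft=1
    }
}
```

Under those assumptions, one poll against a full server downloads 6 files but launches only 5 jobs, leaving one file in the local directory for the next poll.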
The expected behavior is that, with 6 pods running, at least 6 files are always being processed at the same time, but I'm noticing something different.
The first situation is when the SFTP connection is closed by the server side; unfortunately, I don't have access to check the server configuration.
I frequently receive this log:
2022-07-08 08:04:18.763 INFO [my-app,,,] 1 --- [ scheduling-1] com.jcraft.jsch : Disconnecting from sftp-server port XXXXX
2022-07-08 08:04:18.778 INFO [my-app,,,] 1 --- [xchange session] com.jcraft.jsch : Caught an exception, leaving main loop due to Socket closed
After that, the app gets stuck and starts reconnecting:
2022-07-08 08:04:38.827 INFO [my-app,,,] 1 --- [ scheduling-1] com.jcraft.jsch : Connecting to sftp-server port XXXXX
Then it connects again:
2022-07-08 08:04:44.301 INFO [my-app,,,] 1 --- [ scheduling-1] com.jcraft.jsch : Authentication succeeded (publickey).
The problem is that after the reconnection the SFTP server is full of files, but this instance only picks up the next file 10 minutes later (08:14):
2022-07-08 08:14:00.916 INFO [my-app,4a37a5aa19e0af32,865cdab263682593,true] 1 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=myJob]] launched with the following parameters: [{input.file.name=/local-integration/file/HELLO_20220708_99999.xml}]
So the instances sit idle for 10 minutes or more after disconnecting. I'm trying to find out whether this is a limitation on the server side or whether I can improve performance on my side.
The second situation is random idle time:
The SFTP server has a lot of files waiting to be processed, but when inspecting one instance we saw this kind of case:
2022-07-08 09:20:32.221 INFO [my-app,2717c6cf1ea7fc62,40c1823daf0e780c,true] 1 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=myJob]] completed with the following parameters: [{input.file.name=/local-integration/file/HELLO_20220708_99023.xml}] and the following status: [COMPLETED] in 788ms
So the last job completed at 9:20, but the instance then sat idle, and the next job on this instance only started at 9:48:
2022-07-08 09:48:56.411 INFO [my-app,3ae108fc77f1347e,de6787a73f1eba0a,true] 1 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=myJob]] launched with the following parameters: [{input.file.name=/local-integration/file/HELLO_20220708_99123.xml}]
This behavior occurs in all instances, and when I check the SFTP server there are a lot of files waiting to be processed.
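The idle periods described above can be measured directly from the log timestamps. This is a minimal sketch, assuming every line starts with the `yyyy-MM-dd HH:mm:ss.SSS` timestamp shown in these logs; `IdleGapFinder` is a hypothetical name, and the one-minute threshold is just a value well above the 10-second poll delay.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

class IdleGapFinder {
    // Timestamp at the start of each log line, e.g. "2022-07-08 09:20:32.221" (23 characters).
    private static final DateTimeFormatter TS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    static LocalDateTime timestampOf(String logLine) {
        return LocalDateTime.parse(logLine.substring(0, 23), TS);
    }

    /** Gaps between consecutive log lines that are longer than the threshold. */
    static List<Duration> gapsLongerThan(List<String> lines, Duration threshold) {
        List<Duration> gaps = new ArrayList<>();
        for (int i = 1; i < lines.size(); i++) {
            Duration gap = Duration.between(timestampOf(lines.get(i - 1)), timestampOf(lines.get(i)));
            if (gap.compareTo(threshold) > 0) {
                gaps.add(gap);
            }
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Abbreviated versions of the two job log lines quoted above.
        List<String> lines = List.of(
                "2022-07-08 09:20:32.221 INFO job completed",
                "2022-07-08 09:48:56.411 INFO job launched");
        for (Duration gap : gapsLongerThan(lines, Duration.ofMinutes(1))) {
            System.out.println("idle gap: " + gap.toMillis() + " ms"); // idle gap: 1704190 ms
        }
    }
}
```

Run over a pod's full log, this lists every stretch longer than the threshold (here about 28.4 minutes between the two job lines).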
I'm also monitoring the memory and CPU of the instances, and both look fine.
Is there anything wrong with my setup, or is this something I need to check on the server side?
EDIT:
My InboundFileSynchronizer:
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer(SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory) {
    SftpInboundFileSynchronizer synchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory);
    synchronizer.setPreserveTimestamp(true);
    // Delete each remote file after it has been downloaded
    synchronizer.setDeleteRemoteFiles(true);
    synchronizer.setFilter(sftpFileListFilter());
    synchronizer.setRemoteDirectory(integrationProperties.getSftpSourceDirectory());
    // PARSER is assumed to be an ExpressionParser (e.g. a SpelExpressionParser) defined elsewhere;
    // the expression keeps only the part of the name after the last '/'
    Expression expression = PARSER.parseExpression("#this.substring(#this.lastIndexOf('/')+1)");
    synchronizer.setLocalFilenameGeneratorExpression(expression);
    return synchronizer;
}
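The local filename expression only strips any leading path. Below is a plain-Java equivalent that can be unit-tested in isolation; whether the expression receives a bare name or a path is not shown in this config, so both inputs are covered. `LocalFilename` is a hypothetical name.

```java
class LocalFilename {
    /**
     * Plain-Java equivalent of the SpEL expression
     * "#this.substring(#this.lastIndexOf('/')+1)": keeps the text after the last '/'.
     * If there is no '/', lastIndexOf returns -1, so the whole string is kept.
     */
    static String localName(String remoteName) {
        return remoteName.substring(remoteName.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        System.out.println(localName("download/HELLO_20220711_1756.xml")); // HELLO_20220711_1756.xml
        System.out.println(localName("HELLO_20220711_1756.xml"));          // HELLO_20220711_1756.xml
    }
}
```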
And my IntegrationFlow:
@Bean
public IntegrationFlow myIntegrationFlow(
        SftpInboundFileSynchronizingMessageSource sftpInboundFileSynchronizingMessageSource) {
    return IntegrationFlows
            .from(sftpInboundFileSynchronizingMessageSource,
                    c -> c.poller(Pollers.fixedDelay(integrationProperties.getPollerDelay())
                            .maxMessagesPerPoll(integrationProperties.getMaxMessagePerPoll())))
            .transform(fileMessageToJobRequest())
            .handle(jobLaunchingGateway())
            .get();
}
EDIT 2:
Following Artem's comments, I enabled DEBUG logging for spring-integration, put a lot of files on the SFTP server, and noticed the following behavior:
POD 1 was polling and suddenly stopped:
2022-07-11 17:59:06.385 DEBUG [my-app,,,] 1 --- [ scheduling-1] o.s.i.s.i.SftpInboundFileSynchronizer : 0 files transferred from 'download'
2022-07-11 17:59:06.385 DEBUG [my-app,,,] 1 --- [ scheduling-1] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2022-07-11 17:59:16.385 DEBUG [my-app,,,] 1 --- [ scheduling-1] o.s.integration.util.SimplePool : Obtained org.springframework.integration.sftp.session.SftpSession@7b33a9bd from pool.
2022-07-11 17:59:16.962 DEBUG [my-app,,,] 1 --- [ scheduling-1] o.s.i.f.r.session.CachingSessionFactory : Releasing Session org.springframework.integration.sftp.session.SftpSession@7b33a9bd back to the pool.
2022-07-11 17:59:16.963 DEBUG [my-app,,,] 1 --- [ scheduling-1] o.s.integration.util.SimplePool : Releasing org.springframework.integration.sftp.session.SftpSession@7b33a9bd back to the pool
2022-07-11 17:59:16.963 DEBUG [my-app,,,] 1 --- [ scheduling-1] o.s.i.s.i.SftpInboundFileSynchronizer : 0 files transferred from 'download'
2022-07-11 17:59:16.963 DEBUG [my-app,,,] 1 --- [ scheduling-1] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2022-07-11 17:59:26.963 DEBUG [my-app,,,] 1 --- [ scheduling-1] o.s.integration.util.SimplePool : Obtained org.springframework.integration.sftp.session.SftpSession@7b33a9bd from pool.
POD 2 was polling and then stopped too:
2022-07-11 17:59:24.311 DEBUG [my-app,,,] 1 --- [ scheduling-1] o.s.i.s.i.SftpInboundFileSynchronizer : 0 files transferred from 'download'
2022-07-11 17:59:24.311 DEBUG [my-app,,,] 1 --- [ scheduling-1] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
2022-07-11 17:59:34.311 DEBUG [my-app,,,] 1 --- [ scheduling-1] o.s.integration.util.SimplePool : Obtained org.springframework.integration.sftp.session.SftpSession@71b3d872 from pool.
Then both resumed, logging the following:
POD 2 at 18:08:
2022-07-11 18:08:02.920 DEBUG [my-app,,,] 1 --- [ scheduling-1] o.s.i.s.i.SftpInboundFileSynchronizer : deleted remote file: download/HELLO_20220711_1756.xml
POD 1 at 18:09:
2022-07-11 18:09:54.078 INFO [my-app,,,] 1 --- [ scheduling-1] o.s.i.s.i.SftpInboundFileSynchronizer : Removing the remote file '-rw-r--r-- 1 1009 1009 98304 Jul 11 20:59 HELLO_20220711_1756.xml' from the filter for a subsequent transfer attempt
After that, pod 1 logged:
2022-07-11 18:09:54.081 DEBUG [my-app,,,] 1 --- [ scheduling-1] o.s.i.f.r.session.CachingSessionFactory : Releasing Session org.springframework.integration.sftp.session.SftpSession@7b33a9bd back to the pool.
2022-07-11 18:09:54.082 INFO [my-app,,,] 1 --- [ scheduling-1] com.jcraft.jsch : Disconnecting from sftp-server port 22444
2022-07-11 18:09:54.091 DEBUG [my-app,,,] 1 --- [ scheduling-1] o.s.integration.util.SimplePool : Releasing org.springframework.integration.sftp.session.SftpSession@7b33a9bd back to the pool
2022-07-11 18:09:54.092 INFO [my-app,,,] 1 --- [xchange session] com.jcraft.jsch : Caught an exception, leaving main loop due to Socket closed
2022-07-11 18:09:54.092 DEBUG [my-app,,,] 1 --- [ scheduling-1] o.s.i.channel.PublishSubscribeChannel : preSend on channel 'bean 'errorChannel'', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Problem occurred while synchronizing 'download' to local directory; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.core.NestedIOException: Failed to remove file.; nested exception is 2: No such file, headers={id=4993d0cf-2024-16f8-214a-3eb4995a1c86, timestamp=1657573794092}]
2022-07-11 18:09:54.093 DEBUG [my-app,5a987dd08eba202f,5a987dd08eba202f,false] 1 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : bean '_org.springframework.integration.errorLogger.handler' for component '_org.springframework.integration.errorLogger' received message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Problem occurred while synchronizing 'download' to local directory; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.core.NestedIOException: Failed to remove file.; nested exception is 2: No such file, headers={b3=5a987dd08eba202f-5a987dd08eba202f-0, id=99180fa4-d8ae-4275-d232-b036daa06b62, timestamp=1657573794093}]
2022-07-11 18:09:54.094 ERROR [my-app,5a987dd08eba202f,5a987dd08eba202f,false] 1 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Problem occurred while synchronizing 'download' to local directory; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.core.NestedIOException: Failed to remove file.; nested exception is 2: No such file
According to the logs, both pods copied the same file from the remote server, both tried to delete it, and both tried to start the job, but Spring Batch prevents that.
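As I understand it, the persistent accept-once filter relies on the metadata store's atomic `putIfAbsent`, so normally only one pod can accept a given file name. Pod 1's INFO line about removing the remote file "from the filter for a subsequent transfer attempt" shows an entry being removed again after a failure, and once an entry is removed any pod can accept that name again. The sketch below models just those two operations with an in-memory `ConcurrentHashMap` standing in for Redis; `SharedStore`, `ClaimRace`, and the key format are hypothetical, and the sketch does not by itself explain the timing in these logs.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// In-memory stand-in for a shared metadata store; the real setup uses Redis.
class SharedStore {
    private final ConcurrentMap<String, String> entries = new ConcurrentHashMap<>();

    /** Atomic claim: only the first caller for a given key gets true. */
    boolean claim(String key, String owner) {
        return entries.putIfAbsent(key, owner) == null;
    }

    /** Rollback: forget the key, so the same name can be claimed again. */
    void rollback(String key) {
        entries.remove(key);
    }
}

class ClaimRace {
    /** Lets 'pods' threads try to claim the same key at once; returns how many succeeded. */
    static int racingClaims(SharedStore store, String key, int pods) {
        ExecutorService executor = Executors.newFixedThreadPool(pods);
        CountDownLatch start = new CountDownLatch(1);
        AtomicInteger winners = new AtomicInteger();
        for (int i = 1; i <= pods; i++) {
            String owner = "pod-" + i;
            executor.submit(() -> {
                start.await(); // release all threads together
                if (store.claim(key, owner)) {
                    winners.incrementAndGet();
                }
                return null;
            });
        }
        start.countDown();
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("claims did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return winners.get();
    }

    public static void main(String[] args) {
        SharedStore store = new SharedStore();
        String key = "prefix-HELLO_20220711_1756.xml"; // illustrative key format
        System.out.println("winners: " + racingClaims(store, key, 6)); // winners: 1
        store.rollback(key);
        System.out.println("claimed again after rollback: " + store.claim(key, "pod-1")); // true
    }
}
```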
EDIT:
My file filter now looks like this:
private FileListFilter<ChannelSftp.LsEntry> sftpFileListFilter() {
    return new ChainFileListFilter<ChannelSftp.LsEntry>()
            .addFilter(new SftpRegexPatternFileListFilter("^HELLO.*\\.xml$"))
            .addFilter(new SftpPersistentAcceptOnceFileListFilter(new RedisMetadataStore(redisConnectionFactory),
                    "prefix-"));
}
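Switching from `CompositeFileListFilter` to `ChainFileListFilter` changes what the persistent filter sees. As I understand the two classes, the composite passes the full remote listing to every filter and keeps only the names that all of them accepted, while the chain feeds each filter the output of the previous one. The sketch below models that difference with a hypothetical `NameFilter` interface and an in-memory accept-once set standing in for the Redis-backed store; it is not the Spring API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical minimal filter abstraction for illustration; not Spring's FileListFilter.
interface NameFilter {
    List<String> filter(List<String> names);
}

// Accept-once filter: passes a name the first time it is seen and records it.
class AcceptOnceFilter implements NameFilter {
    final Set<String> seen = new HashSet<>();

    public List<String> filter(List<String> names) {
        List<String> accepted = new ArrayList<>();
        for (String name : names) {
            if (seen.add(name)) {
                accepted.add(name);
            }
        }
        return accepted;
    }
}

class RegexNameFilter implements NameFilter {
    private final Pattern pattern;

    RegexNameFilter(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    public List<String> filter(List<String> names) {
        List<String> accepted = new ArrayList<>();
        for (String name : names) {
            if (pattern.matcher(name).matches()) {
                accepted.add(name);
            }
        }
        return accepted;
    }
}

class FilterDemo {
    /** Composite style: every filter sees the full listing; keep names all filters accepted. */
    static List<String> composite(List<String> names, NameFilter... filters) {
        List<String> result = new ArrayList<>(names);
        for (NameFilter f : filters) {
            result.retainAll(f.filter(names));
        }
        return result;
    }

    /** Chain style: each filter only sees what the previous filter passed. */
    static List<String> chain(List<String> names, NameFilter... filters) {
        List<String> result = names;
        for (NameFilter f : filters) {
            result = f.filter(result);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> listing = List.of("HELLO_1.xml", "OTHER.txt");

        AcceptOnceFilter before = new AcceptOnceFilter();
        composite(listing, before, new RegexNameFilter("^HELLO.*\\.xml$"));
        System.out.println("composite records: " + before.seen.size()); // composite records: 2

        AcceptOnceFilter after = new AcceptOnceFilter();
        chain(listing, new RegexNameFilter("^HELLO.*\\.xml$"), after);
        System.out.println("chain records: " + after.seen.size()); // chain records: 1
    }
}
```

Both styles return the same accepted names here, but with the chain and the regex first, only matching names ever reach the accept-once store.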