
I'm using Spring Batch Integration to poll an SFTP server and process the files on it; we receive thousands of XML files (between 50 MB and 200 MB each).

Currently I'm running 6 instances of my app to process those files.

Versions: Spring 2.3.9.RELEASE, Batch-Integration 4.2.5.RELEASE, Integration-SFTP 5.5.5

My SessionFactory is configured like this:

@Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
    factory.setHost(integrationProperties.getSftpSourceHost());
    factory.setPort(integrationProperties.getSftpSourcePort());
    factory.setUser(integrationProperties.getSftpSourceUser());
    factory.setPrivateKey(new FileSystemResource(integrationProperties.getSftpSourcePrivateKeyLocation()));
    factory.setAllowUnknownKeys(true);
    return new CachingSessionFactory<>(factory, 10);
}

My file filter:

private FileListFilter<ChannelSftp.LsEntry> sftpFileListFilter() {
    return new CompositeFileListFilter<ChannelSftp.LsEntry>()
            .addFilter(new SftpPersistentAcceptOnceFileListFilter(new RedisMetadataStore(redisConnectionFactory),
                    "prefix-"))
            .addFilter(new SftpRegexPatternFileListFilter("^HELLO.*\\.xml$"));
}

The integration flow:

@Bean
public IntegrationFlow myIntegrationFlow(
        SftpInboundFileSynchronizingMessageSource sftpInboundFileSynchronizingMessageSource) {
    return IntegrationFlows
            .from(sftpInboundFileSynchronizingMessageSource,
                    c -> c.poller(Pollers.fixedDelay(integrationProperties.getPollerDelay())
                            .maxMessagesPerPoll(integrationProperties.getMaxMessagePerPoll())))
            .transform(myFileMessageToJobRequest()).handle(myJobLaunchingGateway()).get();
}

The config is:

poller-delay: 10000
max-message-per-poll: 5
max-fetch-size: 3

The expected behavior is that, with 6 pods running, at least 6 files are always being processed at the same time, but I'm noticing something different.

The first situation is when the SFTP connection is closed by the server side; unfortunately I don't have access to check that config.

I frequently receive this log:

2022-07-08 08:04:18.763  INFO [my-app,,,] 1 --- [   scheduling-1] com.jcraft.jsch : Disconnecting from sftp-server port XXXXX
2022-07-08 08:04:18.778  INFO [my-app,,,] 1 --- [xchange session] com.jcraft.jsch : Caught an exception, leaving main loop due to Socket closed

After that the app gets stuck and starts reconnecting:

2022-07-08 08:04:38.827  INFO [my-app,,,] 1 --- [   scheduling-1] com.jcraft.jsch : Connecting to sftp-server port XXXXX

Then it connects again:

2022-07-08 08:04:44.301  INFO [my-app,,,] 1 --- [   scheduling-1] com.jcraft.jsch : Authentication succeeded (publickey).

But the problem is that after the reconnection the SFTP is full of files, yet this instance only gets the next file after 10 minutes (08:14):

2022-07-08 08:14:00.916  INFO [my-app,4a37a5aa19e0af32,865cdab263682593,true] 1 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=myJob]] launched with the following parameters: [{input.file.name=/local-integration/file/HELLO_20220708_99999.xml}]

So the instances sit idle for 10 minutes or more after disconnecting. I'm trying to discover whether this is some limitation on the server side or whether I can improve performance on my side.

The second situation is random idle time:

The SFTP has a lot of files waiting to be processed, but inspecting one instance we saw this kind of case:

2022-07-08 09:20:32.221  INFO [my-app,2717c6cf1ea7fc62,40c1823daf0e780c,true] 1 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=myJob]] completed with the following parameters: [{input.file.name=/local-integration/file/HELLO_20220708_99023.xml}] and the following status: [COMPLETED] in 788ms

So the last job completed at 9:20, but the instance then sat idle and the next job on this instance started only at 9:48:

2022-07-08 09:48:56.411  INFO [my-app,3ae108fc77f1347e,de6787a73f1eba0a,true] 1 --- [   scheduling-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=myJob]] launched with the following parameters: [{input.file.name=/local-integration/file/HELLO_20220708_99123.xml}]

This behavior occurs in all instances, and if I check the SFTP there are a lot of files waiting to be processed.

Another piece of information: I'm monitoring the memory and CPU of the instances and everything is fine.

Is there anything wrong in my setup, or something that I need to check only on the server side?

EDIT:

My InboundFileSynchronizer:

@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer(SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory) {
    SftpInboundFileSynchronizer synchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory);
    synchronizer.setPreserveTimestamp(true);
    synchronizer.setDeleteRemoteFiles(true);
    synchronizer.setFilter(sftpFileListFilter());
    synchronizer.setRemoteDirectory(integrationProperties.getSftpSourceDirectory());

    Expression expression = PARSER.parseExpression("#this.substring(#this.lastIndexOf('/')+1)");
    synchronizer.setLocalFilenameGeneratorExpression(expression);

    return synchronizer;
}
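Just to illustrate what the local filename generator expression above produces, the SpEL is equivalent to this plain-Java substring (the class and method names here are hypothetical, for illustration only):

```java
public class LocalFilenameSketch {
    // Equivalent of the SpEL "#this.substring(#this.lastIndexOf('/')+1)":
    // strip everything up to and including the last '/' from the remote path.
    static String localName(String remotePath) {
        return remotePath.substring(remotePath.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        // prints HELLO_20220708_99999.xml
        System.out.println(localName("/local-integration/file/HELLO_20220708_99999.xml"));
    }
}
```

Note that for a path with no `/` at all, `lastIndexOf` returns -1, so `substring(0)` returns the name unchanged.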

And my IntegrationFlow:

@Bean
public IntegrationFlow myIntegrationFlow(
        SftpInboundFileSynchronizingMessageSource sftpInboundFileSynchronizingMessageSource) {
    return IntegrationFlows
            .from(sftpInboundFileSynchronizingMessageSource,
                    c -> c.poller(Pollers.fixedDelay(integrationProperties.getPollerDelay())
                            .maxMessagesPerPoll(integrationProperties.getMaxMessagePerPoll())))
            .transform(fileMessageToJobRequest()).handle(jobLaunchingGateway()).get();
}

EDIT 2:

Following Artem's comments I enabled DEBUG logging for spring-integration, put a lot of files on the SFTP, and noticed the following behavior:

POD 1 - Was polling and suddenly stopped:

2022-07-11 17:59:06.385 DEBUG [my-app,,,] 1 --- [   scheduling-1] o.s.i.s.i.SftpInboundFileSynchronizer    : 0 files transferred from 'download'
2022-07-11 17:59:06.385 DEBUG [my-app,,,] 1 --- [   scheduling-1] o.s.i.e.SourcePollingChannelAdapter      : Received no Message during the poll, returning 'false'
2022-07-11 17:59:16.385 DEBUG [my-app,,,] 1 --- [   scheduling-1] o.s.integration.util.SimplePool          : Obtained org.springframework.integration.sftp.session.SftpSession@7b33a9bd from pool.
2022-07-11 17:59:16.962 DEBUG [my-app,,,] 1 --- [   scheduling-1] o.s.i.f.r.session.CachingSessionFactory  : Releasing Session org.springframework.integration.sftp.session.SftpSession@7b33a9bd back to the pool.
2022-07-11 17:59:16.963 DEBUG [my-app,,,] 1 --- [   scheduling-1] o.s.integration.util.SimplePool          : Releasing org.springframework.integration.sftp.session.SftpSession@7b33a9bd back to the pool
2022-07-11 17:59:16.963 DEBUG [my-app,,,] 1 --- [   scheduling-1] o.s.i.s.i.SftpInboundFileSynchronizer    : 0 files transferred from 'download'
2022-07-11 17:59:16.963 DEBUG [my-app,,,] 1 --- [   scheduling-1] o.s.i.e.SourcePollingChannelAdapter      : Received no Message during the poll, returning 'false'
2022-07-11 17:59:26.963 DEBUG [my-app,,,] 1 --- [   scheduling-1] o.s.integration.util.SimplePool          : Obtained org.springframework.integration.sftp.session.SftpSession@7b33a9bd from pool.

POD 2 - Was polling and then stopped too:

2022-07-11 17:59:24.311 DEBUG [my-app,,,] 1 --- [   scheduling-1] o.s.i.s.i.SftpInboundFileSynchronizer    : 0 files transferred from 'download'
2022-07-11 17:59:24.311 DEBUG [my-app,,,] 1 --- [   scheduling-1] o.s.i.e.SourcePollingChannelAdapter      : Received no Message during the poll, returning 'false'
2022-07-11 17:59:34.311 DEBUG [my-app,,,] 1 --- [   scheduling-1] o.s.integration.util.SimplePool          : Obtained org.springframework.integration.sftp.session.SftpSession@71b3d872 from pool.

Then both resumed logging the following:

POD 2 at 18:08:

2022-07-11 18:08:02.920 DEBUG [my-app,,,] 1 --- [   scheduling-1] o.s.i.s.i.SftpInboundFileSynchronizer    : deleted remote file: download/HELLO_20220711_1756.xml

POD 1 at 18:09:

2022-07-11 18:09:54.078  INFO [my-app,,,] 1 --- [   scheduling-1] o.s.i.s.i.SftpInboundFileSynchronizer    : Removing the remote file '-rw-r--r--    1 1009     1009        98304 Jul 11 20:59 HELLO_20220711_1756.xml' from the filter for a subsequent transfer attempt

After that, POD 1 logged:

2022-07-11 18:09:54.081 DEBUG [my-app,,,] 1 --- [   scheduling-1] o.s.i.f.r.session.CachingSessionFactory  : Releasing Session org.springframework.integration.sftp.session.SftpSession@7b33a9bd back to the pool.
2022-07-11 18:09:54.082  INFO [my-app,,,] 1 --- [   scheduling-1] com.jcraft.jsch                          : Disconnecting from sftp-server port 22444
2022-07-11 18:09:54.091 DEBUG [my-app,,,] 1 --- [   scheduling-1] o.s.integration.util.SimplePool          : Releasing org.springframework.integration.sftp.session.SftpSession@7b33a9bd back to the pool
2022-07-11 18:09:54.092  INFO [my-app,,,] 1 --- [xchange session] com.jcraft.jsch                          : Caught an exception, leaving main loop due to Socket closed
2022-07-11 18:09:54.092 DEBUG [my-app,,,] 1 --- [   scheduling-1] o.s.i.channel.PublishSubscribeChannel    : preSend on channel 'bean 'errorChannel'', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Problem occurred while synchronizing 'download' to local directory; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.core.NestedIOException: Failed to remove file.; nested exception is 2: No such file, headers={id=4993d0cf-2024-16f8-214a-3eb4995a1c86, timestamp=1657573794092}]
2022-07-11 18:09:54.093 DEBUG [my-app,5a987dd08eba202f,5a987dd08eba202f,false] 1 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : bean '_org.springframework.integration.errorLogger.handler' for component '_org.springframework.integration.errorLogger' received message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Problem occurred while synchronizing 'download' to local directory; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.core.NestedIOException: Failed to remove file.; nested exception is 2: No such file, headers={b3=5a987dd08eba202f-5a987dd08eba202f-0, id=99180fa4-d8ae-4275-d232-b036daa06b62, timestamp=1657573794093}]
2022-07-11 18:09:54.094 ERROR [my-app,5a987dd08eba202f,5a987dd08eba202f,false] 1 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Problem occurred while synchronizing 'download' to local directory; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.core.NestedIOException: Failed to remove file.; nested exception is 2: No such file

According to the logs, both pods copied the file from the remote, both tried to delete it, and both tried to start the job, but Spring Batch prevented the duplicate launch.

EDIT 3:

My file filter now looks like this:

private FileListFilter<ChannelSftp.LsEntry> sftpFileListFilter() {
    return new ChainFileListFilter<ChannelSftp.LsEntry>()
            .addFilter(new SftpRegexPatternFileListFilter("^HELLO.*\\.xml$"))
            .addFilter(new SftpPersistentAcceptOnceFileListFilter(new RedisMetadataStore(redisConnectionFactory),
                    "prefix-"));
}
Guilherme Bernardi

1 Answer


thousands of XML files

Probably such a delay is related to the metadata loaded for each file from Redis. So, the SftpPersistentAcceptOnceFileListFilter slows down production from the channel adapter just because it has to skip so many files.

I would suggest changing your composite filter to a ChainFileListFilter, which does not pass an entry to the next filter if the current one fails. See more in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-reading. And move that SftpRegexPatternFileListFilter to the first position. That way you won't call Redis if the file doesn't match the pattern.

I would also consider some custom filter that rejects files older than a cutoff, if you cannot remove already-processed files from that remote dir. That way you won't call Redis for them at all.
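The age check itself could be as simple as the predicate below (a minimal sketch; the class name, the 7-day cutoff, and the idea of wiring this into a custom FileListFilter placed before the Redis-backed one are assumptions, not code from the question):

```java
import java.util.concurrent.TimeUnit;

public class MaxAgeCheckSketch {
    // Accept only files modified within the last maxAgeMillis;
    // anything older is rejected up front, so Redis is never consulted for it.
    static boolean withinMaxAge(long fileModifiedMillis, long nowMillis, long maxAgeMillis) {
        return nowMillis - fileModifiedMillis <= maxAgeMillis;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long sevenDays = TimeUnit.DAYS.toMillis(7);
        // a file modified 10 days ago fails the check
        System.out.println(withinMaxAge(now - TimeUnit.DAYS.toMillis(10), now, sevenDays));
        // a file modified an hour ago passes
        System.out.println(withinMaxAge(now - TimeUnit.HOURS.toMillis(1), now, sevenDays));
    }
}
```

In an SFTP filter you would feed this from the entry's modification time (times-10 multiplied to millis, since SFTP reports seconds), and return the accepted subset.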

Artem Bilan
  • Artem, I changed to the ChainFileListFilter and what you say makes sense, but it doesn't solve the problem; the RegexFilter filters out only a few files, 2 or at most 4, so almost every file hits the PersistentFilter. I also added my synchronizer and the integration flow, and I'm already deleting the remote file with `synchronizer.setDeleteRemoteFiles(true);` – Guilherme Bernardi Jul 11 '22 at 18:36
  • Can you, please, turn on a `DEBUG` for the `org.springframework.integration` category to observe in logs what is going on during those `10 minutes` ? – Artem Bilan Jul 11 '22 at 19:04
  • Artem, I enabled the logs and updated the question; I noticed that sometimes both pods try to process the same file during the synchronization. – Guilherme Bernardi Jul 11 '22 at 21:46
  • You probably have all those files in local dirs... – Artem Bilan Jul 11 '22 at 23:44
  • Sorry, but I didn't understand. I'm trying to figure out whether something in my flow is wrong. My integration does the following: poll the SFTP, download the file to a local dir, start the job, process and delete the local file, and go to the next file. What I noticed in the logs is that 2 instances copy the same file to the local dir, but one deletes the remote file and the other gets stuck trying to do the same... – Guilherme Bernardi Jul 12 '22 at 11:54
  • That’s why I said to do a `ChainFileListFilter`. – Artem Bilan Jul 12 '22 at 12:27
  • I did... it's already running with `ChainFileListFilter`. – Guilherme Bernardi Jul 12 '22 at 13:19
  • Then with a shared Redis it is impossible to pull the same file from different apps. Unless that file is modified in between… – Artem Bilan Jul 12 '22 at 13:21
  • I guess it happens during the transform. I saw some samples where people implement an idempotent receiver; I'm reading up to check whether it makes sense in my case. – Guilherme Bernardi Jul 12 '22 at 13:29
  • I edited the question again and added the filter that I'm using with the `ChainFileListFilter`. – Guilherme Bernardi Jul 12 '22 at 13:34
  • `removed from filter` - looks like you got some error downstream, so the failed file will be re-fetched. And of course that may happen on the other node. – Artem Bilan Jul 12 '22 at 13:35
  • Artem, so probably my problem is on the SFTP server and not in my app? The log which I posted should not cause any idle behavior? I'm really trying to discover whether the problem is here or at the server; as I said, I don't have access to this SFTP config, so I'll try to test the same scenario on our own SFTP. – Guilherme Bernardi Jul 15 '22 at 15:03