
Context

We are using a spring integration flow to poll for new files in the input directory. The file name is passed to an existing spring batch job that fetches the file and handles the business processing.

    return IntegrationFlows.from(
            Files.inboundAdapter(new File(properties.getInputDir())).filter(new AcceptOnceFileListFilter<>()),
            c -> c.poller(Pollers.fixedRate(300, TimeUnit.SECONDS).maxMessagesPerPoll(50)))
            .log(LoggingHandler.Level.INFO, "Inb_GW_Msg", Message::getPayload)
            .channel(c -> c.executor(jobsTaskExecutor))
            .transform(fileMessageToJobRequest)
            .handle(jobLaunchingGateway, e -> e.advice(jobExecutionAdvice()))
            .log(LoggingHandler.Level.INFO, "Inb_GW_Msg_Processing_Result")
            .get();

The transformer sets the input file path as a job parameter:

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addString(fileParameterName, message.getPayload().getAbsolutePath());
        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }

We are trying to modify the flow to use a remote SFTP server as the file source. The spring batch job uses an outbound spring integration gateway to GET the remote file by its path, following the recommendations in "Spring Integration - Concurrent access to SFTP outbound gateway GET w/ STREAM and accessing the response from Queue Channel", and that part works well.

Issue

While modifying the inbound poller to fetch file names from the remote SFTP server, we have not been able to find an appropriate inbound adapter.

The use of Sftp.inboundAdapter(SessionFactory<ChannelSftp.LsEntry> sessionFactory) sets up a SftpInboundFileSynchronizingMessageSource that would synchronize a remote file system to the local directory. This behavior is not desirable for our needs and we are looking for a remote poller that would merely fetch the file name similar to the local file system poller.

I understand that the SFTP outbound gateway supports the `ls` operation, and one way to solve the polling needs is to implement a custom message source/poller that connects to the outbound gateway periodically to fetch the directory listing.

Are there any out-of-the-box adapters that support this requirement?

Seeking recommendations on simpler ways to achieve our polling needs without writing a lot of custom code and/or modifying the existing spring batch job.

Update

It looks like the SftpStreamingMessageSource set up by Sftp.inboundStreamingAdapter returns the remote file metadata along with the input stream. I could simply discard/close the stream right away and use the metadata to kick off the spring batch job.

I will try this out and post an update. Appreciate feedback on this design idea.

1 Answer

Use the outbound gateway with the list (LS) command.

https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#using-the-ls-command

EDIT

    @Bean
    IntegrationFlow flow(DefaultSftpSessionFactory sf) {
        return IntegrationFlows.fromSupplier(() -> "dir", e -> e.poller(Pollers.fixedDelay(5000)))
                .handle(Sftp.outboundGateway(sf, Command.LS, "payload")
                        .options(Option.NAME_ONLY))
                .split()
                .log()
                .get();
    }
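
Because the LS listing is not run through a file list filter, every poll returns every file still present in the remote directory. A minimal sketch of one way to drop names that were already handed to the batch job is shown below. `SeenRemoteFileFilter` and `filterNew` are hypothetical names, not Spring Integration classes, and the sketch assumes the listing arrives as a list of file names (as with the name-only option).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not a Spring Integration class): remembers remote file
// names seen on earlier polls and drops them from later listings.
class SeenRemoteFileFilter {

    // Thread-safe set of names accepted so far; held in memory only.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Returns only the names not seen on a previous call, in listing order.
    List<String> filterNew(List<String> listing) {
        List<String> fresh = new ArrayList<>();
        for (String name : listing) {
            if (seen.add(name)) { // add() returns false when the name is already present
                fresh.add(name);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        SeenRemoteFileFilter filter = new SeenRemoteFileFilter();
        System.out.println(filter.filterNew(List.of("a.csv", "b.csv"))); // [a.csv, b.csv]
        System.out.println(filter.filterNew(List.of("b.csv", "c.csv"))); // [c.csv]
    }
}
```

Something like this could sit in a transform step between the gateway and the splitter. The state is lost on restart, so a persistent store, or removing files once they are processed, would probably be needed in production.
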
Gary Russell
  • Gary - could you provide high-level ideas on how to use the poller in the inbound flow setup to point to the outbound gateway? Perhaps the poller sends the same message (the directory name) to the outbound gateway request channel? If so, what would be the message source for the beginning of the inbound integration flow? – tardistraveller Aug 18 '22 at 15:37
  • I added an example. Note that filters are not applied when listing the remote dir so you would need to filter out those files you already processed (unless they are removed). Your idea of using the stream source and closing the `InputStream` without consuming it would work too. – Gary Russell Aug 18 '22 at 15:59
  • Thanks for the example, Gary. If I were to use the stream source + close the input stream right away before starting the batch job, will the session go back to the CachedSessionFactory pool for reuse? The sftp streaming source is looping through the directory listing and producing a message with input stream + headers with metadata. As long as closing the input stream right away makes the session eligible for re-use, it shouldn't lead to too many active open sessions. – tardistraveller Aug 18 '22 at 16:33
  • No; you will have to close the session as well as the stream as discussed in the documentation. https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#sftp-streaming If you are using the caching CF, closing it will not physically close it, but return it to the pool. The session is in the `IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE` message header. `message.getHeaders().get(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, Closeable.class).close();`. – Gary Russell Aug 18 '22 at 18:58
  • That's Caching SF. – Gary Russell Aug 18 '22 at 19:04
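
The last exchange comes down to closing two things per message: the payload stream and the session. A minimal plain-Java sketch of that pattern follows, with stand-ins instead of real Spring Integration objects. `StreamingResourceCleanup` and `release` are hypothetical names. In a real flow the `Closeable` would be the object read from the closeable-resource header mentioned above.

```java
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: discards the payload stream unread and then closes the session.
class StreamingResourceCleanup {

    // try-with-resources closes in reverse declaration order: stream first, then
    // session. The session is still closed if closing the stream throws.
    static void release(InputStream payload, Closeable session) {
        try (Closeable s = session; InputStream in = payload) {
            // Intentionally empty: nothing is read from the stream.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        // Stand-ins that record when they are closed.
        InputStream stream = new ByteArrayInputStream(new byte[0]) {
            @Override
            public void close() throws IOException {
                events.add("stream");
                super.close();
            }
        };
        Closeable session = () -> events.add("session");
        release(stream, session);
        System.out.println(events); // [stream, session]
    }
}
```

With a caching session factory, the session close in this pattern would return the session to the pool rather than disconnect it, as noted in the comments.
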