5

I'm using

  • Sprint Integration(File, SFTP, etc) 4.3.6
  • Spring Boot 1.4.3
  • Spring Integration Java DSL 1.1.4

and I'm trying to set up an SFTP outbound adapter that would allow me move a file to a directory on the remote system and also remove or rename the file in my local system.

So, for instance I would like to place a file, a.txt, in a local directory and have it SFTP'ed to a remote server in the directory inbound. Once the transfer is complete I would like to have the local copy of a.txt deleted, or renamed.

I was toying with a couple of ways to this. So here is my common SessionFactory for the test.

protected SessionFactory<ChannelSftp.LsEntry> buildSftpSessionFactory() {
    DefaultSftpSessionFactory sessionFactory = new DefaultSftpSessionFactory();
    sessionFactory.setHost("localhost");
    sessionFactory.setUser("user");
    sessionFactory.setAllowUnknownKeys(true);
    sessionFactory.setPassword("pass");
    CachingSessionFactory<ChannelSftp.LsEntry> cachingSessionFactory = new CachingSessionFactory<>(sessionFactory, 1);
    return cachingSessionFactory;
}

This is a transformer that I have to add some of the headers to the message

@Override
public Message<File> transform(Message<File> source) {
    System.out.println("here is the thing : "+source);
    File file = (File)source.getPayload();
    Message<File> transformedMessage = MessageBuilder.withPayload(file)
            .copyHeaders(source.getHeaders())
            .setHeaderIfAbsent(FileHeaders.ORIGINAL_FILE, file)
            .setHeaderIfAbsent(FileHeaders.FILENAME, file.getName())
            .build();
    return transformedMessage;
}

I then have an integration flow that uses a poller to watch a local directory and invoke this :

@Bean
public IntegrationFlow pushTheFile(){
    return IntegrationFlows
            .from(s -> s.file(new File(DIR_TO_WATCH))
                            .patternFilter("*.txt").preventDuplicates(),
                    e -> e.poller(Pollers.fixedDelay(100)))
            .transform(outboundTransformer)
            .handle(Sftp.outboundAdapter(this.buildSftpSessionFactory())
                    .remoteFileSeparator("/")
                    .useTemporaryFileName(false)
                    .remoteDirectory("inbound/")
            )
            .get();
}

This works fine, but leaves the local file. Any ideas on how to have this remove local file after the upload completes? Should I be looking at the SftpOutboundGateway instead?

Thanks in advance!

Artem's answer worked perfectly! Here is a quick example that deletes the local file after it is pushed.

@Bean
public IntegrationFlow pushTheFile(){
    return IntegrationFlows
            .from(s -> s.file(new File(DIR_TO_WATCH))
                            .patternFilter("*.txt").preventDuplicates(),
                    e -> e.poller(Pollers.fixedDelay(100)))
            .transform(outboundTransformer)
            .handle(Sftp.outboundAdapter(this.buildSftpSessionFactory())
                    .remoteFileSeparator("/")
                    .useTemporaryFileName(false)
                    .remoteDirectory("inbound/"), c -> c.advice(expressionAdvice(c))
            )
            .get();
}

@Bean
public Advice expressionAdvice(GenericEndpointSpec<FileTransferringMessageHandler<ChannelSftp.LsEntry>> c) {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setOnSuccessExpression("payload.delete()");
    advice.setOnFailureExpression("payload + ' failed to upload'");
    advice.setTrapException(true);
    return advice;
}
Tristan
  • 279
  • 5
  • 21
  • May be this will help you- http://stackoverflow.com/questions/36247467/spring-sftp-inbound-chanel-adapter-delete-local-file?rq=1 – Amit Bhati Feb 02 '17 at 13:07

1 Answers1

1

For this purpose you can use several approaches.

All of them are based on the fact that you do something else with the original request message for the Sftp.outboundAdapter().

  1. .publishSubscribeChannel() allows you to send the same message to several subscribers, when the second will receive it only the first one finishes its work. By default, if you don't specify Executor

  2. routeToRecipients() allows you make the same result but via different component.

  3. ExpressionEvaluatingRequestHandlerAdvice - you add this one to the .advice() of the Sftp.outboundAdapter() endpoint definition - the second .handle() argument and perform file.delete() via onSuccessExpression:

    .transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
    
Artem Bilan
  • 113,505
  • 11
  • 91
  • 118
  • Thank you Artem for the response. I'm trying to run with the 3rd option that you mentioned, but I'm a little confused. The `Sftp.outboundAdapter()` doesn't seem to have an `advice()` method. I'm sure I'm just missing something simple, but can you help point me in a direction? – Tristan Feb 03 '17 at 13:06
  • True, you have to take a look to the `Consumer` arg of the `.handle()`:https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference#endpointConfig – Artem Bilan Feb 03 '17 at 13:16
  • I see what you are saying now. I'll post an example once I have it cleaned up. Thanks again!! – Tristan Feb 03 '17 at 15:49
  • Adding an example in the post – Tristan Feb 03 '17 at 16:23
  • @ArtemBilan what if Sftp.outboundAdapter() failed to upload file. second subscribe still getting message for processing. is there a way to stop it? – Umair Aug 19 '20 at 10:08
  • 1
    Sorry, it sounds like it deserves its own SO thread with much more info. – Artem Bilan Aug 19 '20 at 12:17
  • @ArtemBilan https://stackoverflow.com/questions/63489690/spring-dsl-integration-flow-publishing-message-after-error-in-outbound-adaptor – Umair Aug 19 '20 at 14:53