I have an SFTP directory. I read files from it and send them to a ServiceActivator for further processing. At any given time I need the handler to process several files in parallel.

Here is my Spring Integration Java DSL flow:

IntegrationFlows.from(Sftp.inboundAdapter(getSftpSessionFactory())
                        .temporaryFileSuffix("COPY")
                        .localDirectory(directory)
                        .deleteRemoteFiles(false)
                        .preserveTimestamp(true)
                        .remoteDirectory("remoteDir")
                        .patternFilter("*.txt"), e -> e.poller(Pollers.fixedDelay(500).maxMessagesPerPoll(5)))
                        .handle("mybean", "myMethod")
                        .handle(Files.outboundAdapter(new File("success"))
                        .deleteSourceFiles(true)
                        .autoCreateDirectory(true))
                        .get();

Update: Here is my ThreadPoolExecutor:

@Bean(name = "executor")
public Executor getExecutor()
{
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(4);
    executor.setQueueCapacity(20);          
    executor.initialize();
    return executor;
}
Harish

2 Answers

Sftp.inboundAdapter() (SftpInboundFileSynchronizingMessageSource) returns remote files one by one anyway. It first synchronizes them to the local directory, and only then polls that directory and emits each file as a message with a File payload.

To process them in parallel, it is enough to add a taskExecutor to your e.poller() definition; the messages allowed by maxMessagesPerPoll(5) will then be distributed to different threads.

Artem Bilan
  • I tried that option and got an exception: Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.integration.util.ErrorHandlingTaskExecutor$1@41c6ed7a rejected from java.util.concurrent.ThreadPoolExecutor@53ee0cd5[Running, pool size = 4, active threads = 4, queued tasks = 20, completed tasks = 1]. The execution is also very slow, slower than with a single thread. – Harish Mar 11 '16 at 23:43
  • That's not a problem with the `TaskExecutor`. Your handler is somehow slow, maybe blocked somewhere. On the other hand, you can always use `CallerRunsPolicy` instead of the default `AbortPolicy`. – Artem Bilan Mar 11 '16 at 23:56
  • Where do we add CallerRunsPolicy? – Harish Mar 12 '16 at 00:50
  • To the `ThreadPoolTaskExecutor.setRejectedExecutionHandler()` – Artem Bilan Mar 12 '16 at 00:56
  • I tried with CallerRuns but it's still slow. I have doubts about my method and will check it. Also, will these lines block the thread? handle(Files.outboundAdapter(new File("success"))) .deleteSourceFiles(true) – Harish Mar 12 '16 at 13:04
  • I didn't want to raise a new question, but will if needed. How do I move a file on the SFTP server from one folder to another? – Harish Mar 14 '16 at 16:50
  • Consider using the `MV` command of the SFTP outbound gateway. – Artem Bilan Mar 14 '16 at 16:52
  • .handle(Sftp.outboundGateway(getSftpSessionFactory(),Command.MV,?)) What is the last String for? – Harish Mar 14 '16 at 17:15
  • A SpEL expression for the `source` file path. See Reference Manual for more info: http://docs.spring.io/spring-integration/reference/html/sftp.html#sftp-outbound-gateway. The Java DSL is fully based on those options. Don't forget the `renameExpression` option. – Artem Bilan Mar 14 '16 at 17:22
  • http://stackoverflow.com/questions/31156737/moving-processed-files-in-remote-sftp-using-java-dsl For those who read this thread in the future: the linked question has the answer. – Harish Mar 14 '16 at 20:51
  • I ended up using this solution and highly recommend it: http://stackoverflow.com/questions/31819189/move-file-after-successful-ftp-transfer-using-java-dsl – Harish Mar 15 '16 at 21:57
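The rejection reported in the comments above can be reproduced without Spring. Below is a minimal sketch, assuming a plain java.util.concurrent.ThreadPoolExecutor sized like the question's executor (4 threads, queue capacity 20). The class RejectionPolicyDemo and its submit method are hypothetical names used only for illustration. It submits 30 tasks while every worker is stuck and compares the default AbortPolicy with CallerRunsPolicy.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RejectionPolicyDemo {

    /**
     * Submits `tasks` jobs to a pool with 4 threads and a 20-slot queue
     * while all worker threads are blocked.
     * Returns {rejectedCount, ranOnCallerThreadCount}.
     */
    static int[] submit(RejectedExecutionHandler policy, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(20), policy);
        CountDownLatch release = new CountDownLatch(1);
        Thread caller = Thread.currentThread();
        AtomicInteger ranOnCaller = new AtomicInteger();
        int rejected = 0;

        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    // CallerRunsPolicy executes the task on the submitting thread.
                    if (Thread.currentThread() == caller) {
                        ranOnCaller.incrementAndGet();
                        return;
                    }
                    // Pool threads simulate a slow handler by waiting for the latch.
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                // AbortPolicy throws once 4 tasks run and 20 are queued.
                rejected++;
            }
        }

        release.countDown(); // let the blocked workers finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {rejected, ranOnCaller.get()};
    }

    public static void main(String[] args) {
        int[] abort = submit(new ThreadPoolExecutor.AbortPolicy(), 30);
        int[] callerRuns = submit(new ThreadPoolExecutor.CallerRunsPolicy(), 30);
        System.out.println("AbortPolicy: rejected=" + abort[0]);
        System.out.println("CallerRunsPolicy: rejected=" + callerRuns[0]
                + ", ranOnCaller=" + callerRuns[1]);
    }
}
```

With AbortPolicy, the six tasks beyond the 4 running and 20 queued are rejected. With CallerRunsPolicy they run on the submitting thread instead, so nothing is lost, but the submitter is busy while it runs them. That avoids the exception without making a slow handler any faster.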

For my use case, assigning the taskExecutor to the channel did the trick. Here is the code I used for a really simple application that polls a folder and processes the files. Files are handled in parallel, even if I assign only one thread to the poller.

@Slf4j
@SpringBootApplication
public class SpringIntegrationDemoApplication {

  public static void main( String[] args ) {
    SpringApplication.run( SpringIntegrationDemoApplication.class, args );
  }

  @Bean
  public IntegrationFlow fileReadingFlow() {
    return IntegrationFlows
        .from( Files.inboundAdapter( new File( "/tmp/in" ) ).patternFilter( "*.txt" ),
               e ->
                   e.poller( Pollers.fixedDelay( 10000 )
//                               .taskExecutor( Executors.newFixedThreadPool( 1 ) )
                                 .maxMessagesPerPoll( 100 )
                   ) )
        .channel( MessageChannels.executor( Executors.newFixedThreadPool( 100 ) ) )
        .handle( "myFileHandler", "handleMessage" )
        .get();
  }

  @Bean
  public MessageHandler myFileHandler() {
    return message -> {
      try {
        log.info( "START" + message.getPayload() );
        Thread.sleep( 2000 );
        log.info( "STOP" + message.getPayload() );
      }
      catch ( InterruptedException e ) {
        // Restore the interrupt flag instead of swallowing the interruption
        Thread.currentThread().interrupt();
      }
    };
  }
}
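Why an executor channel gives per-message parallelism can be modeled without Spring. The following is a minimal sketch using plain java.util.concurrent in place of the flow: one submitting thread stands in for the single poller thread, and a fixed thread pool stands in for the channel's executor. ExecutorHandOffDemo and dispatch are hypothetical names, and the sketch models only the hand-off, not Spring Integration's dispatcher code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorHandOffDemo {

    /**
     * A single caller thread hands every payload to the pool and moves on.
     * Returns a map of payload -> name of the thread that handled it.
     * For a quick run, poolSize should be at least payloads.size().
     */
    static Map<String, String> dispatch(List<String> payloads, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        CountDownLatch allStarted = new CountDownLatch(payloads.size());

        for (String payload : payloads) {
            pool.execute(() -> {
                handledBy.put(payload, Thread.currentThread().getName());
                allStarted.countDown();
                // Stay busy until every handler has started (or 2 s pass),
                // so the handlers demonstrably overlap in time.
                try {
                    allStarted.await(2, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handledBy;
    }

    public static void main(String[] args) {
        Map<String, String> handledBy =
                dispatch(Arrays.asList("a.txt", "b.txt", "c.txt"), 3);
        System.out.println("distinct handler threads: "
                + new HashSet<>(handledBy.values()).size());
    }
}
```

Because the submitting thread never runs the handler itself, the degree of parallelism depends on the pool size rather than on how many threads do the polling.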
Hans