
I'm following the docs to combine Spring Batch Integration with Spring Integration AWS for polling an AWS S3 bucket.

But in some situations the batch job is not executed for each file.

The AWS S3 polling works correctly: when I upload a new file, or when I start the application and there are already files in the bucket, the application syncs them to the local directory:

    @Bean
    public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
        return new S3SessionFactory(pAmazonS3);
    }

    @Bean
    public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory pS3SessionFactory) {
        S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(pS3SessionFactory);
        synchronizer.setPreserveTimestamp(true);
        synchronizer.setDeleteRemoteFiles(false);
        synchronizer.setRemoteDirectory("remote-bucket");
        //synchronizer.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "simpleMetadataStore"));
        return synchronizer;
    }

    @Bean
    @InboundChannelAdapter(value = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
            S3InboundFileSynchronizer pS3InboundFileSynchronizer) {
        S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource(pS3InboundFileSynchronizer);
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new FileSystemResource("files").getFile());
        //messageSource.setLocalFilter(new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "fsSimpleMetadataStore"));
        return messageSource;
    }

    @Bean("s3filesChannel")
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }

I followed the tutorial and created the FileMessageToJobRequest class. I won't put the code here because it's the same as in the docs.

Then I created the IntegrationFlow and FileMessageToJobRequest beans:

    @Bean
    public IntegrationFlow integrationFlow(
            S3InboundFileSynchronizingMessageSource pS3InboundFileSynchronizingMessageSource) {
        return IntegrationFlows.from(pS3InboundFileSynchronizingMessageSource, 
                         c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
                .transform(fileMessageToJobRequest())
                .handle(jobLaunchingGateway())
                .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
                .get();
    }

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest() {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setFileParameterName("input.file.name");
        fileMessageToJobRequest.setJob(delimitedFileJob);
        return fileMessageToJobRequest;
    }

I think the problem is in the JobLaunchingGateway.

If I create it like this:

    @Bean
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

        return jobLaunchingGateway;
    }

Case 1 (Bucket is empty when the application starts):

  • I upload a new file to AWS S3;
  • The polling works and the file appears in the local directory;
  • But the transformer/job isn't fired.

Case 2 (Bucket already has one file when application starts):

  • The job is launched:

    2021-01-12 13:32:34.451  INFO 1955 --- [ask-scheduler-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=arquivoDelimitadoJob]] launched with the following parameters: [{input.file.name=files/FILE1.csv}]
    2021-01-12 13:32:34.524  INFO 1955 --- [ask-scheduler-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [delimitedFileJob]

  • If I add a second file to S3, the job isn't launched, as in case 1.

Case 3 (Bucket has more than one file):

  • The files are synchronized correctly to the local directory;
  • But the job is executed only once, for the last file.

So, following the docs, I changed my gateway to:

    @Bean
    @ServiceActivator(inputChannel = IN_CHANNEL_NAME, poller = @Poller(fixedRate="1000"))
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());

        //JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
        //jobLaunchingGateway.setOutputChannel(replyChannel());
        jobLaunchingGateway.setOutputChannel(s3FilesChannel());
        return jobLaunchingGateway;
    }

With this new gateway implementation, if I put a new file in S3 the application reacts but doesn't transform it, and fails with this error:

Caused by: java.lang.IllegalArgumentException: The payload must be of type JobLaunchRequest. Object of class [java.io.File] must be an instance of class org.springframework.batch.integration.launch.JobLaunchRequest

And if there are two files in the bucket when the app starts (FILE1.csv and FILE2.csv), the job runs correctly for FILE1.csv but gives the error above for FILE2.csv.

What's the correct way to implement something like this?

Just to be clear: I expect to receive thousands of CSV files in this bucket and to read and process them with Spring Batch, but I also need to pick up every new file from S3 as soon as possible.

Thanks in advance.

Guilherme Bernardi

1 Answer

The JobLaunchingGateway indeed accepts only a JobLaunchRequest as its payload.

Since you have @InboundChannelAdapter(value = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30")) on the S3InboundFileSynchronizingMessageSource bean definition, it is wrong to also put @ServiceActivator(inputChannel = IN_CHANNEL_NAME, ...) on the JobLaunchingGateway without the FileMessageToJobRequest transformer in between: the gateway then receives the raw java.io.File payload from that channel, which is exactly the error you see.

Your integrationFlow looks OK to me, but then you need to remove the @InboundChannelAdapter from the S3InboundFileSynchronizingMessageSource bean and rely fully on the c.poller() configuration.
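
A minimal sketch of this first option, assuming Spring Integration 5.x and the bean names from the question. The class name `S3PollingConfig` is hypothetical, `FileMessageToJobRequest` is assumed to be the question's own class in the same package, and the `JobLaunchingGateway` is assumed to be the first variant from the question (no `@ServiceActivator`, no output channel):

```java
import java.io.File;

import org.springframework.batch.integration.launch.JobLaunchingGateway;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.inbound.S3InboundFileSynchronizer;
import org.springframework.integration.aws.inbound.S3InboundFileSynchronizingMessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;

@Configuration
public class S3PollingConfig { // hypothetical class name

    // No @InboundChannelAdapter here: the bean is a plain MessageSource,
    // so the flow below is the only poller reading from it.
    @Bean
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
            S3InboundFileSynchronizer synchronizer) {
        S3InboundFileSynchronizingMessageSource messageSource =
                new S3InboundFileSynchronizingMessageSource(synchronizer);
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new File("files"));
        return messageSource;
    }

    // Single poller -> transformer -> job launcher, same chain as in the question.
    @Bean
    public IntegrationFlow integrationFlow(
            S3InboundFileSynchronizingMessageSource messageSource,
            FileMessageToJobRequest fileMessageToJobRequest, // the question's own class
            JobLaunchingGateway jobLaunchingGateway) {
        return IntegrationFlows.from(messageSource,
                        c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
                .transform(fileMessageToJobRequest)
                .handle(jobLaunchingGateway)
                .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
                .get();
    }
}
```

With this layout every File message passes through the transformer before it reaches the gateway, because nothing else consumes from the source.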

Another way is to leave that @InboundChannelAdapter in place, but then start the IntegrationFlow from IN_CHANNEL_NAME instead of from the MessageSource.

Since you have several pollers against the same S3 source, and both of them work on the same local directory, it is no surprise that you see so many unexpected situations.
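
A sketch of this second option, again assuming Spring Integration 5.x. The value of `IN_CHANNEL_NAME` is not shown in the question, so the constant below assumes it names the `QueueChannel` bean `s3filesChannel`; the class name `S3ChannelFlowConfig` is hypothetical. Because a `QueueChannel` is pollable, the first consumer in the flow needs its own poller, which a bridge carries here:

```java
import org.springframework.batch.integration.launch.JobLaunchingGateway;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;

@Configuration
public class S3ChannelFlowConfig { // hypothetical class name

    // Assumption: the channel that @InboundChannelAdapter writes to.
    static final String IN_CHANNEL_NAME = "s3filesChannel";

    // The annotated S3 message source stays as in the question and keeps
    // pushing File messages into IN_CHANNEL_NAME; this flow only consumes them.
    @Bean
    public IntegrationFlow integrationFlow(
            FileMessageToJobRequest fileMessageToJobRequest, // the question's own class
            JobLaunchingGateway jobLaunchingGateway) {
        return IntegrationFlows.from(IN_CHANNEL_NAME)
                // Needed only for a pollable channel such as QueueChannel;
                // drop this bridge if the channel is a DirectChannel.
                .bridge(e -> e.poller(Pollers.fixedDelay(1000)))
                .transform(fileMessageToJobRequest)
                .handle(jobLaunchingGateway)
                .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
                .get();
    }
}
```

In this variant the `JobLaunchingGateway` bean must not carry `@ServiceActivator(inputChannel = IN_CHANNEL_NAME)`, otherwise it would compete with the flow for the same File messages.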

Artem Bilan
  • You're right. Silly error... I decided to remove the @InboundChannelAdapter from the S3InboundFileSynchronizingMessageSource bean and it works. – Guilherme Bernardi Jan 12 '21 at 20:51
  • Well, that's not so silly, especially when you copy/paste it from somewhere. Moreover, it is even possible to do that on purpose and have such a complex configuration in your application. You kind of had concurrent consumers for that S3-backed source. But I'm glad that it was so easy to fix! – Artem Bilan Jan 12 '21 at 20:59
  • Yes, I didn't read both docs carefully and just combined the samples. Do you think what I did is a correct approach for my case? I mean in terms of scalability: for example, if 100k files are uploaded to my bucket and my app scales out, will the instances concurrently try to process the same file with this batch-integration approach? – Guilherme Bernardi Jan 12 '21 at 23:25
  • If you are going to scale, you need to think about a persistent file list filter with a shared metadata store. See the FTP docs in Spring Integration. – Artem Bilan Jan 13 '21 at 01:51
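
One possible shape of the shared-store idea from the last comment, as a sketch only. It swaps the in-memory `SimpleMetadataStore` from the question's commented-out filter for a `JdbcMetadataStore`, which is one of several shared stores and not necessarily the one the answerer had in mind. It assumes spring-integration-jdbc is on the classpath, a `DataSource` bean exists, and the `INT_METADATA_STORE` table has been created; the class name `S3SharedFilterConfig` and the key prefix `"s3-"` are hypothetical:

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.inbound.S3InboundFileSynchronizer;
import org.springframework.integration.aws.support.S3SessionFactory;
import org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter;
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;
import org.springframework.integration.metadata.ConcurrentMetadataStore;

@Configuration
public class S3SharedFilterConfig { // hypothetical class name

    // Backed by a database table, so every application instance sees the
    // same record of which S3 objects were already accepted.
    @Bean
    public ConcurrentMetadataStore metadataStore(DataSource dataSource) {
        return new JdbcMetadataStore(dataSource);
    }

    @Bean
    public S3InboundFileSynchronizer s3InboundFileSynchronizer(
            S3SessionFactory sessionFactory, ConcurrentMetadataStore metadataStore) {
        S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(sessionFactory);
        synchronizer.setPreserveTimestamp(true);
        synchronizer.setDeleteRemoteFiles(false);
        synchronizer.setRemoteDirectory("remote-bucket");
        // Remote filter: an object already recorded in the shared store is skipped.
        synchronizer.setFilter(
                new S3PersistentAcceptOnceFileListFilter(metadataStore, "s3-"));
        return synchronizer;
    }
}
```

The intent is that a remote file accepted by one instance is filtered out by the others; each instance still synchronizes into its own local directory.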