1

I created a pollableChannel which is listening a S3 Bucket getting files and launching a job.

My classe is like this:

    @Bean
    public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
        return new S3SessionFactory(pAmazonS3);
    }

    @Bean
    public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory s3SessionFactory) {
        S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(s3SessionFactory);
        synchronizer.setPreserveTimestamp(true);
        synchronizer.setDeleteRemoteFiles(false);
        synchronizer.setRemoteDirectory(awsS3Properties.getCercBucket());
        return synchronizer;
    }

    @Bean
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
            S3InboundFileSynchronizer s3InboundFileSynchronizer) {
        S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource(
                s3InboundFileSynchronizer);
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new FileSystemResource(integrationProperties.getTempDirectoryName()).getFile());
        return messageSource;
    }

    @Bean("${receivable.integration.inChannel}")
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow integrationFlow(
            S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource) {
        return IntegrationFlows
                .from(s3InboundFileSynchronizingMessageSource,
                        c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
                .transform(fileMessageToJobRequest()).handle(jobLaunchingGateway())
                .get();
    }

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest() {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setFileParameterName("input.file.name");
        fileMessageToJobRequest.setJob(receivablePositionJob);
        return fileMessageToJobRequest;
    }

    @Bean
    @ServiceActivator(inputChannel = "${receivable.integration.inChannel}", poller = @Poller(fixedRate = "1000"))
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());

        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
        jobLaunchingGateway.setOutputChannel(s3FilesChannel());
        return jobLaunchingGateway;
    }

And my FileMessageToJobRequest is like this:

public class FileMessageToJobRequest {

    private Job job;

    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName, message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }

}

I want to add a custom MessageHeader in the Message or my second option is intercept the context before the message is published due to I need to set my tenant in ThreadLocal.

How could I do that?

Thanks in advance.


UPDATE with enrichHeaders:

    @Bean
    public IntegrationFlow integrationFlow(
            S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource) {
        return IntegrationFlows
                .from(s3InboundFileSynchronizingMessageSource,
                        c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
                .transform(fileMessageToJobRequest())
                .enrichHeaders(Map.of("teste", "testandio"))
                .handle(jobLaunchingGateway())
                .get();
    }
christianGen
  • 100
  • 1
  • 6
Guilherme Bernardi
  • 490
  • 1
  • 6
  • 18

1 Answers1

0

First of all you must remove that @ServiceActivator(inputChannel = "${receivable.integration.inChannel}" since it points to the same s3FilesChannel, which is an outputChannel of that JobLaunchingGateway, too. So, you are making a loop with such a configuration. Not sure how it works for you at all...

To add a header before sending to that JobLaunchingGateway, you just need to add enrichHeaders() before your .handle(jobLaunchingGateway()) in that integrationFlow definition.

Artem Bilan
  • 113,505
  • 11
  • 91
  • 118
  • Hi Artem, two points: The first one about the ServiceActivator weird here it's working and you explain to me in another question: https://stackoverflow.com/a/65688685/8107849 About the enrichHeaders I tried with it and didn't work. – Guilherme Bernardi Feb 05 '21 at 20:55
  • Artem take a look: https://ibb.co/5KbJF3K I put `.enrichHeaders(Map.of("teste", "testandio"))` – Guilherme Bernardi Feb 05 '21 at 20:57
  • Well, that's not related your `${receivable.integration.inChannel}` is used in the service activator and also as a bean name for that `s3FilesChannel`. So, that's how I see it is going to be a loop because both are going to be resolved to the same object. Please, reconsider. Also I don't see how that `@ServiceActivator` is relevant here at all... – Artem Bilan Feb 05 '21 at 20:58
  • Why do you show me screenshot? Show how you have tried to set headers. The `enrichHeaders()` must work before that `.handle(jobLaunchingGateway())`. If it is not a case, you really send messages some other way, not via that `IntegrationFlow`... – Artem Bilan Feb 05 '21 at 21:00
  • I updated the question with enrichHeaders, unfortunately isn't work. – Guilherme Bernardi Feb 05 '21 at 21:03
  • Although it shows that `@Bean(name)` attribute does not support properties placeholder. So, something is still fishy in your code. And unfortunately it is more confusing than helping us to understand your problem. – Artem Bilan Feb 05 '21 at 21:04
  • Weird, it's working. It's getting all the files and launching the job, this is fine. My problem is that I want to set my TenantThreadLocalStorage before the message is sent. – Guilherme Bernardi Feb 05 '21 at 21:09
  • Where do you check for that `teste` header? If you can debug and see some headers, you probably can really go over all the code in your project and debug it from any point. For example, you can open a `JobLaunchingGateway` source code and place there a break point to observe headers of the message arrived after that `enrichHeaders()`. – Artem Bilan Feb 05 '21 at 21:10
  • I checking at my `public class ChannelInterceptor implements ExecutorChannelInterceptor {` in preSend method. – Guilherme Bernardi Feb 05 '21 at 21:14
  • That's something new. Why can't you just show a minimal code which you'd like to have working, but it doesn't and show how it doesn't. I don't know where you use this interceptor. Right? – Artem Bilan Feb 05 '21 at 21:19
  • I'll improve the answer, but the main question is maybe how the S3Inbound works. The file is detect and turn into a message before transform to JobRequest and send to the job. I want to intercept the first step when the File is Detected as a Message and set my Tenant. This Interceptor I use to check if the Tenant was set before the Message is sent because I add the Tenant as a Header. – Guilherme Bernardi Feb 05 '21 at 21:28
  • One more time: the `enrichHeaders()` before `.handle(jobLaunchingGateway())` is the right place to add more headers. If that doesn't work for you somehow, you send to `jobLaunchingGateway` some other way, or your interceptor is in the wrong place. Show, please, where do you apply that interceptor. – Artem Bilan Feb 05 '21 at 21:45
  • Any update on the matter? BTW you can log DEBUG for `org.springframework.integration` category to see how your message travels and with what headers. – Artem Bilan Feb 09 '21 at 15:01
  • Hi Artem, thank you for your attention. I solved inside my own business logic verifying the instance of the message and setting the property that I wanted, but I'll try to solved that due to a new case. – Guilherme Bernardi Feb 12 '21 at 15:35