2

I have InboundChannelAdapter configured with S3StreamingMessageSource. I forced Poller to use taskExecutor with only 1 thread. But I see the same file is being picked up by the same thread 3 times with 3-4 seconds interval. Even though poller interval is 10 seconds. I've specified Composite filter which consists of pattern filter and acceptoncefilter. But no result, file is always picked up 3 times.

String prefix = "some_prefix";
String channel = "some_channel"
Pattern filePattern = Pattern.compile(
            "^" + prefix + "some_file_name_pattern");
@Bean
@InboundChannelAdapter(value = channel,
        poller = @Poller(fixedDelay = "10000", taskExecutor = "threadPoolTaskExecutor"))
public MessageSource<InputStream> createS3InboundStreamingMessageSource() {
    S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
    messageSource.setRemoteDirectory(bucketName);
    CompositeFileListFilter<S3ObjectSummary> compositeFileListFilter = new ChainFileListFilter<>();
    compositeFileListFilter.addFilter(new S3PersistentAcceptOnceFileListFilter(
            new SimpleMetadataStore(), prefix));
    compositeFileListFilter.addFilter(new S3RegexPatternFileListFilter(filePattern));
    messageSource.setFilter(compositeFileListFilter);
    return messageSource;
}

@Transformer(inputChannel = channel,"another_channel")
public Message<S3ObjectInputStream> enrich(Message<S3ObjectInputStream> s3ObjectInputStreamMessage) {
    S3ObjectInputStream s3ObjectInputStream = s3ObjectInputStreamMessage.getPayload();
    URI zipUri = s3ObjectInputStream.getHttpRequest().getURI();
    LOGGER.info("Picking up file : {}", zipUri.getPath());
    ...
}

 private S3RemoteFileTemplate template() {
    S3SessionFactory sessionFactory = new S3SessionFactory(amazonS3);
    return new S3RemoteFileTemplate(sessionFactory);
}

@Bean
public TaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setMaxPoolSize(1);
    executor.setThreadNamePrefix("single_thread_task_executor");
    executor.initialize();
    return executor;
}

I see that the app comes to @Transformer 3 times. Would really appreciate any help.

Vera
  • 98
  • 5
  • There should be something else in your application. Maybe you have several `@InboundChannelAdapter` for the same `bucketName`, so they don't poll S3 files concurrently. Would be great if you can debug your application and place a break point in the `AbstractRemoteFileStreamingMessageSource` on the `listFiles()` method to see what the `files` are returned from the bucket and how then they are filtered on the line `List filteredFiles = this.filter == null ? Arrays.asList(files) : this.filter.filterFiles(files);`. I don't see anything in your current code what may cause such a behavior... – Artem Bilan Oct 31 '18 at 14:00

0 Answers0