
I have set up a file poller with a task executor:

ExecutorService executorService = Executors.newFixedThreadPool(10);

LOG.info("Setting up the poller for directory {}", finalDirectory);
StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(new CustomFileReadingSource(finalDirectory),
        c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS, 5)
                .taskExecutor(executorService)
                .maxMessagesPerPoll(10)
                .advice(new LoggerSourceAdvisor(finalDirectory))))
        // move the file to a processing directory first
        .transform(new FileMoveTransformer("C:/processing", true))
        .channel("fileRouter")
        .get();

As shown, I have set up a fixed thread pool of 10 and a maximum of 10 messages per poll. If I put 10 files in the directory, they are still processed one by one. What could be wrong here?

* UPDATE *

It works perfectly fine after Gary's answer, though I have another issue now.

I have set up my poller like this:

setDirectory(new File(path));
DefaultDirectoryScanner scanner = new DefaultDirectoryScanner();

scanner.setFilter(new AcceptAllFileListFilter<>());
setScanner(scanner);

The reason for using AcceptAllFileListFilter is that the same file may come again; that is why I move the file out of the directory first. But when I enable the task executor, the same file is processed by multiple threads, I assume because of the AcceptAllFileListFilter.

If I change to AcceptOnceFileListFilter it works, but then the same file that comes again will not be picked up! What can be done to avoid this issue?
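In other words, what I am after is accept-once keyed on the lastModified time: the same file name should pass again only when its timestamp changes. A minimal plain-Java stand-in for that semantics (the class name is hypothetical, and a ConcurrentHashMap stands in for the metadata store):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for "accept once per (name, lastModified)":
// a re-arriving file with a new timestamp is accepted again,
// while an unchanged duplicate is rejected.
public class AcceptOnModifiedFilter {

    private final Map<String, Long> seen = new ConcurrentHashMap<>();

    public synchronized boolean accept(String name, long lastModified) {
        Long previous = seen.put(name, lastModified);
        // Accept if never seen, or if the timestamp changed since last time.
        return previous == null || previous != lastModified;
    }

    public static void main(String[] args) {
        AcceptOnModifiedFilter filter = new AcceptOnModifiedFilter();
        System.out.println(filter.accept("test1.txt", 100L)); // first time: accepted
        System.out.println(filter.accept("test1.txt", 100L)); // duplicate: rejected
        System.out.println(filter.accept("test1.txt", 200L)); // modified: accepted again
    }
}
```

Spring Integration ships this behavior as FileSystemPersistentAcceptOnceFileListFilter, which keys on lastModified against a MetadataStore.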

Issue/Bug

In the class AbstractPersistentAcceptOnceFileListFilter we have this code:

@Override
public boolean accept(F file) {
    String key = buildKey(file);
    synchronized (this.monitor) {
        String newValue = value(file);
        String oldValue = this.store.putIfAbsent(key, newValue);
        if (oldValue == null) { // not in store
            flushIfNeeded();
            return true;
        }
        // same value in store
        if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) {
            flushIfNeeded();
            return true;
        }
        return false;
    }
}

Now, for example, if I have set maxMessagesPerPoll to 5 and there are two files, it is possible that the same file is picked up by two threads.

Let's say my code moves the file once I have read it, but another thread reaches the accept method after the move. Since the file is no longer there, its lastModified time is reported as 0, which differs from the stored value, so accept returns true. That causes the issue, because the file is NOT there. If lastModified is 0, accept should return false, since the file no longer exists.
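The race above can be simulated without Spring at all. This is a minimal sketch (the class name is mine, and a ConcurrentHashMap stands in for the metadata store) of why a file that has already been moved slips through the filter a second time:

```java
import java.util.concurrent.ConcurrentHashMap;

public class AcceptOnceRaceDemo {

    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    // Mirrors the accept() logic quoted above. In the real filter,
    // value(file) is the lastModified time, and File.lastModified()
    // reports 0 when the file no longer exists.
    public boolean accept(String key, long lastModified) {
        String newValue = Long.toString(lastModified);
        String oldValue = store.putIfAbsent(key, newValue);
        if (oldValue == null) { // not in store -> accept
            return true;
        }
        // value changed -> replace and accept again
        if (!oldValue.equals(newValue) && store.replace(key, oldValue, newValue)) {
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        AcceptOnceRaceDemo filter = new AcceptOnceRaceDemo();
        // Thread 1: sees the file, accepts it, and moves it away.
        System.out.println(filter.accept("test1.txt", 1543420800000L)); // true
        // Thread 2: file already moved, lastModified now reports 0, which
        // differs from the stored value -- so it is accepted AGAIN.
        System.out.println(filter.accept("test1.txt", 0L)); // true: the bug
    }
}
```

Returning false when the reported lastModified is 0 (i.e. the file is gone) would close this window.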

Makky

1 Answer


When you add a task executor to a poller, all that does is have the scheduler thread hand the poll task off to a thread in the thread pool; the maxMessagesPerPoll is part of that single poll task. The poller itself only runs once every 5 seconds. To get what you want, you should add an executor channel to the flow...

@SpringBootApplication
public class So53521593Application {

    private static final Logger logger = LoggerFactory.getLogger(So53521593Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So53521593Application.class, args);
    }

    @Bean
    public IntegrationFlow flow() {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        return IntegrationFlows.from(() -> "foo", e -> e
                    .poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
                            .maxMessagesPerPoll(10)))
                .channel(MessageChannels.executor(exec))
                .<String>handle((p, h) -> {
                    try {
                        logger.info(p);
                        Thread.sleep(10_000);
                    }
                    catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                    }
                    return null;
                })
                .get();
    }
}

EDIT

It works fine for me...

@Bean
public IntegrationFlow flow() {
    ExecutorService exec = Executors.newFixedThreadPool(10);
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
                new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
                    e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
                            .maxMessagesPerPoll(10)))
            .channel(MessageChannels.executor(exec))
            .handle((p, h) -> {
                try {
                    logger.info(p.toString());
                    Thread.sleep(10_000);
                }
                catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                }
                return null;
            })
            .get();
}

and

2018-11-28 11:46:05.196 INFO 57607 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt

2018-11-28 11:46:05.197 INFO 57607 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt

and, after `touch test1.txt`:

2018-11-28 11:48:00.284 INFO 57607 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt

EDIT1

Agreed - reproduced with this...

@Bean
public IntegrationFlow flow() {
    ExecutorService exec = Executors.newFixedThreadPool(10);
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
                new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
                    e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
                            .maxMessagesPerPoll(10)))
            .channel(MessageChannels.executor(exec))
            .<File>handle((p, h) -> {
                try {
                    p.delete();
                    logger.info(p.toString());
                    Thread.sleep(10_000);
                }
                catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                }
                return null;
            })
            .get();
}

and

2018-11-28 13:22:23.689 INFO 75681 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt

2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt

2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt

2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-4] com.example.So53521593Application : /tmp/foo/test2.txt

Gary Russell
  • But I have to send to another channel that files to be processed. How do I route that from here ? – Makky Nov 28 '18 at 15:11
  • Sorry, I don't know what you mean; just put the `.channel()` before the transformer and remove the executor from the poller. – Gary Russell Nov 28 '18 at 15:23
  • I actually tried but I see unexpected behaviour it looks like its not thread-safe. – Makky Nov 28 '18 at 15:25
  • Your code? The framework is thread safe. If you want to process files in parallel your code needs to be thread safe. – Gary Russell Nov 28 '18 at 15:31
  • Use a `FileSystemPersistentAcceptOnceFileListFilter` - it will allow the same file name to pass only if the `lastModified` time changes. – Gary Russell Nov 28 '18 at 15:48
  • Thanks I understand this. – Makky Nov 28 '18 at 15:59
  • I have used like this FileSystemPersistentAcceptOnceFileListFilter fileSystemPersistentAcceptOnceFileListFilter = new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), ""); fileSystemPersistentAcceptOnceFileListFilter.setFlushOnUpdate(true); I am still seeing same file being processed twice !!! – Makky Nov 28 '18 at 16:34
  • I don't see how that's possible; it works fine for me; see the edit to my answer. – Gary Russell Nov 28 '18 at 16:47
  • Not for the simple store - it's used when using a shared store such as redis. – Gary Russell Nov 28 '18 at 16:50
  • Good catch - please open a [bug report](https://jira.spring.io/browse/INT). Reproduced; see my second edit. – Gary Russell Nov 28 '18 at 18:21
  • We were just about to start a new release build - I will create the ticket. [INT-4560](https://jira.spring.io/browse/INT-4560). – Gary Russell Nov 28 '18 at 18:30
  • Thanks for your help – Makky Nov 28 '18 at 18:39
  • Thanks for this, I was struggling a lot to do concurrent processing, I wonder why this is not in the documentation anywhere? This is a very common use case... who wants to process sequentially? – Diego Ramos Jul 30 '22 at 22:58
  • The “very common use case” is strict ordering. The framework has to take the most conservative approach to avoid unexpected side effects. While providing mechanisms for concurrency. – Gary Russell Jul 30 '22 at 23:49