
I am trying to integrate multithreading with a file-watcher service in Java. I constantly listen to a particular directory, and whenever a new file is created I need to spawn a new thread that processes the file (say, prints its contents). I managed to write code that compiles and works, but not as expected: it runs sequentially, meaning file2 is processed after file1 and file3 is processed after file2. I want the files to be processed in parallel.

Here is the code snippet:

```java
while (true) {
    WatchKey key;
    try {
        key = watcher.take();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag and stop watching
        return;
    }
    Path dir = keys.get(key);
    for (WatchEvent<?> event : key.pollEvents()) {
        WatchEvent.Kind<?> kind = event.kind();
        if (kind == StandardWatchEventKinds.OVERFLOW) {
            continue; // events may have been lost; skip
        }
        if (kind == StandardWatchEventKinds.ENTRY_CREATE) {
            log.info("New entry is created in the listening directory, calling the FileProcessor");
            @SuppressWarnings("unchecked")
            WatchEvent<Path> ev = (WatchEvent<Path>) event;
            Path newFileCreatedResolved = dir.resolve(ev.context());
            try {
                FileProcessor processFile = new FileProcessor(newFileCreatedResolved.getFileName().toString());
                Future<String> result = executor.submit(processFile);
                System.out.println("Processed File" + result.get());
            } catch (IOException | ExecutionException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            //executor.shutdown(); add logic to shut down
        }
    }
    // Re-arm the key only after all of its pending events have been handled.
    if (!key.reset()) {
        break; // the watched directory is no longer accessible
    }
}
```

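And the FileProcessor class:

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.Callable;

public class FileProcessor implements Callable<String> {
    private final String triggerFile;

    FileProcessor(String triggerFile) throws FileNotFoundException, IOException {
        this.triggerFile = triggerFile;
    }

    @Override
    public String call() throws Exception {
        // logic to write to another file; this new file is specific to the input file
        // returns success
        return "success";
    }
}
```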

What is happening now: if I transfer 3 files at a time, they are processed sequentially. First file1 is written to its destination file, then file2, then file3, and so on.

Am I making sense? Which part do I need to change to make it parallel? Or is ExecutorService designed to work like that?

  • Show how the executor is created. Not every executor uses multiple threads. Also, you probably can't transfer 3 files atomically, they are created serially. How do you know that they can't be processed in parallel? Maybe the process is just so fast that it has no trouble keeping up with the serial transfer. To test, add a `sleep()` call to the processing to see whether it is truly serialized. – erickson Aug 08 '17 at 18:12
  • Executor Creation: `ExecutorService executor = (ExecutorService) Executors.newFixedThreadPool(5)` – DillibabuSripriya Aug 09 '17 at 03:51
  • Let's assume file1 has 1 million lines and file2 has just 10 lines. If I create file1 and file2 in the listening directory, file2 should be completed first and then file1, right? What the code is doing now is: file1 is written to the destination directory, and only then does file2's processing start. – DillibabuSripriya Aug 09 '17 at 03:57
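
erickson's `sleep()` suggestion can be turned into a small diagnostic for exactly this "big file vs. small file" case. This is a sketch under stated assumptions, not the asker's code: `SleepDiagnostic`, `slowTask` and the 600 ms / 50 ms delays are hypothetical stand-ins for a large and a small file. Both tasks are submitted before either result is awaited, so a single-thread executor should finish file1 first, while the `newFixedThreadPool(5)` executor from the question should let file2 overtake file1.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SleepDiagnostic {
    // Hypothetical stand-in for FileProcessor: sleeps to simulate work, then records that it finished.
    static Callable<String> slowTask(String name, long millis, List<String> finished) {
        return () -> {
            Thread.sleep(millis);
            finished.add(name);
            return name;
        };
    }

    // Submits a "big" and a "small" file before waiting on either, then reports completion order.
    static List<String> completionOrder(ExecutorService executor) throws Exception {
        List<String> finished = Collections.synchronizedList(new ArrayList<>());
        try {
            Future<String> big = executor.submit(slowTask("file1", 600, finished));  // "1 million lines"
            Future<String> small = executor.submit(slowTask("file2", 50, finished)); // "10 lines"
            big.get();
            small.get();
        } finally {
            executor.shutdown();
        }
        return finished;
    }

    public static void main(String[] args) throws Exception {
        // One worker thread: the small file has to wait behind the big one.
        System.out.println(completionOrder(Executors.newSingleThreadExecutor()));
        // Five worker threads, as in the question: the small file can finish first.
        System.out.println(completionOrder(Executors.newFixedThreadPool(5)));
    }
}
```

If the fixed pool reports file2 first here but the watcher still processes files one after another, the executor is not the bottleneck; the submitting loop is.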

1 Answer


The call to Future.get() is blocking. The result isn't available until processing is complete, of course, and your code doesn't submit another task until then.
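To see the effect of the blocking `get()` in isolation, here is a minimal timing sketch. The class and method names are hypothetical, and a 300 ms `Thread.sleep` stands in for FileProcessor's work. The first method mirrors the question's loop (submit, then immediately `get()`), so with 3 files it should take at least about 900 ms even on a pool of 5; the second submits everything first and waits afterwards, which should finish in roughly 300 ms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BlockingGetDemo {
    // Hypothetical stand-in for FileProcessor.
    static Callable<String> slowTask(String name) {
        return () -> {
            Thread.sleep(300);
            return name;
        };
    }

    // The question's pattern: the loop cannot submit the next file until get() returns.
    static long submitThenGetEach(ExecutorService executor, int files) throws Exception {
        long start = System.nanoTime();
        for (int i = 0; i < files; i++) {
            Future<String> result = executor.submit(slowTask("file" + i));
            System.out.println("Processed File " + result.get()); // waits for this task to finish
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // Submit every task first, then wait: the pool runs them concurrently.
    static long submitAllThenGet(ExecutorService executor, int files) throws Exception {
        long start = System.nanoTime();
        List<Future<String>> results = new ArrayList<>();
        for (int i = 0; i < files; i++) {
            results.add(executor.submit(slowTask("file" + i)));
        }
        for (Future<String> result : results) {
            System.out.println("Processed File " + result.get());
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        System.out.println("submit, then get() each: " + submitThenGetEach(executor, 3) + " ms");
        System.out.println("submit all, then get():  " + submitAllThenGet(executor, 3) + " ms");
        executor.shutdown();
    }
}
```

The "submit all, then get" variant only works for a fixed batch. The watcher loop never ends, so its results have to be consumed somewhere other than the loop itself.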

Wrap your Executor in a CompletionService and submit() tasks to it instead. Have another thread consume the results of the CompletionService to do any processing that is necessary after the task is complete.
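A sketch of the CompletionService approach, written for Java 8. `ExecutorCompletionService` with its `submit()` and `take()` methods is standard `java.util.concurrent` API; the class name, the sleep-based `fakeProcessor`, the fixed number of expected results and the 600/50/300 ms delays are illustrative assumptions. The producer loop plays the role of the watcher loop and never waits; a separate consumer thread receives futures in completion order. In the real watcher the consumer would loop forever instead of counting results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CompletionServiceSketch {
    // Hypothetical stand-in for FileProcessor: "processes" a file by sleeping.
    static Callable<String> fakeProcessor(String fileName, long millis) {
        return () -> {
            Thread.sleep(millis);
            return fileName;
        };
    }

    // Returns the file names in the order their processing finished.
    static List<String> process(List<String> names, List<Long> durations) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        CompletionService<String> completion = new ExecutorCompletionService<>(executor);
        List<String> finished = Collections.synchronizedList(new ArrayList<>());
        int expected = names.size();

        // Consumer thread: take() hands back futures as they complete, regardless of submit order.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < expected; i++) {
                    Future<String> done = completion.take();
                    try {
                        String name = done.get(); // already complete, so this does not wait
                        System.out.println("Processed File " + name);
                        finished.add(name);
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Producer (the watcher loop in the question): submits and immediately moves on.
        for (int i = 0; i < expected; i++) {
            completion.submit(fakeProcessor(names.get(i), durations.get(i)));
        }

        consumer.join(); // demo only, so the method can return the finished list
        executor.shutdown();
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(process(Arrays.asList("file1", "file2", "file3"), Arrays.asList(600L, 50L, 300L)));
    }
}
```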

Alternatively, you can use the helper methods of CompletableFuture to set up an equivalent pipeline of actions.
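The same pipeline sketched with `CompletableFuture` (Java 8+): `supplyAsync(supplier, executor)` runs the work on the pool, `thenAccept` does the post-processing when it finishes, and `exceptionally` logs failures. `processFile`, the class name and the delays are hypothetical. The final `allOf(...).join()` exists only so the demo can return its list; the watcher loop would simply start each pipeline and go back to `watcher.take()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureSketch {
    // Hypothetical stand-in for FileProcessor's work.
    static String processFile(String name, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CompletionException(e);
        }
        return name;
    }

    // Returns the file names in the order their post-processing ran.
    static List<String> process(List<String> names, List<Long> durations) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        List<String> finished = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
            String name = names.get(i);
            long millis = durations.get(i);
            CompletableFuture<Void> pipeline = CompletableFuture
                    .supplyAsync(() -> processFile(name, millis), executor) // runs on the pool
                    .thenAccept(result -> {                                 // post-processing step
                        System.out.println("Processed File " + result);
                        finished.add(result);
                    })
                    .exceptionally(ex -> {                                  // failure handling
                        ex.printStackTrace();
                        return null;
                    });
            pending.add(pipeline);
        }
        // Demo only: wait for every pipeline so the list can be returned.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        executor.shutdown();
        return finished;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("file1", "file2", "file3"), Arrays.asList(600L, 50L, 300L)));
    }
}
```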

A third, simpler, but perhaps less flexible option is simply to incorporate the post-processing into the task itself. I demonstrated a simple task wrapper to show how this might be done.
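One possible shape for such a wrapper, sketched under assumptions: the class name `PostProcessingTask` and its success/failure callbacks are hypothetical, not necessarily the wrapper referred to above. It calls the wrapped `Callable` (for example a FileProcessor) and then runs the post-processing on the same pool thread, so the watcher loop can call `execute()` and return to `watcher.take()` without waiting.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical wrapper: runs a task, then its post-processing, entirely on a pool thread.
public class PostProcessingTask<T> implements Runnable {
    private final Callable<T> task;
    private final Consumer<? super T> onSuccess;
    private final Consumer<? super Exception> onFailure;

    public PostProcessingTask(Callable<T> task, Consumer<? super T> onSuccess,
                              Consumer<? super Exception> onFailure) {
        this.task = task;
        this.onSuccess = onSuccess;
        this.onFailure = onFailure;
    }

    @Override
    public void run() {
        T result;
        try {
            result = task.call();
        } catch (Exception e) {
            onFailure.accept(e); // report failures instead of losing them inside the pool
            return;
        }
        onSuccess.accept(result); // post-processing happens here, not in the watcher loop
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        CountDownLatch done = new CountDownLatch(3);
        for (String name : new String[] {"file1", "file2", "file3"}) {
            // The watcher loop would call execute() and move straight on to the next event.
            executor.execute(new PostProcessingTask<String>(
                    () -> "processed " + name,
                    result -> { System.out.println("Processed File " + result); done.countDown(); },
                    e -> { e.printStackTrace(); done.countDown(); }));
        }
        done.await(); // demo only: keep the JVM alive until all callbacks have run
        executor.shutdown();
    }
}
```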

erickson
  • Thanks much. There is not much processing after the task is completed, so I guess the 3rd option will be enough. – DillibabuSripriya Aug 17 '17 at 07:06
  • @DillibabuSripriya If you take a look at the answer here, https://stackoverflow.com/a/826283/3474 , I added an answer with `CompletableFuture`. It's pretty simple too, just a couple of lines. – erickson Aug 17 '17 at 14:51