
I have a Java application that watches a directory and when XML files are added to that directory, it parses out data from the XML file.

It works fine as a single-threaded application, but I'm thinking about making it multi-threaded so multiple files can be processed simultaneously.

Question is, when thread #1 finds a file and starts processing it, how can I mark the file as 'in progress' so thread #2 doesn't try to process it too?

I was thinking that the thread could simply rename the file once it starts working on it, to myfile.xml.inprogress, and then myfile.xml.finished when done.

But if I do that, is it possible that two threads will see the file at the same time and both will try to rename it simultaneously?

I might also want to run two instances of this application reading files in the same directory, so whatever path I take supports multiple processes.
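To make the rename idea concrete, here's a rough sketch of what I had in mind, using Java 7's `Files.move` with `ATOMIC_MOVE` (the class name and temp-directory demo are just for illustration, and I understand atomic moves depend on filesystem support):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class ClaimByRename {

    // Try to claim a file by atomically renaming it to *.inprogress.
    // Returns the claimed path, or null if another thread/process won the race.
    static Path tryClaim(Path file) {
        Path claimed = file.resolveSibling(file.getFileName() + ".inprogress");
        try {
            // ATOMIC_MOVE makes the rename all-or-nothing, so at most one
            // caller succeeds -- even when the callers are separate processes.
            return Files.move(file, claimed, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            // Source already renamed (claimed) by someone else, or gone,
            // or atomic moves aren't supported on this filesystem.
            return null;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("watch");
        Path xml = Files.createFile(dir.resolve("myfile.xml"));
        Path first = tryClaim(xml);   // should win the claim
        Path second = tryClaim(xml);  // should lose: the file is already renamed
        System.out.println("first=" + (first != null) + " second=" + (second != null));
    }
}
```

My understanding is that with `ATOMIC_MOVE` exactly one caller's rename succeeds, even across separate processes, and the loser gets an `IOException` it can treat as "already claimed" — but I'd like confirmation that this is sound.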

Thanks!

  • @Mikrobus If thread#1 creates a new File() for the file, is thread#2 prevented from creating a new File() for the same file? What about if thread#2 is in a different process? –  Sep 22 '14 at 17:57
  • Possible duplicate of http://stackoverflow.com/questions/128038/how-can-i-lock-a-file-using-java-if-possible – Antoniossss Sep 22 '14 at 17:59
  • I would probably have one "monitor/scheduler" thread and multiple processor threads. Monitor thread schedules jobs in-memory for processor threads. You can have better in-memory locking this way. – srkavin Sep 22 '14 at 17:59

3 Answers

1

You should use a Producer-Consumer pattern. Have one thread listen for changes in the directory and hand that work off to other threads.

You can use a BlockingQueue for this to make the code very simple.

First you need two classes, a producer:

class Producer implements Callable<Void> {

    private final BlockingQueue<Path> changedFiles;

    Producer(BlockingQueue<Path> changedFiles) {
        this.changedFiles = changedFiles;
    }

    @Override
    public Void call() throws Exception {
        while (true) {
            if (something) { //pseudocode - your change-detection goes here
                changedFiles.add(changedFile); //the Path that was detected
            }
            //to make the thread "interruptible"
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(1));
            } catch (InterruptedException ex) {
                break;
            }
        }
        return null;
    }
}

And a Consumer:

class Consumer implements Callable<Void> {

    private final BlockingQueue<Path> changedFiles;

    Consumer(BlockingQueue<Path> changedFiles) {
        this.changedFiles = changedFiles;
    }

    @Override
    public Void call() throws Exception {
        while (true) {
            try {
                final Path changedFile = changedFiles.take(); //blocks until a file is available
                //process your file
            } catch (InterruptedException ex) {
                //to make the thread "interruptible"
                break;
            }
        }
        return null;
    }
}

So, now create an ExecutorService, submit one Producer and as many consumers as you need:

final BlockingQueue<Path> queue = new LinkedBlockingDeque<>();
final ExecutorService executorService = Executors.newCachedThreadPool();
final Collection<Future<?>> consumerHandles = new LinkedList<>();
for (int i = 0; i < numConsumers; ++i) {
    consumerHandles.add(executorService.submit(new Consumer(queue)));
}
final Future<?> producerHandle = executorService.submit(new Producer(queue));

This guarantees that each file is only ever worked on by one consumer, because you control the hand-off yourself. You also do so with minimal synchronisation.

It might be worthwhile having the Consumer also read the file, to remove the shared disc IO that will happen otherwise - this is likely to slow the system down. You could also add another Consumer that writes changed files at the other end, to eliminate shared IO completely.

To shutdown the system simply call:

executorService.shutdownNow();
executorService.awaitTermination(1, TimeUnit.DAYS);

Because your workers are interruptible, this will bring down the ExecutorService once the tasks currently in progress have finished.

Boris the Spider
0

Producer Consumer is definitely the idea here.

With Java 7 there is a WatchService provided which can take care of the Producer problem even though it is a pain to work with.

Have an ExecutorService with the desired pool size to take care of Consumers.

Here is how it all wires up.

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FolderWatchService {

    private final ExecutorService executorService = Executors.newFixedThreadPool(5);

    public void watch() throws Exception {
        Path folder = Paths.get("/home/user/temp");
        try (WatchService watchService = FileSystems.getDefault().newWatchService()) {

            folder.register(watchService,
                    StandardWatchEventKinds.ENTRY_CREATE,
                    StandardWatchEventKinds.ENTRY_MODIFY,
                    StandardWatchEventKinds.ENTRY_DELETE);
            while (true) {
                final WatchKey key = watchService.take(); //blocks until events are available
                for (WatchEvent<?> watchEvent : key.pollEvents()) {
                    //context() is the changed file's path relative to the watched directory
                    Path dir = (Path) key.watchable();
                    Path absolutePath = dir.resolve((Path) watchEvent.context());
                    executorService.submit(new WatchTask(absolutePath));
                }
                //reset() re-arms the key; false means the directory is no longer accessible
                if (!key.reset()) {
                    break;
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        FolderWatchService folderWatchService = new FolderWatchService();
        folderWatchService.watch();
    }

}
class WatchTask implements Runnable {

    private final Path absolutePath;

    WatchTask(Path absolutePath) {
        this.absolutePath = absolutePath;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " : " + absolutePath.toAbsolutePath());
        try (BufferedReader reader = Files.newBufferedReader(absolutePath, StandardCharsets.UTF_8)) {
            //Do read
            reader.lines().forEach(System.out::println);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Ajay George
  • Note that this might well produce an infinite loop if the reader then writes. – Boris the Spider Sep 22 '14 at 22:32
  • while (true) is an infinite loop definitely either way since I am not exiting. while (conditionToExit) would be a better way. But this code sample is supposed to be a pointer not production ready. – Ajay George Sep 22 '14 at 23:03
  • No, I mean that you have a loop where a write to a file immediately causes the `WatchService` to pick up the change and queue it for processing. Which would cause another thread to pick up the _same_ file. Now when either of those two threads writes the file will be queued again... – Boris the Spider Sep 23 '14 at 07:09
  • Ah.. got your point. Wouldn't listening to just creation events resolve that issue. `StandardWatchEventKinds.ENTRY_CREATE` – Ajay George Sep 23 '14 at 17:54
-1
  1. You can use java.nio.channels.FileLock: http://docs.oracle.com/javase/7/docs/api/java/nio/channels/FileLock.html to synchronize access between different processes.
  2. For synchronization between different threads running inside the same process, you can use java.util.concurrent.locks.Lock: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Lock.html
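A minimal sketch of the cross-process case (the file name is illustrative; note that within a single JVM a second tryLock on the same file throws OverlappingFileLockException rather than returning null, which is why thread-level coordination needs point 2 as well):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LockExample {

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("demo", ".xml");
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
            // tryLock() returns null if another *process* already holds the lock.
            // Within the same JVM a second tryLock on the same file throws
            // OverlappingFileLockException instead, so pair this with a
            // java.util.concurrent lock for thread-level coordination.
            FileLock lock = channel.tryLock();
            if (lock != null) {
                try {
                    System.out.println("locked: " + lock.isValid());
                    // ... parse the XML file here ...
                } finally {
                    lock.release();
                }
            }
        }
    }
}
```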
kraskevich