Made an updated version of the file provided by @UDPLover, built for use in an environment with a high rate of file access. I converted the to-process queue into a HashMap<String, WatchEvent>
to carry the watch event over so that it can be passed to an abstract method from inside the file blocking checker itself. I also added a print() method that allows anything printed to the console by the WatchCore to be enabled or disabled. The file polling for loop from the original example was updated to use the JDK 8 functional for loop, and all parts were made threaded/interruptible. This is untested as of yet; I will update with fixes when I can test it.
package filewatcher;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import static java.nio.file.LinkOption.NOFOLLOW_LINKS;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Watches a directory (optionally together with its sub-directories) for newly
 * created entries and reports each created file to
 * {@link #onEventAndUnlocked(WatchEvent)} once it can be opened for reading.
 *
 * <p>Two threads cooperate. This thread takes keys from the
 * {@link WatchService} and queues every created file. A second, internal
 * thread repeatedly tries to open each queued file and invokes the callback
 * as soon as the open succeeds. {@link #start()} starts both threads and
 * {@link #interrupt()} stops both.
 *
 * @author Nackloose
 * http://stackoverflow.com/questions/13998379/directory-watching-for-changes-in-java
 */
public abstract class WatchCore extends Thread {

    private static final Logger LOGGER = Logger.getLogger(WatchCore.class.getName());

    /**
     * Called after the WatchCore has received a create event for a file and
     * has managed to open that file for reading. The call is made on the
     * internal file-checking thread, not on the watcher thread.
     *
     * @param e the create event of the file; its context is the path of the
     * created entry as reported by the watch service
     */
    public abstract void onEventAndUnlocked(WatchEvent e);

    private final WatchService watcher;
    // Directory belonging to each registered key. Only touched by the
    // constructor and by the watcher thread.
    private final Map<WatchKey, Path> keys;
    private final boolean recursive;
    // When true, register() reports new and changed registrations.
    private boolean trace = false;
    // Files waiting for the unlock check, keyed by their full path. Written by
    // the watcher thread and drained by the blocker thread, hence concurrent.
    private final Map<String, WatchEvent<Path>> fileProcessingQueue;
    // Thread performing the unlock check; started and stopped with this one.
    private final WatchBlocker blocker;

    /**
     * Watches {@code dir} without descending into sub-directories.
     *
     * @param dir directory to watch
     * @throws IOException if the watch service cannot be created or the
     * directory cannot be registered
     */
    public WatchCore(String dir) throws IOException {
        this(dir, false);
    }

    /**
     * Watches {@code dir}, optionally including all of its sub-directories.
     *
     * @param dir directory to watch
     * @param recursive whether sub-directories are watched as well
     * @throws IOException if the watch service cannot be created or a
     * directory cannot be registered
     */
    public WatchCore(String dir, boolean recursive) throws IOException {
        this(Paths.get(dir), recursive);
    }

    /**
     * Watches {@code dir} without descending into sub-directories.
     *
     * @param dir directory to watch
     * @throws IOException if the watch service cannot be created or the
     * directory cannot be registered
     */
    public WatchCore(Path dir) throws IOException {
        this(dir, false);
    }

    /**
     * Watches {@code dir}, optionally including all of its sub-directories.
     * The threads are not started here; call {@link #start()}.
     *
     * @param dir directory to watch
     * @param recursive whether sub-directories are watched as well
     * @throws IOException if the watch service cannot be created or a
     * directory cannot be registered
     */
    public WatchCore(Path dir, boolean recursive) throws IOException {
        this.fileProcessingQueue = new ConcurrentHashMap<>();
        this.keys = new HashMap<>();
        this.recursive = recursive;
        this.watcher = FileSystems.getDefault().newWatchService();
        try {
            if (recursive) {
                print("Scanning %s ...", dir);
                registerAll(dir);
                print("Done.");
            } else {
                register(dir);
            }
        } catch (IOException | RuntimeException e) {
            // Registration failed: do not leak the freshly created service.
            try {
                watcher.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        // enable trace after initial registration
        this.trace = true;
        this.blocker = new WatchBlocker();
    }

    @SuppressWarnings("unchecked")
    private static <T> WatchEvent<T> cast(WatchEvent<?> event) {
        return (WatchEvent<T>) event;
    }

    /**
     * Starts the watcher thread first and the file-checking thread second.
     */
    @Override
    public synchronized void start() {
        super.start();
        blocker.start();
    }

    /**
     * Interrupts the file-checking thread first and the watcher thread
     * second; both leave their loops in response.
     */
    @Override
    public void interrupt() {
        blocker.interrupt();
        super.interrupt();
    }

    /**
     * Register the given directory with the WatchService
     */
    private void register(Path dir) throws IOException {
        WatchKey key = dir.register(watcher, ENTRY_CREATE);
        if (trace) {
            Path prev = keys.get(key);
            if (prev == null) {
                print("register: %s\n", dir);
            } else if (!dir.equals(prev)) {
                print("update: %s -> %s\n", prev, dir);
            }
        }
        keys.put(key, dir);
    }

    /**
     * Register the given directory, and all its sub-directories, with the
     * WatchService.
     */
    private void registerAll(final Path start) throws IOException {
        Files.walkFileTree(start, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                register(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    /**
     * Process all events for keys queued to the watcher until this thread is
     * interrupted, the watch service is closed, or no watched directory is
     * left. The watch service is closed when the loop ends.
     */
    @Override
    public void run() {
        print("DirWatcherThread started.");
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // wait for key to be signalled
                WatchKey key = watcher.take();
                Path dir = keys.get(key);
                if (dir == null) {
                    printe("WatchKey not recognized!!");
                    continue;
                }
                for (WatchEvent<?> event : key.pollEvents()) {
                    handleEvent(dir, event);
                }
                // reset key and remove from set if directory no longer accessible
                if (!key.reset()) {
                    keys.remove(key);
                    if (keys.isEmpty()) {
                        // all directories are inaccessible
                        break;
                    }
                }
            }
        } catch (InterruptedException stop) {
            // take() clears the interrupt flag when it throws; restore it and
            // leave the loop instead of going back to waiting.
            Thread.currentThread().interrupt();
        } catch (ClosedWatchServiceException closed) {
            // The service was closed underneath us: nothing left to watch.
        } finally {
            closeWatcher();
        }
        print("DirWatcherThread exited.");
    }

    /**
     * Handles one event of a signalled key: new directories are registered
     * when watching recursively, any other created entry is queued for the
     * unlock check. Overflow events are skipped.
     */
    private void handleEvent(Path dir, WatchEvent<?> event) {
        WatchEvent.Kind<?> kind = event.kind();
        if (kind == OVERFLOW || !ENTRY_CREATE.equals(kind)) {
            return;
        }
        // Context for directory entry event is the file name of entry
        WatchEvent<Path> created = cast(event);
        Path child = dir.resolve(created.context());
        if (recursive && Files.isDirectory(child, NOFOLLOW_LINKS)) {
            try {
                registerAll(child);
            } catch (IOException e) {
                LOGGER.log(Level.WARNING, "Could not register new directory " + child, e);
            }
            return;
        }
        fileProcessingQueue.put(child.toString(), created);
    }

    /**
     * Closes the watch service, logging instead of propagating a failure.
     */
    private void closeWatcher() {
        try {
            watcher.close();
        } catch (IOException e) {
            LOGGER.log(Level.WARNING, "Could not close the watch service", e);
        }
    }

    /**
     * Thread that takes queued files, waits until each one can be opened for
     * reading and then passes its event to
     * {@link WatchCore#onEventAndUnlocked(WatchEvent)}.
     *
     * @author
     * http://stackoverflow.com/questions/13998379/directory-watching-for-changes-in-java
     * Nackloose
     */
    private class WatchBlocker extends Thread {

        /**
         * Works through the queue until interrupted, sleeping for
         * {@link WatchCoreParameters#timeToIdle} whenever the queue is empty.
         */
        @Override
        public void run() {
            print("DirWatchProcessingThread Started");
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // Work on a snapshot so that every queued file gets a turn
                    // even if one of them stays locked and is queued again.
                    String[] pending = fileProcessingQueue.keySet().toArray(new String[0]);
                    if (pending.length == 0) {
                        Thread.sleep(WatchCoreParameters.timeToIdle);
                        continue;
                    }
                    for (String path : pending) {
                        if (Thread.currentThread().isInterrupted()) {
                            break;
                        }
                        WatchEvent<Path> event = fileProcessingQueue.remove(path);
                        if (event != null) {
                            awaitUnlockAndDispatch(path, event);
                        }
                    }
                }
            } catch (InterruptedException stop) {
                // sleep() clears the interrupt flag when it throws; restore it
                // and fall through to the exit message.
                Thread.currentThread().interrupt();
            }
            print("DirWatchProcessingThread Exited");
        }

        /**
         * Retries opening the file until it succeeds, then invokes the
         * callback. Gives up silently if the file vanished or is a directory,
         * and puts the file back on the queue once
         * {@link WatchCoreParameters#millisToSwapFileForUnlocking} has passed.
         *
         * @throws InterruptedException if interrupted while pausing between
         * two attempts
         */
        private void awaitUnlockAndDispatch(String path, WatchEvent<Path> event) throws InterruptedException {
            File file = new File(path);
            long startTime = System.currentTimeMillis();
            while (!canOpen(file)) {
                if (!file.exists() || file.isDirectory()) {
                    print("File: '" + path + "' has been deleted in file system or it is not file. Not processing this file.");
                    return;
                }
                Thread.sleep(WatchCoreParameters.millisToPauseForFileLock);
                if ((System.currentTimeMillis() - startTime) > WatchCoreParameters.millisToSwapFileForUnlocking) {
                    // Let other files have their turn; keep a newer event for
                    // the same path if the watcher queued one in the meantime.
                    fileProcessingQueue.putIfAbsent(path, event);
                    return;
                }
            }
            print("Queuing File: " + path);
            // pass the unlocked file event to the abstract method
            onEventAndUnlocked(event);
        }

        /**
         * Tells whether the file can currently be opened for reading. A
         * failure while closing the probe stream still counts as success,
         * because the open itself worked.
         */
        private boolean canOpen(File file) {
            try (FileInputStream probe = new FileInputStream(file)) {
                return true;
            } catch (FileNotFoundException notReadableYet) {
                return false;
            } catch (IOException closeFailure) {
                LOGGER.log(Level.WARNING, "Could not close probe stream for " + file, closeFailure);
                return true;
            }
        }
    }

    /**
     * Global tuning knobs of the WatchCore.
     *
     * @author
     * http://stackoverflow.com/questions/13998379/directory-watching-for-changes-in-java
     * Nackloose
     */
    public static class WatchCoreParameters {

        // timeToIdle: milliseconds the file-checking thread sleeps when the queue is empty.
        // millisToPauseForFileLock: milliseconds between two attempts to open a file.
        // millisToSwapFileForUnlocking: milliseconds after which a file that still
        // cannot be opened is put back on the queue.
        public static int timeToIdle = 2000, millisToPauseForFileLock = 200,
                millisToSwapFileForUnlocking = 2000;
        // When false, none of the print methods of WatchCore produce output.
        public static boolean verbose = false;
    }

    //<editor-fold defaultstate="collapsed" desc="Printing methods">
    /**
     * Prints a normal (non-error) message if verbose output is enabled.
     */
    private void print(String s) {
        print(s, false);
    }

    /**
     * Prints a message if verbose output is enabled. A line break is added
     * unless the message already contains one.
     *
     * @param s message to print
     * @param error {@code true} to print to System.err, {@code false} for
     * System.out
     */
    public static final void print(String s, boolean error) {
        if (!WatchCoreParameters.verbose) {
            return;
        }
        PrintStream out = error ? System.err : System.out;
        if (s.contains("\n")) {
            out.print(s);
        } else {
            out.println(s);
        }
    }

    /**
     * Convenience method for printing an error message.
     *
     * @param s message to print to System.err
     */
    public static final void printe(String s) {
        print(s, true);
    }

    /**
     * Prints a formatted message to System.out if verbose output is enabled.
     * A line break is added unless the format string already contains one.
     *
     * @param s format string
     * @param formatObj arguments referenced by the format string
     */
    public static final void print(String s, Object... formatObj) {
        if (!WatchCoreParameters.verbose) {
            return;
        }
        System.out.format(s + (s.contains("\n") ? "" : "\n"), formatObj);
    }
    //</editor-fold>
}