5

I am happily using the watchdog package, specifically PollingObserver, to watch directories for file events. It works great — until the directory I am watching is deleted. Then the code that polls the directory calls stat() on the non-existent directory and raises an exception. What is the best way to handle this? I don't see how I can catch this exception, because it is raised in a separate thread.

Sample Code:

import sys
import time
import logging
from watchdog.observers.polling import PollingObserver
from watchdog.events import LoggingEventHandler

if __name__ == "__main__":
    # Configure root logging so LoggingEventHandler's messages are printed.
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    # Directory to watch: first command-line argument, else the current dir.
    path = sys.argv[1] if len(sys.argv) > 1 else '.'
    event_handler = LoggingEventHandler()
    observer = PollingObserver()
    print("Watching: ", path)
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        # Keep the main thread alive; the observer runs in its own thread(s).
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass  # Ctrl+C is the normal way to end this demo.
    finally:
        # Stop and join the observer on every exit path, not only Ctrl+C.
        observer.stop()
        observer.join()

To see what I mean, pass an existing directory as an argument, then delete it.

Jeffrey DeLeo
  • 1,672
  • 13
  • 19

1 Answer

0

I faced the same problem. My solution was to create a custom emitter, use it in a custom observer, and then use that observer instead of PollingObserver.

Emitter code:

class CustomEmitter(PollingEmitter):
    """Polling emitter that tolerates the watched directory being deleted.

    A polling emitter rebuilds a ``DirectorySnapshot`` of the watched path on
    every poll. When that path no longer exists, building the snapshot raises
    ``FileNotFoundError`` inside the emitter's thread, where the caller cannot
    catch it. This subclass catches that error, logs the disappearance once,
    skips the poll, and resumes emitting events once the directory exists
    again. The recovery is logged too.

    :param logger:
        ``logging.Logger`` used for the "missing" / "found again" messages.
        When ``None``, a module-level logger is used, so a missing directory
        does not fail with ``AttributeError``.

    The remaining parameters are forwarded to ``PollingEmitter``.
    """

    def __init__(self, event_queue, watch,
                 timeout=DEFAULT_EMITTER_TIMEOUT,
                 stat=default_stat, listdir=os.listdir, logger=None):
        # Pass these by keyword. Some watchdog releases make them
        # keyword-only or add parameters (e.g. ``event_filter``) before
        # ``stat``/``listdir``, which breaks or misbinds positional arguments.
        super().__init__(event_queue, watch,
                         timeout=timeout, stat=stat, listdir=listdir)
        # Replace the snapshot factory with one that returns ``None``
        # instead of raising when the watched directory is missing.
        self._take_snapshot = lambda: self._take_safe_snapshot(
            self.watch.path, self.watch.is_recursive, stat=stat,
            listdir=listdir)
        if logger is None:
            logger = logging.getLogger(__name__)
        self.logger = logger
        # None = not yet known, True = currently missing, False = present.
        self.folder_was_missing = None

    def _take_safe_snapshot(self, path, is_recursive, stat, listdir):
        """Return a ``DirectorySnapshot`` of *path*, or ``None`` if missing.

        Logs once when the directory disappears and once when it reappears.
        The current state is tracked in ``self.folder_was_missing``.
        """
        try:
            snapshot = DirectorySnapshot(path, is_recursive,
                                         stat=stat,
                                         listdir=listdir)
        except FileNotFoundError as err:
            # Report only the transition to "missing", not every poll while
            # the directory stays absent.
            if not self.folder_was_missing:
                self.folder_was_missing = True
                self.logger.info("Observer can't find the directory %s.",
                                 path)
                # ``exception`` logs the traceback itself, so a separate
                # ``traceback.print_exc()`` is not needed.
                self.logger.exception(err)
            return None

        if self.folder_was_missing:
            self.logger.info("Observer successfully found the missing "
                             "directory %s.", path)
        self.folder_was_missing = False
        return snapshot

    def queue_events(self, timeout):
        """Poll once and queue one event per change since the last snapshot.

        Polls in which the watched directory is missing are skipped. The
        previous snapshot is kept, so changes are diffed against it once the
        directory reappears.
        """
        # We don't want to hit the disk continuously.
        # timeout behaves like an interval for polling emitters.
        if self.stopped_event.wait(timeout):
            return

        with self._lock:
            if not self.should_keep_running():
                return

            # Get event diff between fresh snapshot and previous snapshot.
            new_snapshot = self._take_snapshot()
            if new_snapshot is None:
                # Directory is currently missing; try again on the next poll.
                return
            if self._snapshot is None:
                # No baseline to diff against (e.g. the directory was
                # already missing when the first snapshot was taken), so
                # adopt this snapshot instead of diffing against ``None``.
                self._snapshot = new_snapshot
                return
            events = DirectorySnapshotDiff(self._snapshot, new_snapshot)
            self._snapshot = new_snapshot

            # Files.
            for src_path in events.files_deleted:
                self.queue_event(FileDeletedEvent(src_path))
            for src_path in events.files_modified:
                self.queue_event(FileModifiedEvent(src_path))
            for src_path in events.files_created:
                self.queue_event(FileCreatedEvent(src_path))
            for src_path, dest_path in events.files_moved:
                self.queue_event(FileMovedEvent(src_path, dest_path))

            # Directories.
            for src_path in events.dirs_deleted:
                self.queue_event(DirDeletedEvent(src_path))
            for src_path in events.dirs_modified:
                self.queue_event(DirModifiedEvent(src_path))
            for src_path in events.dirs_created:
                self.queue_event(DirCreatedEvent(src_path))
            for src_path, dest_path in events.dirs_moved:
                self.queue_event(DirMovedEvent(src_path, dest_path))

Observer code:

class TagIDObserver(BaseObserver):
    """Observer whose emitters are ``CustomEmitter`` instances.

    Each emitter is handed this observer's ``logger``, so emitters can report
    when a watched directory disappears or reappears.

    :param logger:
        ``logging.Logger`` passed to every emitter this observer creates.
    :param timeout:
        Forwarded to ``BaseObserver``; also passed to each emitter.
    """

    def __init__(self, logger, timeout=DEFAULT_OBSERVER_TIMEOUT):
        # Use the emitter class actually defined alongside this observer.
        super().__init__(emitter_class=CustomEmitter, timeout=timeout)
        self.logger = logger

    def schedule(self, event_handler, path, recursive=False):
        """
        Schedules watching a path and calls appropriate methods specified
        in the given event handler in response to file system events.

        :param event_handler:
            An event handler instance that has appropriate event handling
            methods which will be called by the observer in response to
            file system events.
        :type event_handler:
            :class:`watchdog.events.FileSystemEventHandler` or a subclass
        :param path:
            Directory path that will be monitored.
        :type path:
            ``str``
        :param recursive:
            ``True`` if events will be emitted for sub-directories
            traversed recursively; ``False`` otherwise.
        :type recursive:
            ``bool``
        :return:
            An :class:`ObservedWatch` object instance representing
            a watch.
        """
        with self._lock:
            watch = ObservedWatch(path, recursive)
            self._add_handler_for_watch(event_handler, watch)

            # If we don't have an emitter for this watch already, create it.
            # The check-and-create stays under the lock so two concurrent
            # schedule() calls cannot both create an emitter for one watch.
            if self._emitter_for_watch.get(watch) is None:
                emitter = self._emitter_class(event_queue=self.event_queue,
                                              watch=watch,
                                              timeout=self.timeout,
                                              logger=self.logger)
                self._add_emitter(emitter)
                if self.is_alive():
                    emitter.start()
            self._watches.add(watch)
        return watch

Also, as you can see, if you do not need the logging, you can keep the observer's original `schedule` method and override only `__init__`.

Vladimir
  • 103
  • 2
  • 8