
I am using the "watchdog" API to keep checking for changes in a folder on my filesystem. Whenever files change in that folder, I pass them to a particular function, which starts a thread for each file passed to it.

But watchdog, or any other filesystem-watcher API that I know of, notifies users file by file, i.e. it raises a notification as each file comes in. I would like to be notified about a whole batch of files at a time, so that I can pass that list to my function and make use of multithreading. Currently, when I use "watchdog", it notifies me about one file at a time and I am only able to pass that one file to my function. I want to pass it many files at a time to be able to use multithreading.

One solution that comes to mind: when you copy a bunch of files into a folder, the OS shows you a progress bar. If it were possible for me to be notified when that progress bar finishes, that would be a perfect solution to my question. But I don't know whether that is possible.
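
To make the idea concrete, what I am imagining is something that accumulates events and only hands them off once the folder has been quiet for a moment. Here is a rough sketch of what I mean (the one-second quiet period and the process_batch function are just placeholders of mine, not something watchdog provides), with the handler scheduled on an Observer in the usual way:

import threading
import watchdog.events as events

class BatchingEventHandler(events.FileSystemEventHandler):
    """Collect event paths and hand them off as one batch after a quiet period."""
    def __init__(self, flush_callback, quiet_period=1.0):
        super(BatchingEventHandler, self).__init__()
        self.flush_callback = flush_callback
        self.quiet_period = quiet_period
        self._lock = threading.Lock()
        self._paths = set()
        self._timer = None

    def on_any_event(self, event):
        with self._lock:
            self._paths.add(event.src_path)
            # Restart the timer so the flush fires only once events stop arriving.
            if self._timer is not None:
                self._timer.cancel()
            self._timer = threading.Timer(self.quiet_period, self._flush)
            self._timer.start()

    def _flush(self):
        with self._lock:
            paths, self._paths = list(self._paths), set()
        if paths:
            self.flush_callback(paths)

def process_batch(paths):
    # Placeholder for my function that starts a thread per file.
    print("got a batch of %d files" % len(paths))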

Also, I know that watchdog is a polling API, and an ideal API for watching a filesystem would be an interrupt-driven one like pyinotify. But I haven't found any API that is both interrupt-driven and cross-platform. iWatch is good, but Linux-only, and I want something that works on every OS. So if you have suggestions for any other API, please do let me know.

Thanks.

Rash
  • Why not spawn a pool of workers which get tasks from a common queue, and then spawn a watchdog thread which puts tasks in the queue? Then, it seems, you would not have to wait for there to be a bunch of files before your workers start working. – unutbu Oct 04 '14 at 17:11
  • Hmm... that is actually a cool solution. I should have thought of that... thanks, man. I will try that. But on a general note, I was wondering, can we get notified when the OS progress bar is finished, as I mentioned in paragraph 3? – Rash Oct 04 '14 at 17:21
  • The progress bar is OS-dependent. When I copy things using `/usr/bin/cp` on Linux, there is no progress bar. So depending on a progress bar would not be a reliable cross-platform solution. – unutbu Oct 04 '14 at 17:25
  • OK, that makes sense. I like your solution. But if I create a queue, I would have to keep checking whether any new entry has been made to the queue. So I would still be in an infinite loop checking things. I can obviously think of some workarounds here, but do you know of any good way to trigger an event when something is added to my queue? I am new to Python and haven't searched much on this. – Rash Oct 04 '14 at 17:30
  • The queue's `get` method blocks until there is an item to get. Doug Hellman has written an excellent set of tutorials which should help you get started: [on using Queue](http://pymotw.com/2/Queue/index.html#module-Queue), [the threading module](http://pymotw.com/2/threading/index.html#module-threading), [how to set up and use a pool of worker processes](http://www.doughellmann.com/PyMOTW/multiprocessing/index.html), [how to set up a pool of worker threads](http://stackoverflow.com/q/3033952/190597). – unutbu Oct 04 '14 at 17:45
  • Nice... thanks, man. If you wish, you can post a summary of our discussion as an answer and I will mark it as correct. That's the least I can do to thank you. :) – Rash Oct 04 '14 at 17:47

1 Answer


Instead of accumulating filesystem events, you could spawn a pool of worker threads which get tasks from a common queue. The watchdog thread could then put tasks in the queue as filesystem events occur. Done this way, a worker thread can start working as soon as an event occurs.

For example,

import logging
import Queue
import threading
import time
import watchdog.observers as observers
import watchdog.events as events

logger = logging.getLogger(__name__)

SENTINEL = None  # sentinel value that could be put in the queue to tell workers to stop (not used below)

class MyEventHandler(events.FileSystemEventHandler):
    def __init__(self, queue):
        super(MyEventHandler, self).__init__()
        self.queue = queue

    def on_any_event(self, event):
        super(MyEventHandler, self).on_any_event(event)
        # Hand every filesystem event to the worker threads via the shared queue.
        self.queue.put(event)

def process(queue):
    while True:
        # get() blocks until an item is available, so workers simply wait here.
        event = queue.get()
        logger.info(event)


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG,
                        format='[%(asctime)s %(threadName)s] %(message)s',
                        datefmt='%H:%M:%S')

    queue = Queue.Queue()
    num_workers = 4
    pool = [threading.Thread(target=process, args=(queue,)) for i in range(num_workers)]
    for t in pool:
        t.daemon = True
        t.start()

    event_handler = MyEventHandler(queue)
    observer = observers.Observer()
    observer.schedule(
        event_handler,
        path='/tmp/testdir',
        recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

Running

% mkdir /tmp/testdir
% script.py

yields output like

[14:48:31 Thread-1] <FileCreatedEvent: src_path=/tmp/testdir/.#foo>
[14:48:32 Thread-2] <FileModifiedEvent: src_path=/tmp/testdir/foo>
[14:48:32 Thread-3] <FileModifiedEvent: src_path=/tmp/testdir/foo>
[14:48:32 Thread-4] <FileDeletedEvent: src_path=/tmp/testdir/.#foo>
[14:48:42 Thread-1] <FileDeletedEvent: src_path=/tmp/testdir/foo>
[14:48:47 Thread-2] <FileCreatedEvent: src_path=/tmp/testdir/.#bar>
[14:48:49 Thread-4] <FileCreatedEvent: src_path=/tmp/testdir/bar>
[14:48:49 Thread-4] <FileModifiedEvent: src_path=/tmp/testdir/bar>
[14:48:49 Thread-1] <FileDeletedEvent: src_path=/tmp/testdir/.#bar>
[14:48:54 Thread-2] <FileDeletedEvent: src_path=/tmp/testdir/bar>

Doug Hellman has written an excellent set of tutorials (which have since been collected into a book) which should help you get started:

- [on using Queue](http://pymotw.com/2/Queue/index.html#module-Queue)
- [the threading module](http://pymotw.com/2/threading/index.html#module-threading)
- [how to set up and use a pool of worker processes](http://www.doughellmann.com/PyMOTW/multiprocessing/index.html)
- [how to set up a pool of worker threads](http://stackoverflow.com/q/3033952/190597)

I didn't actually end up using a multiprocessing Pool or ThreadPool as discussed in the last two links, but you may find them useful anyway.
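
For anyone who does want to go the pool route, here is a rough sketch of the idea (the class name and my_per_file_function are placeholders I made up): multiprocessing.pool.ThreadPool keeps its own internal task queue, so each event can be handed straight to it instead of to a hand-rolled queue.

from multiprocessing.pool import ThreadPool
import watchdog.events as events

class PoolEventHandler(events.FileSystemEventHandler):
    def __init__(self, pool, work):
        super(PoolEventHandler, self).__init__()
        self.pool = pool
        self.work = work  # a function that processes a single event

    def on_any_event(self, event):
        super(PoolEventHandler, self).on_any_event(event)
        # apply_async queues the task; an idle pool thread picks it up.
        self.pool.apply_async(self.work, (event,))

# usage: schedule PoolEventHandler(ThreadPool(processes=4), my_per_file_function)
# on an Observer exactly as in the example above.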

unutbu
  • OMG... it's an awesome answer. You also gave me tips and a code sample so that I can understand the whole process more clearly. Thank you so much!! – Rash Oct 04 '14 at 19:14