I have an infinite python-watchdog process (daemonized with supervisor) that triggers new events when files are created within a folder.
On the other side is an asyncio queue with just 3 consumers, waiting for messages about newly created files (the consumers then upload the files to a server).
The code below works, but runs sequentially, and I just don't understand why (despite all the asyncio documentation I've read).
main.py (daemonized):
from pathobserver import PathsObserver

paths_observer = PathsObserver()

if __name__ == "__main__":
    paths_observer.start()
    try:
        while paths_observer.is_alive():
            paths_observer.join(1)
    except KeyboardInterrupt:
        paths_observer.stop()
    paths_observer.join()
pathobserver.py (simplified, used to record which folders to watch):
from watchdog.observers import Observer

from .filehandler import DataFileHandler

class PathsObserver(Observer):
    def observe_folder(self, folder_path: str) -> None:
        event_handler = DataFileHandler(path=folder_path)
        watch = self.schedule(event_handler, folder_path, recursive=True)
filehandler.py (note the import of the scheduler):
from watchdog.events import FileCreatedEvent, FileSystemEventHandler

from .scheduler import scheduler

class DataFileHandler(FileSystemEventHandler):
    def on_created(self, event):
        scheduler.prepare_and_upload(event.src_path)
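Since PathsObserver is started and joined as a thread in main.py, I assume on_created is invoked from watchdog's observer thread rather than from the thread that created the event loop. A quick diagnostic sketch (the threading import and the print are my additions, not in the real code):

import threading

from watchdog.events import FileSystemEventHandler

from .scheduler import scheduler

class DataFileHandler(FileSystemEventHandler):
    def on_created(self, event):
        # reports which thread watchdog dispatches from; as far as I can
        # tell it is not the MainThread that built the UploadScheduler
        print("on_created in thread:", threading.current_thread().name)
        scheduler.prepare_and_upload(event.src_path)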
scheduler.py:
import asyncio

MAX_TASKS = 3

class UploadScheduler(object):
    def __init__(self):
        self._loop = asyncio.get_event_loop()
        self._queue = asyncio.Queue(0, loop=self._loop)
        self._consumers = [
            asyncio.ensure_future(self._consumer(self._queue))
            for _ in range(MAX_TASKS)
        ]

    async def _consumer(self, queue):
        while True:
            file_path = await queue.get()
            await <do some stuff with file_path>
            queue.task_done()

    async def _producer(self, file_path):
        await self._queue.put(file_path)

    def prepare_and_upload(self, file_path):
        self._loop.run_until_complete(self._producer(file_path))

scheduler = UploadScheduler()
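For context, here is a stripped-down, watchdog-free sketch of the same pattern (all names and the sleep stand-in are mine). At least in this reduced form, run_until_complete only drives the loop until the put completes, so the consumers never get the time they need between calls:

import asyncio

loop = asyncio.get_event_loop()
queue = asyncio.Queue()

async def consumer():
    while True:
        item = await queue.get()
        await asyncio.sleep(1)  # stand-in for the slow upload
        print("uploaded", item)
        queue.task_done()

consumers = [asyncio.ensure_future(consumer()) for _ in range(3)]

async def producer(item):
    await queue.put(item)

for i in range(5):
    # the loop only spins while this call is in flight; it stops as soon
    # as the put completes, long before any sleep(1) can finish
    loop.run_until_complete(producer(i))

# nothing was "uploaded", and items are still sitting in the queue
print("unfinished items:", queue.qsize())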
If I remove self._loop.run_until_complete, tasks aren't fetched by the consumers, and I get a RuntimeWarning saying the _producer coroutine was never awaited. If I call self._queue.put_nowait(file_path) from a non-async function instead, the consumer jobs don't get started either.
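Concretely, that second attempt looked roughly like this (reconstructed):

def prepare_and_upload(self, file_path):
    # enqueue synchronously; nothing ever runs the loop afterwards,
    # so the consumer tasks never wake up to drain the queue
    self._queue.put_nowait(file_path)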
Am I doing something wrong?
What I expected: when multiple files are created, a bunch of new producers get called. Since the consumers take a while to upload everything, they slowly catch up. And the whole thing keeps running, waiting for new files to upload.
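To illustrate, this is the behaviour I'm after: the same producer/consumer setup overlaps uploads just fine when the loop runs continuously (again, sleep stands in for the upload and all names are mine):

import asyncio

async def consumer(queue):
    while True:
        file_path = await queue.get()
        await asyncio.sleep(1)  # stand-in for the upload
        print("uploaded", file_path)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    consumers = [asyncio.ensure_future(consumer(queue)) for _ in range(3)]
    for i in range(6):
        await queue.put(f"file_{i}.dat")
    await queue.join()  # finishes in ~2s, not ~6s: the three consumers overlap
    for c in consumers:
        c.cancel()

asyncio.get_event_loop().run_until_complete(main())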