
I have an infinite python-watchdog process (daemonized with supervisor) that triggers new events when files are created within a folder.

On the other side, an asyncio queue with only 3 consumers waits for messages about newly created files (the consumers then upload the files to a server).

The code below works, but it runs sequentially, and I just don't understand why (despite all the asyncio documentation I have read).

main.py (daemonized):

from pathobserver import PathsObserver  # adjust to the actual package layout

paths_observer = PathsObserver()

if __name__ == "__main__":
    paths_observer.start()

    try:
        while paths_observer.is_alive():
            paths_observer.join(1)
    except KeyboardInterrupt:
        paths_observer.stop()

    paths_observer.join()

pathobserver.py (simplified, used to record which folders to watch):

from watchdog.observers import Observer

from .filehandler import DataFileHandler

class PathsObserver(Observer):
    def observe_folder(self, folder_path: str) -> None:
        event_handler = DataFileHandler(path=folder_path)
        watch = self.schedule(event_handler, folder_path, recursive=True)

filehandler.py (note the import of the scheduler):

from watchdog.events import FileCreatedEvent, FileSystemEventHandler

from .scheduler import scheduler

class DataFileHandler(FileSystemEventHandler):
    def on_created(self, event):        
        scheduler.prepare_and_upload(event.src_path)

scheduler.py:

import asyncio

MAX_TASKS = 3

class UploadScheduler(object):
    def __init__(self):
        self._loop = asyncio.get_event_loop()
        self._queue = asyncio.Queue(0, loop=self._loop)
        self._consumers = [asyncio.ensure_future(self._consumer(self._queue)) for _ in range(MAX_TASKS)]

    async def _consumer(self, queue):
        while True:
            file_path = await queue.get()
            await <do some stuff with file_path>
            queue.task_done()

    async def _producer(self, file_path):
        await self._queue.put(file_path)

    def prepare_and_upload(self, file_path):
        self._loop.run_until_complete(self._producer(file_path))


scheduler = UploadScheduler()

If I remove self._loop.run_until_complete, tasks aren't fetched by the consumers, and a warning says the producer coroutine was never awaited. If I call self._queue.put_nowait(file_path) from a non-async function instead, the consumer jobs don't get started either.

Am I doing something wrong?

What I expected: when multiple files are created, a bunch of new producers get called, and since the consumers take some time to upload everything, they slowly catch up. The whole thing then keeps running, waiting for new files to upload.
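
For reference, here is a rough sketch of the direction I suspect is needed: run the event loop forever in a dedicated thread, and hand files over from watchdog's callback thread with run_coroutine_threadsafe. This is an assumption on my part, not working code from my project (ThreadedUploadScheduler, upload, etc. are illustrative names):

import asyncio
import threading

MAX_TASKS = 3

async def upload(file_path):
    # Placeholder for a real upload coroutine.
    await asyncio.sleep(1)

class ThreadedUploadScheduler:
    def __init__(self):
        # The loop lives in its own thread and runs forever, so the
        # consumers are serviced continuously.
        self._loop = asyncio.new_event_loop()
        threading.Thread(target=self._run_loop, daemon=True).start()
        self._queue = None
        # Build the queue and consumers on the loop's own thread,
        # blocking until they exist.
        asyncio.run_coroutine_threadsafe(self._setup(), self._loop).result()

    def _run_loop(self):
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    async def _setup(self):
        self._queue = asyncio.Queue()
        for _ in range(MAX_TASKS):
            asyncio.ensure_future(self._consumer())

    async def _consumer(self):
        while True:
            file_path = await self._queue.get()
            await upload(file_path)
            self._queue.task_done()

    def prepare_and_upload(self, file_path):
        # Called from watchdog's observer thread: a thread-safe handoff
        # to the loop, instead of run_until_complete.
        asyncio.run_coroutine_threadsafe(self._queue.put(file_path), self._loop)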

onekiloparsec
  • How exactly do you upload to the server? Do you use aiohttp or other asyncio-compatible library? – user4815162342 Aug 23 '20 at 08:39
  • Also, where do you run the event loop? I only see `run_until_complete` which just puts the item into the queue. The event loop should run all the time, in order to service the consumers as well. – user4815162342 Aug 23 '20 at 08:40
  • Very good questions. As for the event loop, I tried with `run_forever` but then got stuck (nothing seems to be executed then). The uploads are performed by another library of mine (which uses `requests` with dedicated threads). – onekiloparsec Aug 23 '20 at 12:17
  • Does it matter that the upload library isn't using asyncio? – onekiloparsec Aug 23 '20 at 12:18
  • The event loop must run for the entire program, that's how asyncio is designed. If you're not running the event loop, none of the asyncio stuff is running either, it's as simple as that. `run_forever()` is useful when you set up the loop beforehand and just let it do its thing, but these days it's somewhat deprecated (along with `run_until_complete`) in favor of `asyncio.run()`. If `run_forever` didn't work for you, maybe you had other problems. – user4815162342 Aug 23 '20 at 20:02
  • And yes, it very much matters for the upload library to be written with async in mind because otherwise it won't parallelize at all. Using requests almost certainly explains why your code runs sequentially. Async-compatible libraries do all their work in callbacks and coroutines, suspending to the event loop on every IO operation, and *that* allows running any number of them in parallel, all inside a single thread. See e.g. [this answer](https://stackoverflow.com/a/51177895/1600898) or [this lecture](https://youtu.be/MCs5OvhV9S4) for a more detailed explanation of async/await in Python. – user4815162342 Aug 23 '20 at 20:05
  • Thanks a lot for these insights. I suspect I may have design problem in fact, since I don't understand exactly what I am doing. My upload library is creating a Thread to upload, but it doesn't use asyncio. I should maybe reconsider the whole thing. – onekiloparsec Aug 23 '20 at 20:23
  • If by any chance you want to see by yourself, these are two open-source libs: https://github.com/arcsecond-io/oort (for the lib using async io), and https://github.com/arcsecond-io/cli (for the "upload lib"). – onekiloparsec Aug 23 '20 at 20:25
  • You almost certainly don't want to mix threads and asyncio, especially if you're not sure what you're doing. But it should be really easy to switch from requests to [aiohttp](https://docs.aiohttp.org/en/stable/) (which has an API intentionally similar to that of requests), drop the thread creation, and your upload code should automatically become async. – user4815162342 Aug 23 '20 at 20:36
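
To make the aiohttp suggestion in the last comment concrete, a minimal async upload coroutine might look something like the sketch below. The endpoint URL and form field name are placeholders, not the real arcsecond-io API; a real implementation would also share a single ClientSession rather than create one per upload.

import aiohttp

async def upload(file_path: str) -> None:
    # Suspends to the event loop on every I/O operation, so any number
    # of uploads can run concurrently in a single thread.
    async with aiohttp.ClientSession() as session:
        with open(file_path, "rb") as f:
            async with session.post("https://example.com/upload",
                                    data={"file": f}) as response:
                response.raise_for_status()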

0 Answers