4

I've played around with threading before in Python, but decided to give the asyncio module a try, especially since you can cancel a running task, which seemed like a nice detail. However, for some reason, I can't wrap my head around it.

Here's what I wanted to implement (sorry if I'm using incorrect terminology):

  • a downloader thread that downloads the same file every x seconds, checks its hash against the previous download and saves it if it's different.
  • a webserver thread that runs in the background, allowing control (pause, list, stop) of the downloader thread.

I used aiohttp for the webserver.

This is what I have so far:

class aiotest():

    def __init__(self):
        self._dl = None     # downloader future
        self._webapp = None # web server future
        self.init_server()

    def init_server(self):

        print('Setting up web interface')
        app = web.Application()
        app.router.add_route('GET', '/stop', self.stop)
        print('added urls')
        self._webapp = app

    @asyncio.coroutine
    def _downloader(self):
        while True:
            try:
                print('Downloading and verifying file...')
                # Dummy sleep - to be replaced by actual code
                yield from asyncio.sleep(random.randint(3,10))
                # Wait a predefined nr of seconds between downloads
                yield from asyncio.sleep(30)
            except asyncio.CancelledError:
                break

    @asyncio.coroutine
    def _supervisor(self):

        print('Starting downloader')
        self._dl = asyncio.async(self._downloader())

    def start(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self._supervisor())
        loop.close()

    @asyncio.coroutine
    def stop(self):
        print('Received STOP')
        self._dl.cancel()
        return web.Response(body=b"Stopping... ")

This class is called by:

t = aiotest()
t.start()

This doesn't work of course, and I feel that this is a horrible piece of code.

What's unclear to me:

  • I stop the downloader in the stop() method, but how would I go about stopping the webserver (e.g. in a shutdown() method)?
  • Does the downloader need a new event loop, or can I use the loop returned by asyncio.get_event_loop()?
  • Do I really need something like the supervisor for what I'm trying to implement? This seems so clunky. And how do I get supervisor to keep running instead of ending after a single execution as it does now?

One last, more general question: is asyncio supposed to replace the threading module (in the future)? Or does each have its own application?

I appreciate all the pointers, remarks and clarifications!

DocZerø
  • 8,037
  • 11
  • 38
  • 66
  • Does your `downloader` block like it would in a traditional thread? Or will it use all asynchronous calls? – Uyghur Lives Matter Mar 19 '16 at 16:24
  • @cpburnz I used `requests` initially (which is blocking if I read correctly), but could also use `aiohttp`. It won't download multiple files (just a single one) and the file itself is relatively small (< 200KB). – DocZerø Mar 19 '16 at 16:29
  • @Kristof have you got any questions about answer? Feel free to ask. – Mikhail Gerasimov Mar 20 '16 at 08:50
  • @germn You really helped me out a lot and I've been doing a fair bit of reading on `asyncio`, slowly starting to get the hang of it. Thanks so much for taking the time to rewrite my code! – DocZerø Mar 20 '16 at 17:15

1 Answers1

4

Why current code is not working:

  • You're running event loop until self._supervisor() is complete. self._supervisor() creates task (it happens immediately) and finishes immediately.

  • You're trying to run event loop until _supervisor complete, but how and when are you going start server? I think event loop should be running until server stopped. _supervisor or other stuff can be added as task (to same event loop). aiohttp already has function to start server and event loop - web.run_app, but we can do it manually.

Your questions:

  1. Your server will run until you stop it. You can start/stop different coroutines while your server working.

  2. You need only one event loop for different coroutines.

  3. I think you don't need supervisor.

  4. More general question: asyncio helps you to run different functions parallel in single thread in single process. That's why asyncio is so cool and fast. Some of your sync code with threads you can rewrite using asyncio and it's coroutines. Moreover: asyncio can interact with threads and processes. It can be useful in case you still need threads and processes: here's example.

Useful notes:

  • It's better to use term coroutine instead of thread while we talk about asyncio coroutines that are not threads
  • If you use Python 3.5, you can use async/await syntax instead of coroutine/yield from

I rewrote your code to show you idea. How to check it: run program, see console, open http://localhost:8080/stop, see console, open http://localhost:8080/start, see console, type CTRL+C.

import asyncio
import random
from contextlib import suppress

from aiohttp import web


class aiotest():
    def __init__(self):
        self._webapp = None
        self._d_task = None
        self.init_server()

    # SERVER:
    def init_server(self):
        app = web.Application()
        app.router.add_route('GET', '/start', self.start)
        app.router.add_route('GET', '/stop', self.stop)
        app.router.add_route('GET', '/kill_server', self.kill_server)
        self._webapp = app

    def run_server(self):
        # Create server:
        loop = asyncio.get_event_loop()
        handler = self._webapp.make_handler()
        f = loop.create_server(handler, '0.0.0.0', 8080)
        srv = loop.run_until_complete(f)
        try:
            # Start downloader at server start:
            asyncio.async(self.start(None))  # I'm using controllers here and below to be short,
                                             # but it's better to split controller and start func
            # Start server:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            # Stop downloader when server stopped:
            loop.run_until_complete(self.stop(None))
            # Cleanup resources:
            srv.close()
            loop.run_until_complete(srv.wait_closed())
            loop.run_until_complete(self._webapp.shutdown())
            loop.run_until_complete(handler.finish_connections(60.0))
            loop.run_until_complete(self._webapp.cleanup())
        loop.close()

    @asyncio.coroutine
    def kill_server(self, request):
        print('Server killing...')
        loop = asyncio.get_event_loop()
        loop.stop()
        return web.Response(body=b"Server killed")

    # DOWNLOADER
    @asyncio.coroutine
    def start(self, request):
        if self._d_task is None:
            print('Downloader starting...')
            self._d_task = asyncio.async(self._downloader())
            return web.Response(body=b"Downloader started")
        else:
            return web.Response(body=b"Downloader already started")

    @asyncio.coroutine
    def stop(self, request):
        if (self._d_task is not None) and (not self._d_task.cancelled()):
            print('Downloader stopping...')
            self._d_task.cancel()            
            # cancel() just say task it should be cancelled
            # to able task handle CancelledError await for it
            with suppress(asyncio.CancelledError):
                yield from self._d_task
            self._d_task = None
            return web.Response(body=b"Downloader stopped")
        else:
            return web.Response(body=b"Downloader already stopped or stopping")

    @asyncio.coroutine
    def _downloader(self):
        while True:
            print('Downloading and verifying file...')
            # Dummy sleep - to be replaced by actual code
            yield from asyncio.sleep(random.randint(1, 2))
            # Wait a predefined nr of seconds between downloads
            yield from asyncio.sleep(1)


if __name__ == '__main__':
    t = aiotest()
    t.run_server()
Community
  • 1
  • 1
Mikhail Gerasimov
  • 36,989
  • 16
  • 116
  • 159
  • if I wanted the shutdown the server (and stop the script completely) through a HTTP call, as if I used `CTRL-C` to interrupted, what would I have to do? Every attempt of mine results in `RuntimeError: Event loop is running.` – DocZerø Mar 21 '16 at 14:57
  • 1
    @Kristof your server is event loop that handles requests as they come. You run your server to work forever, but you can stop it with `KeyboardInterrupt`. If you want to stop it in another coroutine or function, you can also call `loop.close()` - it will do almost same thing as when you type CTRL+C. I fixed example, now you can stop server with `http://localhost:8080/kill_server`. One more note: I understood that it's better to place downloader stopping inside `finally` block (as soon as you're going to stop server different ways). – Mikhail Gerasimov Mar 21 '16 at 23:07