
I have a simple aiohttp server with two handlers. The first one does some computation in an `async for` loop. The second one just returns a text response. `not_so_long_operation` returns the 30th Fibonacci number using the slowest recursive implementation, which takes about one second.

import aiohttp.web

def fib(n):
    # deliberately slow recursive implementation; fib(30) takes about a second
    return n if n < 2 else fib(n - 1) + fib(n - 2)

def not_so_long_operation():
    return fib(30)

class arange:
    def __init__(self, n):
        self.n = n
        self.i = 0

    async def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        self.i += 1
        if self.i <= self.n:
            return i
        else:
            raise StopAsyncIteration

# GET /
async def index(request):
    print('request!')
    l = []
    async for i in arange(20):
        print(i)
        l.append(not_so_long_operation())

    return aiohttp.web.Response(text='%d\n' % l[0])

# GET /lol/
async def lol(request):
    print('request!')
    return aiohttp.web.Response(text='just respond\n')

When I try to fetch / and then /lol/, I get the response for the second one only after the first one has finished.
What am I doing wrong, and how do I make the index handler release the ioloop on each iteration?

3 Answers


Your example has no yield points (`await` statements) for switching between tasks. An asynchronous iterator allows you to use `await` inside `__aiter__`/`__anext__`, but it doesn't insert one into your code automatically.

Say,

class arange:
    def __init__(self, n):
        self.n = n
        self.i = 0

    async def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        self.i += 1
        if self.i <= self.n:
            await asyncio.sleep(0)  # insert yield point
            return i
        else:
            raise StopAsyncIteration

should work as you expected.

In a real application you most likely don't need `await asyncio.sleep(0)` calls, because you will be waiting on database access and similar activities anyway.
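For illustration, here is a sketch of what such an iterator might look like when each batch fetch really awaits I/O. The `BulkReader` name and the sleep standing in for a database call are hypothetical; note also that since Python 3.5.2, `__aiter__` should be a plain method rather than a coroutine.

```python
import asyncio

class BulkReader:
    # Hypothetical paginated reader: each batch fetch awaits real I/O,
    # so the event loop can switch to other tasks naturally.
    def __init__(self, batches):
        self.batches = list(batches)
        self.buffer = []

    def __aiter__(self):  # plain method since Python 3.5.2
        return self

    async def __anext__(self):
        if not self.buffer:
            if not self.batches:
                raise StopAsyncIteration
            await asyncio.sleep(0.001)  # stands in for a database fetch
            self.buffer = list(self.batches.pop(0))
        return self.buffer.pop(0)

async def read_all():
    items = []
    async for item in BulkReader([[1, 2], [3, 4]]):
        items.append(item)
    return items

print(asyncio.run(read_all()))  # → [1, 2, 3, 4]
```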

Andrew Svetlov
  • So, as far as I understand, according to [PEP-492](https://www.python.org/dev/peps/pep-0492/#why-stopasynciteration), `async for` was added to avoid collisions with `StopIteration`, which is used internally by coroutines? – Michael Ihnatenko Nov 05 '15 at 12:51
  • Not exactly. `async for` uses the `__aiter__`/`__anext__` pair of coroutines. Let's assume we read a bulk of data from a DB (like the [redis iscan command](https://github.com/aio-libs/aioredis/blob/master/aioredis/commands/generic.py#L210-L220)). It fetches a bulk of data, returns it item by item, and fetches the next bulk when the data is exhausted. You may perform I/O in the `__anext__` coroutine with the help of an `await` statement, but you cannot do it in the good old `__iter__` method. – Andrew Svetlov Nov 05 '15 at 13:29

Since `fib(30)` is CPU-bound and shares little data, you should probably use a ProcessPoolExecutor (as opposed to a ThreadPoolExecutor):

async def index(request):
    loop = request.app.loop
    executor = request.app["executor"]
    result = await loop.run_in_executor(executor, fib, 30)
    return web.Response(text="%d" % result)

Set up the executor when you create the app:

app = Application(...)
app["executor"] = ProcessPoolExecutor()
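Put together, the approach might look like this self-contained sketch (the `compute` helper and the small `fib(20)` input are illustrative, not from the answer; the pattern is the same as with the aiohttp app above):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def fib(n):
    # naive recursive Fibonacci, as in the question: pure CPU-bound work
    return n if n < 2 else fib(n - 1) + fib(n - 2)

async def compute(executor, n):
    loop = asyncio.get_running_loop()
    # the blocking call runs in a worker process; the event loop stays free
    return await loop.run_in_executor(executor, fib, n)

async def main():
    with ProcessPoolExecutor() as executor:
        return await compute(executor, 20)

if __name__ == "__main__":
    print(asyncio.run(main()))  # → 6765
```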
Jashandeep Sohi
  • `fib(30)` is just an example of an operation that doesn't run long once, but runs long enough when repeated in a loop. A CPU-bound operation was not the best choice of example, but thank you for your snippet! – Michael Ihnatenko Nov 05 '15 at 13:34

An asynchronous iterator is not really needed here. Instead, you can simply give control back to the event loop inside your loop. In Python 3.4, this is done with a simple yield:

@asyncio.coroutine
def index(request):
    for i in range(20):
        not_so_long_operation()
        yield

In Python 3.5, you can define an Empty object that does essentially the same thing:

class Empty:
    def __await__(self):
        yield

Then use it with the await syntax:

async def index(request):
    for i in range(20):
        not_so_long_operation()
        await Empty()
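As a quick sanity check, this pattern still works on current Python: a bare `yield` from `__await__` suspends the coroutine for one loop iteration, after which the task is resumed (a minimal sketch; `main` here is just for demonstration):

```python
import asyncio

class Empty:
    def __await__(self):
        yield  # suspend once; the loop reschedules the task immediately

async def main():
    await Empty()  # hand control back to the event loop, then resume
    return "resumed"

print(asyncio.run(main()))  # → resumed
```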

Or simply use `asyncio.sleep(0)`, which has recently been optimized:

async def index(request):
    for i in range(20):
        not_so_long_operation()
        await asyncio.sleep(0)
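To see the effect (with hypothetical worker names), two tasks that yield with `asyncio.sleep(0)` on every iteration interleave instead of running to completion one after the other:

```python
import asyncio

order = []

async def worker(name, steps):
    for _ in range(steps):
        order.append(name)
        await asyncio.sleep(0)  # yield point: let the other task run

async def main():
    await asyncio.gather(worker("a", 3), worker("b", 3))

asyncio.run(main())
print(order)  # → ['a', 'b', 'a', 'b', 'a', 'b']
```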

You could also run `not_so_long_operation` in a thread using the default executor:

async def index(request):
    loop = asyncio.get_event_loop()
    for i in range(20):
        await loop.run_in_executor(None, not_so_long_operation)
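A self-contained version of that idea, with a hypothetical `blocking_call` standing in for `not_so_long_operation`:

```python
import asyncio
import time

def blocking_call():
    # stand-in for not_so_long_operation(); blocks only its worker thread
    time.sleep(0.01)
    return 42

async def main():
    loop = asyncio.get_running_loop()
    # executor=None selects the loop's default ThreadPoolExecutor
    return [await loop.run_in_executor(None, blocking_call) for _ in range(3)]

print(asyncio.run(main()))  # → [42, 42, 42]
```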
Vincent
  • Is it necessary to construct an Empty object each time? Is it ok to create `next_tick = Empty()` once and then use it with `await next_tick`? – Michael Ihnatenko Nov 04 '15 at 15:16
  • @MichaelIhnatenko Sure, it's fine, though I'm surprised there's no standard way to give control back to the event loop in Python 3.5. – Vincent Nov 04 '15 at 15:41