I'm trying to write my own awaitable function that can be used in an asyncio loop, like asyncio.sleep() or the other built-in awaitables.

Here is what I've done so far:

import asyncio

def coro1():
    for i in range(1, 10):
        yield i

def coro2():
    for i in range(1, 10):
        yield i*10

class Coro:  # Not used.
    def __await__(self):
        for i in range(1, 10):
            yield i * 100

@asyncio.coroutine
def wrapper1():
    return (yield from coro1())

@asyncio.coroutine
def wrapper2():
    return (yield from coro2())

for i in wrapper1():
    print(i)

print("Above result was obvious which I can iterate around a couroutine.".center(80, "#"))

async def async_wrapper():
    await wrapper1()
    await wrapper2()

loop = asyncio.get_event_loop()
futures = [asyncio.ensure_future(async_wrapper())]
result = loop.run_until_complete(asyncio.gather(*futures))
print(result)

loop.close()

What I got as a result:

1
2
3
4
5
6
7
8
9
#######Above result was obvious which I can iterate around a couroutine.#########
Traceback (most recent call last):
  File "stack-coroutine.py", line 36, in <module>
    result = loop.run_until_complete(asyncio.gather(*futures))
  File "/usr/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "stack-coroutine.py", line 30, in async_wrapper
    await wrapper1()
  File "stack-coroutine.py", line 18, in wrapper1
    return (yield from coro1())
  File "stack-coroutine.py", line 5, in coro1
    yield i
RuntimeError: Task got bad yield: 1

What I expect as a result:

1
10
2
20
3
30
.
.
.

[NOTE]:

  • I'm not looking for a multithread or multiprocess method.
  • This question is similar to an earlier question of mine that has not been resolved yet.
  • I'm using Python3.6
Benyamin Jafari
  • Can you show us **working** asyncio code a part of which you'd like to reimplement manually? Your `async_wrapper` runs the two coroutines in sequence, not in parallel. – user4815162342 Oct 07 '19 at 13:22
  • Also, asyncio doesn't use generators to produce values to calling coroutines, it uses them to request suspension to the event loop. The yielded value is only visible to the event loop, not to intermediate coroutines, and in asyncio it contains a future object. – user4815162342 Oct 07 '19 at 13:29
  • @user4815162342 Thanks for the response. I'm looking for a way to implement an awaitable function like the many built-in awaitables (e.g. `asyncio.sleep()`). This code snippet is my attempt at that. I'd like to get the result shown in my question using my own awaitable function. – Benyamin Jafari Oct 07 '19 at 13:35
  • You can take a look at the asyncio source code to see how basic coroutines like `asyncio.sleep()` are implemented. I can also recommend [this lecture](https://www.youtube.com/watch?v=MCs5OvhV9S4) that shows how the event loop works by building one from scratch, live. The code in the question shows misconceptions about how async/await works in Python, so the question doesn't appear answerable at this point, at least not within the format of a normal StackOverflow answer. – user4815162342 Oct 07 '19 at 14:18
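The point made in the comments above can be illustrated with a minimal sketch: a custom `__await__` works under asyncio as long as it only yields what the event loop understands, for example by delegating to the `__await__` of an existing awaitable. The names `Counter`, `count` and `main` are hypothetical, and `asyncio.run()` assumes Python 3.7+ (on 3.6, `loop.run_until_complete()` would be needed instead).

```python
import asyncio


class Counter:
    """Hypothetical awaitable that records scale * i for i in 1..n."""

    def __init__(self, scale, n, out):
        self.scale = scale
        self.n = n
        self.out = out

    def __await__(self):
        for i in range(1, self.n + 1):
            self.out.append(i * self.scale)
            # Suspend by delegating to an awaitable the loop understands,
            # instead of yielding an arbitrary value such as `i`.
            yield from asyncio.sleep(0).__await__()
        return self.n


async def count(scale, n, out):
    # Thin coroutine wrapper so the awaitable can be handed to gather().
    return await Counter(scale, n, out)


async def main():
    out = []
    results = await asyncio.gather(count(1, 3, out), count(10, 3, out))
    return out, results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Values meant for the caller are collected in `out` or returned from `__await__`; nothing is passed to the caller through `yield`.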

2 Answers

I found a concurrency/asynchronous approach using generators. However, it's not an asyncio approach:

from collections import deque

def coro1():
    for i in range(1, 5):
        yield i

def coro2():
    for i in range(1, 5):
        yield i*10

print('Async behaviour using default list with O(n)'.center(60, '#'))
tasks = list()
tasks.extend([coro1(), coro2()])

while tasks:
    task = tasks.pop(0)
    try:
        print(next(task))
        tasks.append(task)
    except StopIteration:
        pass

print('Async behaviour using deque with O(1)'.center(60, '#'))
tasks = deque()
tasks.extend([coro1(), coro2()])

while tasks:
    task = tasks.popleft()  # select and remove a task (coro1/coro2).
    try:
        print(next(task))
        tasks.append(task)  # re-queue the task at the back (round-robin).
    except StopIteration:
        pass

Out:

########Async behaviour using default list with O(n)########
1
10
2
20
3
30
4
40
###########Async behaviour using deque with O(1)############
1
10
2
20
3
30
4
40

[UPDATE]:

Finally, I've solved this example through asyncio syntax:

import asyncio

async def coro1():
    for i in range(1, 6):
        print(i)
        await asyncio.sleep(0)  # yield to the event loop so the other task can run.

async def coro2():
    for i in range(1, 6):
        print(i * 10)
        await asyncio.sleep(0)  # yield to the event loop so the other task can run.

loop = asyncio.get_event_loop()
futures = [
    asyncio.ensure_future(coro1()),
    asyncio.ensure_future(coro2())
]
loop.run_until_complete(asyncio.gather(*futures))
loop.close()

Out:

1
10
2
20
3
30
4
40
5
50

Here is another concurrent coroutine approach that uses async/await with a hand-written event-loop manager built on the heap queue algorithm. It uses neither the asyncio library and its event loop nor the asyncio.sleep() method:

import heapq
from time import sleep
from datetime import datetime, timedelta

class Sleep:
    def __init__(self, seconds):
        self.sleep_until = datetime.now() + timedelta(seconds=seconds)

    def __await__(self):
        yield self.sleep_until

async def coro1():
    for i in range(1, 6):
        await Sleep(0)
        print(i)

async def coro2():
    for i in range(1, 6):
        await Sleep(0)
        print(i * 10)

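def coro_manager(*coros):
    # Heap entries are (wake-up time, sequence number, coroutine). The sequence
    # number breaks ties, so two coroutine objects are never compared.
    heap = [(datetime.now(), seq, coro) for seq, coro in enumerate(coros)]
    heapq.heapify(heap)
    seq = len(heap)
    while heap:
        exec_at, _, coro = heapq.heappop(heap)
        delay = (exec_at - datetime.now()).total_seconds()
        if delay > 0:
            sleep(delay)
        try:
            wake_at = coro.send(None)  # run until the next `await Sleep(...)`
        except StopIteration:
            continue  # coroutine finished; it is already off the heap
        heapq.heappush(heap, (wake_at, seq, coro))
        seq += 1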

coro_manager(coro1(), coro2())

Out:

1
10
2
20
3
30
4
40
5
50
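With `Sleep(0)` the manager never has to wait, so the heap's ordering by wake-up time is hard to see. The following is a minimal, self-contained sketch of the same idea with non-zero delays. It assumes `time.monotonic()` in place of `datetime`, and the names `ticker` and `run_all` are hypothetical:

```python
import heapq
import time


class Sleep:
    def __init__(self, seconds):
        self.wake_at = time.monotonic() + seconds

    def __await__(self):
        # Hand the wake-up time to whoever drives the coroutine.
        yield self.wake_at


async def ticker(name, delay, count, log):
    for _ in range(count):
        await Sleep(delay)
        log.append(name)


def run_all(*coros):
    # (wake-up time, tie-breaking sequence number, coroutine)
    heap = [(time.monotonic(), seq, coro) for seq, coro in enumerate(coros)]
    heapq.heapify(heap)
    seq = len(heap)
    while heap:
        wake_at, _, coro = heapq.heappop(heap)
        delay = wake_at - time.monotonic()
        if delay > 0:
            time.sleep(delay)  # nothing else is due earlier
        try:
            next_wake = coro.send(None)
        except StopIteration:
            continue  # finished coroutine is simply not re-queued
        heapq.heappush(heap, (next_wake, seq, coro))
        seq += 1


log = []
run_all(ticker("slow", 0.09, 2, log), ticker("fast", 0.03, 4, log))
print(log)
```

The coroutine with the shorter delay is resumed more often, because the heap always pops the entry with the earliest wake-up time.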
Benyamin Jafari

Usually you don't need to write low-level coroutines: defining a function with async def and awaiting inside it is the common way to achieve your goal.

However, if you are interested in the implementation details, take a look at the source code of asyncio.sleep().

Like many other low-level asyncio functions, it relies on three main things to implement a coroutine:

  • asyncio.Future() - "a bridge" between the callback world and the coroutine world
  • the event loop's loop.call_later() method - one of several event loop methods that tell the event loop directly when to do something
  • async def and await - syntactic sugar for @asyncio.coroutine and yield from, which turn a function into a generator (so it can be executed "one step at a time")

Here's my rough implementation of sleep that shows the idea:

import asyncio


# @asyncio.coroutine - for better tracebacks and edge cases, we can avoid it here
def my_sleep(delay):
    fut = asyncio.Future()

    loop = asyncio.get_event_loop()
    loop.call_later(
        delay,
        lambda *_: fut.set_result(True)
    )

    res = yield from fut
    return res


# Test:
@asyncio.coroutine
def main():
    yield from my_sleep(3)
    print('ok')


asyncio.run(main())
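`@asyncio.coroutine` was removed in Python 3.11, so the snippet above only runs on older versions. A rough equivalent in async/await syntax might look like the sketch below. It keeps the same Future plus `loop.call_later()` idea; the `result` parameter and the timing code in `main` are additions for illustration only.

```python
import asyncio


async def my_sleep(delay, result=True):
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Ask the event loop to complete the future after `delay` seconds.
    handle = loop.call_later(delay, fut.set_result, result)
    try:
        return await fut  # suspends this coroutine until the future is done
    finally:
        # If we were cancelled while waiting, make sure the timer does not
        # fire later on an already-cancelled future.
        handle.cancel()


async def main():
    loop = asyncio.get_running_loop()
    start = loop.time()
    res = await my_sleep(0.05)
    return res, loop.time() - start


if __name__ == "__main__":
    print(asyncio.run(main()))
```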

If you want to go lower than this, you'll have to understand how generators (or coroutines) are managed by the event loop. The video mentioned by user4815162342 is a good place to start.

But again, everything above is an implementation detail. You don't have to think about any of it unless you are writing something very low-level.

Mikhail Gerasimov
  • Thanks for the response. I see that many libraries (e.g. `pymodbus`, `pysnmp`) provide an async version of their code built on `asyncio`, so I decided to write my own async method like theirs. I understood that the core of async/await is suspending a task so that another one can run, so I thought I should use `yield` for the suspension. I wasn't looking to write something low-level or to reimplement what already exists. – Benyamin Jafari Oct 08 '19 at 05:06
  • @BenyaminJafari if you want to write an async version of some library, the easiest way is to run the blocking code in an executor with [loop.run_in_executor()](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor) and await the result. The second way is to use the sync library's callbacks to set the result of an `asyncio.Future` that can be awaited in async code, but that is more complicated and not every sync library provides callbacks. That should suffice; there's no need to go into lower-level details. – Mikhail Gerasimov Oct 08 '19 at 16:08
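The two options from the last comment can be sketched as follows. `blocking_read` and `read_with_callback` are hypothetical stand-ins for a synchronous library (they are not part of `pymodbus`, `pysnmp` or any real package), and the callback is assumed to fire in a different thread, which is why `loop.call_soon_threadsafe()` is used to complete the future.

```python
import asyncio
import threading
import time


def blocking_read(value):
    # Hypothetical blocking call of a sync library.
    time.sleep(0.05)
    return value * 2


def read_with_callback(value, callback):
    # Hypothetical sync API that reports its result through a callback,
    # invoked from the library's own worker thread.
    def worker():
        time.sleep(0.05)
        callback(value * 2)

    threading.Thread(target=worker, daemon=True).start()


async def read_in_executor(value):
    # Option 1: run the blocking call in the default thread pool.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_read, value)


async def read_via_future(value):
    # Option 2: let the library's callback complete an asyncio future.
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    read_with_callback(
        value,
        lambda result: loop.call_soon_threadsafe(fut.set_result, result),
    )
    return await fut


async def main():
    return await asyncio.gather(read_in_executor(1), read_via_future(2))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Both wrappers are ordinary `async def` functions, so callers can await them without knowing which mechanism is used underneath.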