Talking about async/await and asyncio is not the same thing. The first is a fundamental, low-level construct (coroutines) while the latter is a library using these constructs. Conversely, there is no single ultimate answer.
The following is a general description of how async/await and asyncio-like libraries work. That is, there may be other tricks on top (there are...) but they are inconsequential unless you build them yourself. The difference should be negligible unless you already know enough to not have to ask such a question.
1. Coroutines versus subroutines in a nutshell
Just like subroutines (functions, procedures, ...), coroutines (generators, ...) are an abstraction of call stack and instruction pointer: there is a stack of executing code pieces, and each is at a specific instruction.
The distinction of def versus async def is merely for clarity. The actual difference is return versus yield. From this, await or yield from take the difference from individual calls to entire stacks.
1.1. Subroutines
A subroutine represents a new stack level to hold local variables, and a single traversal of its instructions to reach an end. Consider a subroutine like this:
    def subfoo(bar):
        qux = 3
        return qux * bar
When you run it, that means
1. allocate stack space for bar and qux
2. recursively execute the first statement and jump to the next statement
3. once at a return, push its value to the calling stack
4. clear the stack (1.) and instruction pointer (2.)
Notably, 4. means that a subroutine always starts at the same state. Everything exclusive to the function itself is lost upon completion. A function cannot be resumed, even if there are instructions after return.
    root -\
      :    \- subfoo --\
      :/--<---return --/
      |
      V
1.2. Coroutines as persistent subroutines
A coroutine is like a subroutine, but can exit without destroying its state. Consider a coroutine like this:
    def cofoo(bar):
        qux = yield bar  # yield marks a break point
        return qux
When you run it, that means
1. allocate stack space for bar and qux
2. recursively execute the first statement and jump to the next statement
   2.1. once at a yield, push its value to the calling stack but store the stack and instruction pointer
   2.2. once calling into yield, restore stack and instruction pointer and push arguments to qux
3. once at a return, push its value to the calling stack
4. clear the stack (1.) and instruction pointer (2.)
Note the addition of 2.1 and 2.2 - a coroutine can be suspended and resumed at predefined points. This is similar to how a subroutine is suspended during calling another subroutine. The difference is that the active coroutine is not strictly bound to its calling stack. Instead, a suspended coroutine is part of a separate, isolated stack.
    root -\
      :    \- cofoo --\
      :/--<+--yield --/
      |    :
      V    :
This means that suspended coroutines can be freely stored or moved between stacks. Any call stack that has access to a coroutine can decide to resume it.
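As a minimal sketch of suspending and resuming, the cofoo coroutine from above can be driven by hand. The argument values 'first' and 'second' are placeholders chosen for this illustration.

```python
def cofoo(bar):
    qux = yield bar  # suspend here and hand ``bar`` to the caller
    return qux

coroutine = cofoo('first')
# start the coroutine: it runs up to the yield and suspends
yielded = coroutine.send(None)
try:
    # resume the coroutine: 'second' becomes the value of ``qux``
    coroutine.send('second')
except StopIteration as stop:
    # a finished coroutine reports its return value via StopIteration
    returned = stop.value

print(yielded, returned)
```

This prints "first second": the value going out at the yield and the value coming back in are independent of each other.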
1.3. Traversing the call stack
So far, our coroutine only goes down the call stack with yield. A subroutine can go down and up the call stack with return and (). For completeness, coroutines also need a mechanism to go up the call stack. Consider a coroutine like this:
    def wrap():
        yield 'before'
        # cofoo takes one argument; 'inner' is only a placeholder value
        yield from cofoo('inner')
        yield 'after'
When you run it, that means it still allocates the stack and instruction pointer like a subroutine. When it suspends, that still is like storing a subroutine.
However, yield from does both. It suspends stack and instruction pointer of wrap and runs cofoo. Note that wrap stays suspended until cofoo finishes completely. Whenever cofoo suspends or something is sent, cofoo is directly connected to the calling stack.
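To see this direct connection, here is a small sketch that drives a variant of wrap by hand. The argument 'inner', the sent value 'sent' and the name inner_result are assumptions made for this illustration.

```python
def cofoo(bar):
    qux = yield bar
    return qux

def wrap():
    yield 'before'
    # wrap stays suspended here while cofoo talks to the caller
    inner_result = yield from cofoo('inner')
    yield 'after: %s' % inner_result

generator = wrap()
trace = [next(generator)]             # yielded by wrap itself
trace.append(next(generator))         # yielded by cofoo, passed through wrap
trace.append(generator.send('sent'))  # received by cofoo, which then returns
print(trace)
```

This prints ['before', 'inner', 'after: sent']. The value 'sent' never touches wrap's own code on the way in; wrap only sees it as the return value of cofoo.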
1.4. Coroutines all the way down
As established, yield from allows connecting two scopes across another intermediate one. When applied recursively, that means the top of the stack can be connected to the bottom of the stack.
    root -\
      :    \-> coro_a -yield-from-> coro_b --\
      :/ <-+------------------------yield ---/
      |    :
      :\ --+-- coro_a.send----------yield ---\
      :                             coro_b <-/
Note that root and coro_b do not know about each other. This makes coroutines much cleaner than callbacks: coroutines are still built on a 1:1 relation like subroutines. Coroutines suspend and resume their entire existing execution stack up until a regular call point.
Notably, root could have an arbitrary number of coroutines to resume. Yet, it can never resume more than one at the same time. Coroutines of the same root are concurrent but not parallel!
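A rough sketch of such a root: it holds several coroutines and resumes them one at a time in round-robin order. The names counter and round_robin are made up for this example.

```python
from collections import deque

def counter(name, steps):
    for step in range(steps):
        yield '%s:%d' % (name, step)

def round_robin(*coroutines):
    """Resume each coroutine in turn; only one runs at any moment"""
    pending = deque(coroutines)
    trace = []
    while pending:
        coroutine = pending.popleft()
        try:
            trace.append(next(coroutine))
        except StopIteration:
            continue  # finished: do not reschedule
        pending.append(coroutine)
    return trace

trace = round_robin(counter('a', 2), counter('b', 3))
print(trace)
```

This prints ['a:0', 'b:0', 'a:1', 'b:1', 'b:2']. The steps of both coroutines interleave, but at every point exactly one of them is executing.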
1.5. Python's async and await
The explanation has so far explicitly used the yield and yield from vocabulary of generators - the underlying functionality is the same. The new Python 3.5 syntax async and await exists mainly for clarity.
    def foo():  # subroutine?
        return None

    def foo():  # coroutine?
        yield from foofoo()  # generator? coroutine?

    async def foo():  # coroutine!
        await foofoo()  # coroutine!
        return None
The async for and async with statements are needed because you would break the yield from/await chain with the bare for and with statements.
2. Anatomy of a simple event loop
By itself, a coroutine has no concept of yielding control to another coroutine. It can only yield control to the caller at the bottom of a coroutine stack. This caller can then switch to another coroutine and run it.
This root node of several coroutines is commonly an event loop: on suspension, a coroutine yields an event on which it wants to resume. In turn, the event loop is capable of efficiently waiting for these events to occur. This allows it to decide which coroutine to run next, or how to wait before resuming.
Such a design implies that there is a set of pre-defined events that the loop understands. Several coroutines await each other, until finally an event is awaited. This event can communicate directly with the event loop by yielding control.
    loop -\
      :    \-> coroutine --await--> event --\
      :/ <-+----------------------- yield --/
      |    :
      |    :  # loop waits for event to happen
      |    :
      :\ --+-- send(reply) -------- yield --\
      :        coroutine <--yield-- event <-/
The key is that coroutine suspension allows the event loop and events to directly communicate. The intermediate coroutine stack does not require any knowledge about which loop is running it, nor how events work.
2.1.1. Events in time
The simplest event to handle is reaching a point in time. This is a fundamental block of threaded code as well: a thread repeatedly sleeps until a condition is true.
However, a regular sleep blocks execution by itself - we want other coroutines to not be blocked. Instead, we want to tell the event loop when it should resume the current coroutine stack.
2.1.2. Defining an Event
An event is simply a value we can identify - be it via an enum, a type or other identity. We can define this with a simple class that stores our target time. In addition to storing the event information, we can allow an instance of the class to be awaited directly.
    class AsyncSleep:
        """Event to sleep until a point in time"""
        def __init__(self, until: float):
            self.until = until

        # used whenever someone ``await``s an instance of this Event
        def __await__(self):
            # yield this Event to the loop
            yield self

        def __repr__(self):
            return '%s(until=%.1f)' % (self.__class__.__name__, self.until)
This class only stores the event - it does not say how to actually handle it.
The only special feature is __await__ - it is what the await keyword looks for. Practically, it is an iterator but not available for the regular iteration machinery.
2.2.1. Awaiting an event
Now that we have an event, how do coroutines react to it? We should be able to express the equivalent of sleep by awaiting our event. To better see what is going on, we wait twice for half the time:
    import time

    async def asleep(duration: float):
        """await that ``duration`` seconds pass"""
        await AsyncSleep(time.time() + duration / 2)
        await AsyncSleep(time.time() + duration / 2)
We can directly instantiate and run this coroutine. Similar to a generator, using coroutine.send runs the coroutine until it yields a result.
    coroutine = asleep(100)
    while True:
        print(coroutine.send(None))
        time.sleep(0.1)
This gives us two AsyncSleep events and then a StopIteration when the coroutine is done. Notice that the only delay is from time.sleep in the loop! Each AsyncSleep only stores a target time computed as an offset from the current time; it does not wait by itself.
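The manual loop ends with an uncaught StopIteration. A slightly more defensive sketch catches it and collects the yielded events instead. The definitions are repeated (without docstrings and __repr__) only so that the snippet runs on its own; the events list is an addition for this illustration.

```python
import time

class AsyncSleep:
    """Event to sleep until a point in time"""
    def __init__(self, until: float):
        self.until = until

    def __await__(self):
        yield self  # hand this event to whoever drives the coroutine

async def asleep(duration: float):
    await AsyncSleep(time.time() + duration / 2)
    await AsyncSleep(time.time() + duration / 2)

coroutine = asleep(100)
events = []
while True:
    try:
        events.append(coroutine.send(None))
    except StopIteration:
        break  # the coroutine ran to completion

print(len(events), all(isinstance(event, AsyncSleep) for event in events))
```

This prints "2 True" immediately, even though we asked to sleep for 100 seconds: nobody has acted on the events yet.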
2.2.2. Event + Sleep
At this point, we have two separate mechanisms at our disposal:
- AsyncSleep Events that can be yielded from inside a coroutine
- time.sleep that can wait without impacting coroutines
Notably, these two are orthogonal: neither one affects or triggers the other. As a result, we can come up with our own strategy to sleep to meet the delay of an AsyncSleep.
2.3. A naive event loop
If we have several coroutines, each can tell us when it wants to be woken up. We can then wait until the first of them wants to be resumed, then for the one after, and so on. Notably, at each point we only care about which one is next.
This makes for a straightforward scheduling:
1. sort coroutines by their desired wake up time
2. pick the first that wants to wake up
3. wait until this point in time
4. run this coroutine
5. repeat from 1.
A trivial implementation does not need any advanced concepts. A list allows sorting coroutines by wake-up time. Waiting is a regular time.sleep. Running coroutines works just like before with coroutine.send.
    def run(*coroutines):
        """Cooperatively run all ``coroutines`` until completion"""
        # store wake-up-time and coroutines
        waiting = [(0, coroutine) for coroutine in coroutines]
        while waiting:
            # 2. pick the first coroutine that wants to wake up
            until, coroutine = waiting.pop(0)
            # 3. wait until this point in time
            time.sleep(max(0.0, until - time.time()))
            # 4. run this coroutine
            try:
                command = coroutine.send(None)
            except StopIteration:
                continue
            # 1. sort coroutines by their desired suspension
            if isinstance(command, AsyncSleep):
                waiting.append((command.until, coroutine))
                waiting.sort(key=lambda item: item[0])
Of course, this has ample room for improvement. We can use a heap for the wait queue or a dispatch table for events. We could also fetch return values from the StopIteration and assign them to the coroutine. However, the fundamental principle remains the same.
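As one possible sketch of two of these improvements, the variant below keeps the wait queue in a heap and collects return values from StopIteration. The tie_breaker counter, the results dict and the nap helper are assumptions of this example, not part of the loop above; AsyncSleep is repeated in shortened form so the snippet runs on its own.

```python
import heapq
import itertools
import time

class AsyncSleep:
    def __init__(self, until):
        self.until = until

    def __await__(self):
        yield self

def run(*coroutines):
    """Run all ``coroutines``; return their results in argument order"""
    results = {}
    tie_breaker = itertools.count()  # keeps heap entries comparable
    waiting = [(0.0, next(tie_breaker), index, coroutine)
               for index, coroutine in enumerate(coroutines)]
    heapq.heapify(waiting)
    while waiting:
        # the heap always hands out the earliest wake-up time first
        until, _, index, coroutine = heapq.heappop(waiting)
        time.sleep(max(0.0, until - time.time()))
        try:
            command = coroutine.send(None)
        except StopIteration as stop:
            results[index] = stop.value  # the coroutine's return value
            continue
        if isinstance(command, AsyncSleep):
            heapq.heappush(
                waiting, (command.until, next(tie_breaker), index, coroutine))
    return [results[index] for index in range(len(coroutines))]

async def nap(duration, value):
    await AsyncSleep(time.time() + duration)
    return value

print(run(nap(0.02, 'slow'), nap(0.01, 'fast')))
```

This prints ['slow', 'fast']: results come back in argument order, even though the second coroutine finishes first. The counter is needed because coroutine objects cannot be compared when two wake-up times are equal.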
2.4. Cooperative Waiting
The AsyncSleep event and run event loop are a fully working implementation of timed events.
    async def sleepy(identifier: str = "coroutine", count=5):
        for i in range(count):
            print(identifier, 'step', i + 1, 'at %.2f' % time.time())
            await asleep(0.1)

    run(*(sleepy("coroutine %d" % j) for j in range(5)))
This cooperatively switches between each of the five coroutines, suspending each for 0.1 seconds. Even though the event loop is synchronous, it still executes the work in 0.5 seconds instead of 2.5 seconds. Each coroutine holds state and acts independently.
3. I/O event loop
An event loop that supports sleep is suitable for polling. However, waiting for I/O on a file handle can be done more efficiently: the operating system implements I/O and thus knows which handles are ready. Ideally, an event loop should support an explicit "ready for I/O" event.
3.1. The select call
Python already has an interface to query the OS for I/O handles that are ready. When called with handles to read or write, it returns the handles ready to read or write:
    readable, writable, _ = select.select(rlist, wlist, xlist, timeout)
For example, we can open a file for writing and wait for it to be ready:
    import select

    # open in write mode, since we wait for the file to become writable
    write_target = open('/tmp/foo', 'w')
    readable, writable, _ = select.select([], [write_target], [])
Once select returns, writable contains our open file.
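Since select reports regular files as always ready, the effect is easier to observe on a socket. The following sketch assumes a connected socket pair from socket.socketpair as a stand-in for a real connection; the names left and right and the timeouts are arbitrary choices for this illustration.

```python
import select
import socket

left, right = socket.socketpair()
try:
    # nothing has been sent yet: with a timeout of 0, select returns at once
    readable, _, _ = select.select([right], [], [], 0)
    ready_before = bool(readable)

    left.sendall(b'ping')
    # now there is data to read, so ``right`` is reported as readable
    readable, _, _ = select.select([right], [], [], 1.0)
    ready_after = readable == [right]
finally:
    left.close()
    right.close()

print(ready_before, ready_after)
```

This prints "False True": the handle only shows up in the result once a read would not block.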
3.2. Basic I/O event
Similar to the AsyncSleep request, we need to define an event for I/O. With the underlying select logic, the event must refer to a readable object - say an open file. In addition, we store how much data to read.
    class AsyncRead:
        def __init__(self, file, amount=1):
            self.file = file
            self.amount = amount
            self._buffer = b'' if 'b' in file.mode else ''

        def __await__(self):
            while len(self._buffer) < self.amount:
                yield self
                # we only get here if ``read`` should not block
                self._buffer += self.file.read(1)
            return self._buffer

        def __repr__(self):
            return '%s(file=%s, amount=%d, progress=%d)' % (
                self.__class__.__name__, self.file, self.amount, len(self._buffer)
            )
As with AsyncSleep we mostly just store the data required for the underlying system call. This time, __await__ is capable of being resumed multiple times - until our desired amount has been read. In addition, we return the I/O result instead of just resuming.
3.3. Augmenting an event loop with read I/O
The basis for our event loop is still the run defined previously. First, we need to track the read requests. This is no longer a sorted schedule; we only map read requests to coroutines.
    # new
    waiting_read = {}  # type: Dict[file, coroutine]
Since select.select takes a timeout parameter, we can use it in place of time.sleep.
    # old
    time.sleep(max(0.0, until - time.time()))
    # new - the timeout replaces the sleep
    readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
This gives us all readable files - if there are any, we run the corresponding coroutine. If there are none, we have waited long enough for our current coroutine to run.
    # new - reschedule waiting coroutine, run readable coroutine
    if readable:
        waiting.append((until, coroutine))
        # sort by time only; coroutines themselves cannot be compared
        waiting.sort(key=lambda item: item[0])
        coroutine = waiting_read[readable[0]]
Finally, we have to actually listen for read requests.
    # new
    if isinstance(command, AsyncSleep):
        ...
    elif isinstance(command, AsyncRead):
        ...
3.4. Putting it together
The above was a bit of a simplification. We need to do some switching to not starve sleeping coroutines if we can always read. We need to handle having nothing to read or nothing to wait for. However, the end result still fits into 30 LOC.
    import select
    import time

    def run(*coroutines):
        """Cooperatively run all ``coroutines`` until completion"""
        waiting_read = {}  # type: Dict[file, coroutine]
        waiting = [(0, coroutine) for coroutine in coroutines]
        while waiting or waiting_read:
            # 2. wait until the next coroutine may run or read ...
            try:
                until, coroutine = waiting.pop(0)
            except IndexError:
                # nothing is sleeping: block until something is readable
                until, coroutine = float('inf'), None
                readable, _, _ = select.select(list(waiting_read), [], [])
            else:
                readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
            # ... and select the appropriate one
            if readable and time.time() < until:
                # put the sleeping coroutine back, if there is one
                if coroutine is not None:
                    waiting.append((until, coroutine))
                    # sort by time only; coroutines cannot be compared
                    waiting.sort(key=lambda item: item[0])
                coroutine = waiting_read.pop(readable[0])
            # 3. run this coroutine
            try:
                command = coroutine.send(None)
            except StopIteration:
                continue
            # 1. sort coroutines by their desired suspension ...
            if isinstance(command, AsyncSleep):
                waiting.append((command.until, coroutine))
                waiting.sort(key=lambda item: item[0])
            # ... or register reads
            elif isinstance(command, AsyncRead):
                waiting_read[command.file] = coroutine
3.5. Cooperative I/O
The AsyncSleep, AsyncRead and run implementations are now fully functional to sleep and/or read.
Same as for sleepy, we can define a helper to test reading:
    async def ready(path, amount=1024*32):
        print('read', path, 'at', '%d' % time.time())
        with open(path, 'rb') as file:
            result = await AsyncRead(file, amount)
        print('done', path, 'at', '%d' % time.time())
        print('got', len(result), 'B')

    run(sleepy('background', 5), ready('/dev/urandom'))
Running this, we can see that our I/O is interleaved with the waiting task:
id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B
4. Non-Blocking I/O
While I/O on files gets the concept across, it is not really suitable for a library like asyncio: the select call always returns for files, and both open and read may block indefinitely. This blocks all coroutines of an event loop - which is bad. Libraries like aiofiles use threads and synchronization to fake non-blocking I/O and events on files.
However, sockets do allow for non-blocking I/O - and their inherent latency makes it much more critical. When used in an event loop, waiting for data and retrying can be wrapped without blocking anything.
4.1. Non-Blocking I/O event
Similar to our AsyncRead, we can define a suspend-and-read event for sockets. Instead of taking a file, we take a socket - which must be non-blocking. Also, our __await__ uses socket.recv instead of file.read.
    class AsyncRecv:
        def __init__(self, connection, amount=1, read_buffer=1024):
            assert not connection.getblocking(), 'connection must be non-blocking for async recv'
            self.connection = connection
            self.amount = amount
            self.read_buffer = read_buffer
            self._buffer = b''

        def __await__(self):
            while len(self._buffer) < self.amount:
                try:
                    chunk = self.connection.recv(self.read_buffer)
                except BlockingIOError:
                    # no data available yet: suspend until the loop resumes us
                    yield self
                    continue
                if not chunk:
                    # the peer closed the connection: stop instead of spinning
                    break
                self._buffer += chunk
            return self._buffer

        def __repr__(self):
            return '%s(connection=%s, amount=%d, progress=%d)' % (
                self.__class__.__name__, self.connection, self.amount, len(self._buffer)
            )
In contrast to AsyncRead, __await__ performs truly non-blocking I/O. When data is available, it always reads. When no data is available, it always suspends. That means the event loop is only blocked while we perform useful work.
4.2. Un-Blocking the event loop
As far as the event loop is concerned, nothing changes much. The event to listen for is still the same as for files - a file descriptor marked ready by select.
    # old
    elif isinstance(command, AsyncRead):
        waiting_read[command.file] = coroutine
    # new
    elif isinstance(command, AsyncRead):
        waiting_read[command.file] = coroutine
    elif isinstance(command, AsyncRecv):
        waiting_read[command.connection] = coroutine
At this point, it should be obvious that AsyncRead and AsyncRecv are the same kind of event. We could easily refactor them to be one event with an exchangeable I/O component. In effect, the event loop, coroutines and events cleanly separate a scheduler, arbitrary intermediate code and the actual I/O.
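One way such a merged event could look is sketched below. The class name AsyncReadable and its read_once parameter are hypothetical, and the sketch follows the suspend-then-read order of AsyncRead; it is driven by hand here instead of by the event loop.

```python
import socket

class AsyncReadable:
    """Hypothetical merged event: wait until ``source`` is readable, then read"""
    def __init__(self, source, read_once, amount=1):
        self.source = source        # what the loop would pass to select
        self.read_once = read_once  # callable returning the next chunk
        self.amount = amount
        self._buffer = b''

    def __await__(self):
        while len(self._buffer) < self.amount:
            yield self  # ask the loop to resume us once ``source`` is readable
            chunk = self.read_once()
            if not chunk:  # end of stream
                break
            self._buffer += chunk
        return self._buffer

async def reader(event):
    return await event

left, right = socket.socketpair()
event = AsyncReadable(right, lambda: right.recv(1024), amount=5)
coroutine = reader(event)
request = coroutine.send(None)  # a loop would now select on request.source
left.sendall(b'hello')          # stand-in for "select reports readable"
try:
    coroutine.send(None)
except StopIteration as stop:
    data = stop.value
left.close()
right.close()
print(data)
```

For a file, the same event would take something like lambda: file.read(1) as read_once; the event loop only ever needs the source attribute.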
4.3. The ugly side of non-blocking I/O
In principle, what you should do at this point is replicate the logic of read as a recv for AsyncRecv. However, this is much uglier now - you have to handle early returns when functions block inside the kernel, but yield control to you. For example, opening a connection versus opening a file is much longer:
    # file
    file = open(path, 'rb')
    # non-blocking socket
    connection = socket.socket()
    connection.setblocking(False)
    # open without blocking - retry on failure
    try:
        connection.connect((url, port))
    except BlockingIOError:
        pass
Long story short, what remains is a few dozen lines of Exception handling. The events and event loop already work at this point.
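To give an idea of what part of that handling looks like, here is a rough sketch of completing a non-blocking connect. It assumes a throwaway listening socket on the loopback interface as the peer, uses connect_ex instead of the try/except shown above, and the one second timeout is arbitrary.

```python
import select
import socket

# hypothetical local server so the example has something to connect to
server = socket.socket()
server.bind(('127.0.0.1', 0))  # port 0: let the OS pick a free port
server.listen(1)

client = socket.socket()
client.setblocking(False)
# connect_ex returns an error code instead of raising; for a non-blocking
# socket this usually signals "in progress" rather than success
client.connect_ex(server.getsockname())
# the socket becomes writable once the connection attempt has finished
_, writable, _ = select.select([], [client], [], 1.0)
# SO_ERROR tells us whether the finished attempt actually succeeded
error = client.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
connected = client in writable and error == 0
print('connected:', connected)

client.close()
server.close()
```

In an event loop, the select call would be replaced by yielding a "ready to write" event, analogous to how AsyncRecv yields itself to wait for readable data.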
id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5
Addendum
Example code at github