Generator as coroutine (no threading)
Let's have a `FakeFtp` class with a `retrbinary` method that invokes a callback for each successfully read chunk of data:
    class FakeFtp(object):
        def __init__(self):
            # chunks that the fake server will deliver, one per callback call
            self.data = iter(["aaa", "bbb", "ccc", "ddd"])

        def login(self, user, password):
            self.user = user
            self.password = password

        def retrbinary(self, cmd, cb):
            # call the callback once for every chunk read
            for chunk in self.data:
                cb(chunk)
A plain callback function has a disadvantage: it is called repeatedly, once per chunk, and it
cannot easily keep context between calls.
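To make the limitation concrete, here is a rough sketch of what a plain callback needs in order to number the chunks it receives: the counter and the results have to live outside the function body. The `ChunkCollector` class and the `feed` helper are hypothetical names used only for illustration; they are not part of the original code.

```python
class ChunkCollector(object):
    """Callable that carries its own state between callback invocations."""

    def __init__(self):
        self.index = 0        # context that must survive between calls
        self.processed = []

    def __call__(self, chunk):
        self.processed.append("processed({0}): {1}".format(self.index, chunk))
        self.index += 1


def feed(cb):
    """Stand-in for `retrbinary`: invoke the callback once per chunk."""
    for chunk in ["aaa", "bbb"]:
        cb(chunk)


collector = ChunkCollector()
feed(collector)
print(collector.processed)
```

This works, but the processing logic is split between `__init__` and `__call__` instead of reading as one sequential function.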
The following code defines the `process_chunks` generator, which receives chunks of data one
by one and processes them. In contrast to a plain callback, it keeps all the processing within
one function without losing context.
    from contextlib import closing
    from itertools import count


    def main():
        processed = []

        def process_chunks():
            for i in count():
                try:
                    # (repeatedly) get the chunk to process
                    chunk = yield
                except GeneratorExit:
                    # finish_up
                    print("Finishing up.")
                    return
                else:
                    # Here process the chunk as you like
                    print("inside coroutine, processing chunk:", i, chunk)
                    product = "processed({i}): {chunk}".format(i=i, chunk=chunk)
                    processed.append(product)

        with closing(process_chunks()) as coroutine:
            # Get the coroutine to the first yield
            # (`next(coroutine)` works on both Python 2.6+ and Python 3)
            next(coroutine)
            ftp = FakeFtp()
            # next line repeatedly calls `coroutine.send(data)`
            ftp.retrbinary("RETR binary", cb=coroutine.send)
            # each callback "jumps" to `yield` line in `process_chunks`
        print("processed result", processed)
        print("DONE")
To see the code in action, put the `FakeFtp` class, the code shown above and the following line:
    main()
into one file and run it (the output below comes from Python 2, where `print(...)` with several arguments prints a tuple):
    $ python headsandtails.py
    ('inside coroutine, processing chunk:', 0, 'aaa')
    ('inside coroutine, processing chunk:', 1, 'bbb')
    ('inside coroutine, processing chunk:', 2, 'ccc')
    ('inside coroutine, processing chunk:', 3, 'ddd')
    Finishing up.
    ('processed result', ['processed(0): aaa', 'processed(1): bbb', 'processed(2): ccc', 'processed(3): ddd'])
    DONE
How it works
`processed = []` is there just to show that the `process_chunks` generator has no problem
cooperating with its external context. Everything is wrapped in `def main():` to prove that
there is no need for global variables.
`def process_chunks()` is the core of the solution. It could take one-shot input parameters (not
used here), but the main point where it receives input is each `yield` expression, which
evaluates to whatever is sent into the generator instance via `.send(data)`. One can call
`coroutine.send(chunk)` directly, but in this example it is done by passing the bound method
`coroutine.send` as the callback.
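The mechanics of `send` can be seen in isolation in a minimal sketch. The `echo` generator and the `received` list are hypothetical names, not part of the solution above. The generator must first be advanced to its first `yield`; after that, each `send(value)` resumes it with `value` as the result of the `yield` expression.

```python
def echo(received):
    """Collect everything sent into the generator (illustrative only)."""
    while True:
        value = yield          # execution pauses here until send() is called
        received.append(value)


received = []
gen = echo(received)
next(gen)                      # prime: run up to the first `yield`
gen.send("aaa")                # resumes the generator; `value` becomes "aaa"
gen.send("bbb")
gen.close()                    # raises GeneratorExit at the paused `yield`
print(received)
```

Sending a non-`None` value before the generator has been primed raises `TypeError`, which is why the `next(...)` call comes first.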
Note that in a real solution there is no problem having multiple `yield`s in the code; they are
reached one by one, in order. This can be used e.g. to read (and ignore) the header of a CSV
file and then continue processing the data records.
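The CSV idea could look roughly like the sketch below, assuming the lines arrive one per `send` call. The `process_csv_lines` generator and the sample lines are made up for illustration; splitting on commas is a simplification that ignores quoting.

```python
def process_csv_lines(records):
    """Skip one header line, then collect the remaining lines as records."""
    header = yield                  # first yield: receives the header line
    # the header is read and deliberately ignored
    while True:
        line = yield                # second yield: receives each data line
        records.append(line.split(","))


records = []
parser = process_csv_lines(records)
next(parser)                        # advance to the first `yield`
for line in ["name,age", "alice,30", "bob,25"]:
    parser.send(line)
parser.close()
print(records)
```

Each `yield` is a separate resume point, so the header and the data rows are handled by different parts of the same function without any flag tracking whether the header has been seen.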
We could instantiate and use the generator as follows:
    coroutine = process_chunks()
    # Get the coroutine to the first yield
    next(coroutine)
    ftp = FakeFtp()
    # next line repeatedly calls `coroutine.send(data)`
    ftp.retrbinary("RETR binary", cb=coroutine.send)
    # each callback "jumps" to `yield` line in `process_chunks`
    # close the coroutine (will throw the `GeneratorExit` exception into the
    # `process_chunks` coroutine).
    coroutine.close()
The real code uses the `closing` context manager from `contextlib` to ensure that
`coroutine.close()` is always called.
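The benefit shows up when the transfer fails partway through. In the sketch below, `worker`, `failing_feed` and `log` are hypothetical names, and the `IOError` merely simulates a dropped connection. The coroutine still gets its chance to clean up before the exception propagates.

```python
from contextlib import closing

log = []


def worker():
    try:
        while True:
            item = yield
            log.append("got " + item)
    except GeneratorExit:
        log.append("closed")           # cleanup runs when close() is called


def failing_feed(cb):
    cb("aaa")
    raise IOError("connection lost")   # simulated failure mid-transfer


try:
    with closing(worker()) as coroutine:
        next(coroutine)                # advance to the first `yield`
        failing_feed(coroutine.send)
except IOError:
    log.append("error handled")

print(log)
```

The `"closed"` entry is logged before `"error handled"`, because `closing` calls `coroutine.close()` as the `with` block is left, whether it ends normally or by an exception.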
Conclusions
This solution does not provide an iterator to consume data from "from outside" in the
traditional style. On the other hand, we are able to:
- use the generator "from inside"
- keep all iterative processing within one function without being interrupted between callbacks
- optionally use external context
- provide usable results to outside
- all this can be done without using threading
Credits: The solution is heavily inspired by the SO answer to Python FTP “chunk” iterator (without loading entire file into memory), written by user2357112.