Let's say we have some library (e.g. for XML parsing) that accepts a callback and calls it every time it encounters some event (e.g. finding an XML tag). I'd like to transform those callbacks into a generator that can be iterated with a for loop. Is that possible in Python without using threads or collecting all the callback results first (i.e. with lazy evaluation)?
Example:
# this is how I can produce the items
def callback(item):
    # do something with each item
    ...

parser.parse(xml_file, callback=callback)

# this is how the items should be consumed
for item in iter_parse(xml_file):
    print(item)
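For contrast, the eager version I want to avoid would collect everything before returning anything (just a sketch; `parser` is the hypothetical callback-based parser from above):

def iter_parse_eager(xml_file):
    items = []
    parser.parse(xml_file, callback=items.append)
    # not lazy: parse() has already processed the whole file by this point
    return iter(items)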
I've looked into whether coroutines could be used, but it seems that coroutines push data from the producer, while generators pull data to the consumer.
The natural idea is to make the producer and the consumer coroutines that pass the execution flow back and forth between them.
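To spell out the push/pull distinction I mean (plain Python, no asyncio):

# push style: the producer drives a consumer coroutine via send()
def consumer_coro():
    while True:
        item = yield              # wait for the producer to push a value in
        print('got', item)

c = consumer_coro()
next(c)                           # prime the coroutine
for item in range(3):
    c.send(item)                  # the producer pushes

# pull style: the consumer drives a generator by iterating it
def producer_gen():
    for item in range(3):
        yield item                # wait for the consumer to pull a value out

for item in producer_gen():       # the consumer pulls
    print('got', item)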
I've managed to get a producer-consumer pattern working with the asyncio event loop (in a similar way to this answer). However, it cannot be used like a generator in a for loop:
import asyncio

q = asyncio.Queue(maxsize=1)

@asyncio.coroutine
def produce(data):
    for v in data:
        print("Producing:", v)
        yield from q.put(v)
    print("Producer waiting")
    yield from q.put(None)
    print("Producer done")

@asyncio.coroutine
def consume():
    while True:
        print("Consumer waiting")
        value = yield from q.get()
        print("Consumed:", value)
        if value is not None:
            # process the value
            yield from asyncio.sleep(0.5)
        else:
            break
    print("Consumer done")

tasks = [
    asyncio.Task(consume()),
    asyncio.Task(produce(data=range(5)))
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
The problem is that the result cannot be iterated in a for loop, since everything is driven by the event loop.
When I rewrite the code so that the callback is called from an ordinary function, the problem is that asyncio.Queue.put() called from the callback doesn't block, so the computation is not lazy.
import asyncio

q = asyncio.Queue(maxsize=1)

def parse(data, callback):
    for value in data:
        # yield from q.put(value)
        callback(value)

@asyncio.coroutine
def produce(data):
    @asyncio.coroutine
    def enqueue(value):
        print('enqueue()', value)
        yield from q.put(value)

    def callback(value):
        print('callback()', value)
        asyncio.async(enqueue(value))

    parse(data, callback)
    print('produce()')
    print('produce(): enqueuing sentinel value')
    asyncio.async(enqueue(None))
    print('produce(): done')

@asyncio.coroutine
def consume():
    print('consume()')
    while True:
        print('consume(): waiting')
        value = yield from q.get()
        print('consumed:', value)
        if value is not None:
            # here we'd like to yield and use this in a for loop elsewhere
            print(value)
        else:
            break
    print('consume(): done')

tasks = [
    asyncio.Task(consume()),
    asyncio.Task(produce(range(5)))
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

# I'd like:
# for value in iter_parse(data=range(5)):
#     print('consumed:', value)
Is this kind of computation even possible with asyncio, or do I need to use greenlet or gevent? It seems that in gevent it is possible to iterate over async results in a for loop, but I'd rather not depend on another library if possible, and gevent is not completely ready for Python 3.
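For reference, this is roughly the control-flow flip I have in mind with greenlet (just a sketch, untested; parse() here stands in for the real callback-based parser):

import greenlet

def parse(data, callback):
    # stand-in for the callback-based library API
    for value in data:
        callback(value)

def iter_parse(data):
    _sentinel = object()
    consumer_gr = greenlet.getcurrent()

    def producer():
        # every callback switches back to the consumer, handing over one value
        parse(data, callback=consumer_gr.switch)
        consumer_gr.switch(_sentinel)

    producer_gr = greenlet.greenlet(producer)
    while True:
        value = producer_gr.switch()   # resume parse() until the next callback
        if value is _sentinel:
            break
        yield value

for value in iter_parse(range(5)):
    print('consumed:', value)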