6

Let's say we have some library (e.g. for XML parsing) that accepts a callback and calls it every time it encounters some event (e.g. finding an XML tag). I'd like to be able to transform those callbacks into a generator that can be iterated with a for loop. Is that possible in Python without using threads or collecting all the callback results (i.e. with lazy evaluation)?

Example:

# this is how I can produce the items
def callback(item):
    # do something with each item
parser.parse(xml_file, callback=callback)

# this is how the items should be consumed
for item in iter_parse(xml_file):
    print(item)
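For contrast, here is the eager workaround I want to avoid: buffering every callback result before iterating, which defeats lazy evaluation. (The `parse` function below is a hypothetical stand-in for the library's parser, just to make the sketch runnable.)

```python
def parse(data, callback):
    # hypothetical stand-in for the library: fires the callback once per item
    for item in data:
        callback(item)

def iter_parse_eager(data):
    # collects ALL items up front -- works, but is not lazy
    items = []
    parse(data, callback=items.append)
    return iter(items)

for item in iter_parse_eager(range(3)):
    print(item)
```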

I've tried to study whether coroutines could be used, but it seems that coroutines are useful for pushing data from the producer, while generators pull data to the consumer.

The natural idea was that the producer and consumer would be coroutines that would ping the execution flow back and forth.

I've managed to get a producer-consumer pattern working with the asyncio event loop (in a similar way to this answer). However, it cannot be used like a generator in a for loop:

import asyncio

q = asyncio.Queue(maxsize=1)

@asyncio.coroutine
def produce(data):
    for v in data:
        print("Producing:", v)
        yield from q.put(v)
        print("Producer waiting")
    yield from q.put(None)
    print("Producer done")

@asyncio.coroutine
def consume():
    while True:
        print("Consumer waiting")
        value = yield from q.get()
        print("Consumed:", value)
        if value is not None:
            # process the value
            yield from asyncio.sleep(0.5)
        else:
            break
    print("Consumer done")

tasks = [
    asyncio.Task(consume()),
    asyncio.Task(produce(data=range(5)))
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

The problem is that the result cannot be iterated in a for loop, since the execution is managed by the event loop.

When I rewrite the code so that the callback is called from an ordinary function, the problem is that `asyncio.Queue.put()` scheduled from the callback doesn't block, so the computation is not lazy.

import asyncio

q = asyncio.Queue(maxsize=1)

def parse(data, callback):
    for value in data:
        # yield from q.put(value)
        callback(value)

@asyncio.coroutine
def produce(data):
    @asyncio.coroutine
    def enqueue(value):
        print('enqueue()', value)
        yield from q.put(value)
    def callback(value):
        print('callback()', value)
        asyncio.async(enqueue(value))
    parse(data, callback)

    print('produce()')
    print('produce(): enqueuing sentinel value')
    asyncio.async(enqueue(None))
    print('produce(): done')

@asyncio.coroutine
def consume():
    print('consume()')
    while True:
        print('consume(): waiting')
        value = yield from q.get()
        print('consumed:', value)
        if value is not None:
            # here we'd like to yield and use this in a for loop elsewhere
            print(value)
        else:
            break
    print('consume(): done')

tasks = [
    asyncio.Task(consume()),
    asyncio.Task(produce(range(5)))
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

# I'd like:
# for value in iter_parse(data=range(5)):
#   print('consumed:', value)

Is this kind of computation even possible with asyncio, or do I need to use greenlet or gevent? It seems that in gevent it is possible to iterate over async results in a for loop, but I'd rather not depend on another library if possible, and gevent is not completely ready for Python 3.

Bohumir Zamecnik
  • Is there a particular reason you want to avoid threads here? It doesn't make sense to use `asyncio` or even `gevent` unless your xml parsing library is integrated with those frameworks. Otherwise, `parser.parse` is always going to block until its done, which will block the `asyncio`/`gevent` event loop. Since XML parsing isn't I/O-bound, I doubt your XML library is integrated with either framework, so your only option is to use a thread. – dano Apr 19 '15 at 01:07
  • if this parsing is done within an async server then you can yield earlier than if undertaking a computationally heavy load regardless of I/O bound. for example, in boost asio a stackless coroutine is used in some cases to parse a simple string. – Gabe Rainbow Apr 19 '15 at 02:42
  • Please check [PEP 525](https://www.python.org/dev/peps/pep-0525/) of Python 3.6 to see if that can be helpful. – Fantix King Mar 26 '18 at 02:01
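Following dano's comment above, here is a hedged sketch of the thread-based bridge: the blocking parser runs in a background thread, and a bounded `queue.Queue` keeps the producer at most one item ahead, so evaluation stays (almost) lazy. The `parse` function is again a hypothetical stand-in for the library call; the question explicitly hoped to avoid threads, so this is shown only as the fallback.

```python
import queue
import threading

def parse(data, callback):
    # hypothetical stand-in for the blocking library call
    for item in data:
        callback(item)

_SENTINEL = object()  # marks the end of the stream

def iter_parse(data):
    q = queue.Queue(maxsize=1)  # maxsize=1: producer blocks until item is consumed

    def worker():
        parse(data, callback=q.put)  # q.put blocks while the queue is full
        q.put(_SENTINEL)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = q.get()
        if item is _SENTINEL:
            return
        yield item

for value in iter_parse(range(5)):
    print('consumed:', value)
```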
