
I have the following scenario:

  1. I have a blocking, synchronous generator
  2. I have a non-blocking, async function

I would like to run the blocking generator (executed in a ThreadPool) concurrently with the async function running on the event loop. How do I achieve this?

The following code simply prints the output from the generator, never the output from the sleep function.

Thanks!

from concurrent.futures import ThreadPoolExecutor

import numpy as np
import asyncio
import time


def f():
    while True:
        r = np.random.randint(0, 3)
        time.sleep(r)
        yield r


async def gen():
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor()
    gen = await loop.run_in_executor(executor, f)
    for item in gen:
        print(item)
        print('Inside generator')


async def sleep():
    while True:
        await asyncio.sleep(1)
        print('Inside async sleep')


async def combine():
    await asyncio.gather(sleep(), gen())


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(combine())


if __name__ == '__main__':
    main()
abhinavkulkarni

1 Answer


run_in_executor doesn't work on generators because it is designed for blocking functions. While a generator function is a valid callable, calling it returns immediately, providing a generator object that the caller is supposed to exhaust through repeated invocations of next(). (This is what Python's for loop does under the hood.) To use a blocking generator from async code, you have two choices:

  • wrap each step of the iteration (each individual call to next) in a separate call to run_in_executor (see the sketch after this list), or
  • start a for loop in a separate thread and use a queue to transfer the objects to an async consumer.
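
For reference, the first approach fits in a few lines. Here is a minimal sketch (the name async_wrap_iter_steps is illustrative, not an established API): it dispatches each next() call to the default executor and uses next()'s two-argument form to detect exhaustion without StopIteration:

import asyncio

async def async_wrap_iter_steps(it):
    """Sketch of the first approach: one executor job per next() call."""
    loop = asyncio.get_running_loop()
    done = object()  # sentinel that next() returns when the iterator is exhausted
    while True:
        # next(it, done) runs in a worker thread, so the blocking wait
        # happens off the event loop; awaiting the future keeps the loop free.
        item = await loop.run_in_executor(None, next, it, done)
        if item is done:
            break
        yield item

One caveat: consecutive next() calls may execute on different pool threads, which is fine for most generators but matters if the iterator is tied to a particular thread.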

Either approach can be abstracted into a function that accepts an iterator and returns an equivalent async iterator. This is an implementation of the second approach:

import asyncio, threading

def async_wrap_iter(it):
    """Wrap blocking iterator into an asynchronous one"""
    loop = asyncio.get_running_loop()  # must be called while the event loop is running
    q = asyncio.Queue(1)  # capacity 1 provides back-pressure on the producer thread
    exception = None
    _END = object()  # sentinel signaling the end of iteration

    async def yield_queue_items():
        while True:
            next_item = await q.get()
            if next_item is _END:
                break
            yield next_item
        if exception is not None:
            # the iterator has raised, propagate the exception
            raise exception

    def iter_to_queue():
        nonlocal exception
        try:
            for item in it:
                # This runs outside the event loop thread, so we
                # must use thread-safe API to talk to the queue.
                asyncio.run_coroutine_threadsafe(q.put(item), loop).result()
        except Exception as e:
            exception = e
        finally:
            asyncio.run_coroutine_threadsafe(q.put(_END), loop).result()

    threading.Thread(target=iter_to_queue).start()
    return yield_queue_items()

It can be tested with a trivial sync iterator that uses time.sleep() to block and an async heartbeat function to prove that the event loop is running:

# async_wrap_iter definition as above

import time

def test_iter():
    for i in range(5):
        yield i
        time.sleep(1)

async def test():
    ait = async_wrap_iter(test_iter())
    async for i in ait:
        print(i)

async def heartbeat():
    while True:
        print('alive')
        await asyncio.sleep(.1)

async def main():
    asyncio.create_task(heartbeat())
    await test()

asyncio.run(main())
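
Applied back to the question's code, the wrapper replaces the run_in_executor call entirely. Here is a sketch, reusing f() and sleep() from the question:

async def gen():
    # Wrap the blocking generator instead of submitting it to an executor.
    async for item in async_wrap_iter(f()):
        print(item)
        print('Inside generator')

async def combine():
    await asyncio.gather(sleep(), gen())

asyncio.run(combine())

With this change, 'Inside async sleep' prints once a second while the generator's items arrive at their own pace.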
user4815162342
  • Would the first approach avoid loading all items in memory? – yan-hic Jun 17 '20 at 13:08
  • 1
    @YannickEinsweiler The second approach assumed that items would be processed faster than they would be extracted from the blocking iterator. I've now amended the answer to fix that by using a queue with fixed capacity and replacing `call_soon_threadsafe` with `run_coroutine_threadsafe` which provides back-pressure. – user4815162342 Jun 17 '20 at 14:19
  • That did it! Great. – yan-hic Jun 17 '20 at 21:03
  • I appreciate the solution. However one problem is that if the caller of `yield_queue_items` throws during the iteration, `iter_to_queue` will never end, since the put call will get stuck. You need a way for `yield_queue_items` to signal to `iter_to_queue` whether it should proceed. – Vinicius Fortuna Nov 22 '20 at 04:51
  • @ViniciusFortuna That's a general problem with iterators that refer to underlying resources, and it would also happen on any abandonment of the iteration (a `break` out of the loop), not just on exception. It's not trivial to fix: you can use try/finally in `yield_queue_items()` to stop `iter_to_queue`, but then you're depending on the GC's finalization. The other option is to return a context manager which must be entered for iteration to begin; leaving the context manager can then signal `iter_to_queue` to stop. If you post a question about that case, I can expand the code from this question. – user4815162342 Nov 22 '20 at 09:15
  • @user4815162342 I am interested in this case using a context manager - is it still OK to ask this question? – MindV0rtex Oct 31 '22 at 19:45
  • @MindV0rtex I'm a bit out of shape regarding asyncio, but it would still make sense to ask the question, someone else could pick it up. Also, do take a look at the excellent aiostream library, I believe it already has the necessary context managers. – user4815162342 Nov 01 '22 at 09:33