
In short, the problem is that the future returned by asyncio.run_coroutine_threadsafe hangs when I call future.result().

The problem is also documented in the following question, which (currently) has no satisfactory answer: "Future from asyncio.run_coroutine_threadsafe hangs forever?"

What I'm trying to achieve is to call async code from sync code, where the sync code is actually itself wrapped in async code with an existing running event loop (to make things more concrete: it's a Jupyter notebook).

I would like to send async tasks from nested sync code to the existing 'outer' event loop and 'await' their results within the nested sync code. Implied constraint: I do not want to run those tasks on a new event loop (for multiple reasons).

Since it's not possible to just 'await' an async result from sync code without blocking and without using asyncio.run which creates a new event loop, I thought using a separate thread would somehow help.

From the documentation description, asyncio.run_coroutine_threadsafe sounds like the perfect candidate.

But it's still blocking...

Below is the full snippet, with a timeout when calling the future's result.

How can I get this code to work correctly?

import asyncio
from concurrent.futures import ThreadPoolExecutor


async def gather_coroutines(*coroutines):
    return await asyncio.gather(*coroutines)


def run_th_safe(loop, coroutines):
    future = asyncio.run_coroutine_threadsafe(gather_coroutines(*coroutines), loop)
    res = future.result(timeout=3)      # **** BLOCKING *****
    return res


def async2sync(*coroutines):
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(gather_coroutines(*coroutines))

    # BELOW DOESN'T WORK BECAUSE run_th_safe IS BLOCKING
    with ThreadPoolExecutor(max_workers=1) as ex:
        thread_future = ex.submit(run_th_safe, loop, coroutines)
        return thread_future.result()


# Testing
async def some_async_task(n):
    """Some async function to test"""
    print('Task running with n =', n)
    await asyncio.sleep(n/10)
    print('Inside coro', n)
    return list(range(n))


async def main_async():
    coro3 = some_async_task(30)
    coro1 = some_async_task(10)
    coro2 = some_async_task(20)
    results = async2sync(coro3, coro1, coro2)
    return results


def main_sync():
    coro3 = some_async_task(30)
    coro1 = some_async_task(10)
    coro2 = some_async_task(20)
    results = async2sync(coro3, coro1, coro2)
    return results


if __name__ == '__main__':
    # Testing functionality with asyncio.run()
    # This works
    print(main_sync())

    # Testing functionality with outer-loop (asyncio.run) and nested asyncio.run_coroutine_threadsafe
    # **DOESN'T WORK**
    print(asyncio.run(main_async()))

Jean Monet
  • Returning a blocking future is the whole point of `run_coroutine_threadsafe` because it's supposed to serve as a bridge between asyncio world (where everything is inside the event loop and blocking is not allowed) and the sync world (where there is no event loop and blocking is how you wait for things). `run_coroutine_threadsafe` therefore accepts a non-blocking awaitable and returns a blocking concurrent.futures Future. Of course, nothing compels you to call `result()` on the returned future, you could go on to do other things and use `done()` to occasionally check on it, and so on. – user4815162342 Jan 26 '21 at 23:02
  • But more importantly, I don't understand how `async2sync(coro3, coro1, coro2)` inside `async def main_async()` is supposed to work. You don't await it and you expect results to be returned, so it must obviously block. But how will the outer event loop process those inner coroutines, who will run it while the `main_async` coroutine blocks? And why even call `async2sync()` there instead of `results = await asyncio.gather(coro3, coro1, coro2)`? – user4815162342 Jan 26 '21 at 23:03
  • If async code calls into sync code, you just can't run additional async code from inside that sync code _and_ re-use the outer event loop, because the outer event loop is dead stuck on sync code. What you can do is either have a new event loop (which you don't want), or have a second event loop in a separate thread to service all "inner" tasks using `run_coroutine_threadsafe`, at the cost of having two event loops and having the original outer event loop getting stuck while calling sync code. I don't think the architecture of asyncio allows solving this within the constraints you've set. – user4815162342 Jan 26 '21 at 23:09
  • @user4815162342 thanks for explanations. `main_async` wrapped by `asyncio.run` was to emulate the Jupyter environment where there exists an outer event loop, which forbids calling `asyncio.run` in same thread. So `async2sync` is intended as a wrapper that can be called from sync and produce results even if there exists an outer event loop (allowing to easily patch together async & sync without rewriting everything). Due to a combo of libraries that I don't fully understand, creating a new event loop results in RuntimeError `got Future attached to a different loop` in Jupyter – Jean Monet Jan 27 '21 at 08:45
  • *main_async wrapped by asyncio.run was to emulate the Jupyter environment where there exists an outer event loop, which forbids calling asyncio.run in same thread* - but why would you want to call `asyncio.run` when you can just await (because you're async)? According to answers like [this](https://stackoverflow.com/a/50844238/1600898), the await functionality has been part of Jupyter for years. – user4815162342 Jan 27 '21 at 08:51
  • Yes, the problem is that when I define a sync function I can no longer use the await keyword inside it. So the goal was to use the functionality of an async function from within a sync function (patching these together without rewriting everything on the sync side) at the acceptable cost of running a new thread. But from your suggestions, I understand it is not really possible using just `asyncio` without creating a new event loop and basically using `asyncio.run` in a separate thread (but then the 'different loop' problem). I had some hopes for the `greenletio` lib but it doesn't seem to work in Jupyter – Jean Monet Jan 27 '21 at 09:00
  • Maybe you should just ask a separate question about the "different loop" problem, ideally isolating it to a minimal test case that doesn't depend on Jupyter. Perhaps that problem is actually solvable. – user4815162342 Jan 27 '21 at 09:13
  • Thanks again for your clarifications, I will further investigate the "different loop" issue – Jean Monet Jan 27 '21 at 09:30
  • I have a very similar issue, did you ever solve it @JeanMonet – fsociety Mar 02 '22 at 09:14
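
For readers landing here, the option described in the comments (a second event loop in a separate thread that services the "inner" tasks via `run_coroutine_threadsafe`) could be sketched roughly as follows. This is only a minimal illustration under stated assumptions, not a confirmed fix for the Jupyter case: `BackgroundLoop`, `_gather`, `sync_caller` and the shortened sleep times are hypothetical names and values introduced for the example. It also does not satisfy the "no new event loop" constraint, and coroutines that touch objects bound to the outer loop may still fail with the `got Future attached to a different loop` error mentioned above.

```python
import asyncio
import threading


async def _gather(*coroutines):
    # Runs on the background loop and collects results in argument order.
    return await asyncio.gather(*coroutines)


class BackgroundLoop:
    """Hypothetical helper: owns a second event loop running in a daemon thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, *coroutines, timeout=None):
        # Blocks the calling thread only; the background loop keeps running,
        # so the submitted coroutines can actually make progress.
        future = asyncio.run_coroutine_threadsafe(_gather(*coroutines), self.loop)
        return future.result(timeout=timeout)

    def close(self):
        # Ask the loop to stop from outside its thread, then clean up.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()


async def some_async_task(n):
    await asyncio.sleep(n / 100)
    return list(range(n))


def sync_caller(bg):
    # Plain sync code: no await is available here.
    return bg.run(some_async_task(3), some_async_task(1), some_async_task(2), timeout=3)


async def main_async(bg):
    # Emulates an outer running loop calling into sync code. The outer loop
    # is blocked during this call, but the inner coroutines are serviced by
    # the background loop, so the call returns instead of hanging.
    return sync_caller(bg)


bg = BackgroundLoop()
results_sync = sync_caller(bg)                # no outer loop running
results_nested = asyncio.run(main_async(bg))  # outer loop running
bg.close()
print(results_sync)
print(results_nested)
```

The original snippet hangs because `future.result()` is waited on, indirectly, from the same thread that is supposed to run the outer loop, so the submitted coroutine never gets scheduled. Here the loop that executes the coroutines lives in its own thread and is never the one being blocked, which is the trade-off the comment describes: two event loops, with the outer one stalled for the duration of the sync call.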

0 Answers