As a followup to my previous question about calling an async function from a synchronous one, I've discovered asyncio.run_coroutine_threadsafe.

On paper, and based on the comments in this StackOverflow question, this looks ideal. I can create a new Thread, get a reference to my original event loop, and schedule the async function to run inside the original event loop while blocking only the new Thread.

import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

class _AsyncBridge:
    def call_async_method(self, function, *args, **kwargs):
        print(f"call_async_method {threading.get_ident()}")
        event_loop = asyncio.get_event_loop()
        thread_pool = ThreadPoolExecutor()
        return thread_pool.submit(asyncio.run, self._async_wrapper(event_loop, function, *args, **kwargs)).result()

    async def _async_wrapper(self, event_loop, function, *args, **kwargs):
        print(f"async_wrapper {threading.get_ident()}")
        future = asyncio.run_coroutine_threadsafe(function(*args, **kwargs), event_loop)
        return future.result()

This doesn't error, but it doesn't ever return, either. The Futures just hang and the async call is never hit. It doesn't seem to matter whether I use a Future in call_async_method, _async_wrapper, or both; wherever I use a Future, it hangs.

I experimented with putting the run_coroutine_threadsafe call directly in my main event loop:

event_loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
return_value = future.result()

Here too, the Future hangs.
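
For illustration, here is a minimal sketch of why this variant can hang, assuming the snippet is executed on the event loop's own thread (an assumption, since the calling code is not shown). The names `do_work` and `caller` are hypothetical, and the timeout is added only so that the example terminates:

```python
import asyncio
import concurrent.futures


async def do_work():
    # Hypothetical stand-in for the real async call.
    return 42


async def caller():
    # This coroutine runs on the event loop's own thread.
    loop = asyncio.get_running_loop()
    future = asyncio.run_coroutine_threadsafe(do_work(), loop)
    try:
        # Blocking here keeps the loop from ever starting do_work().
        return future.result(timeout=1)
    except concurrent.futures.TimeoutError:
        # Give up on the submitted work so nothing is left pending.
        future.cancel()
        return "timed out"


outcome = asyncio.run(caller())
print(outcome)
```

The blocking `future.result()` call occupies the only thread that could run the scheduled coroutine, so without the timeout it would wait forever.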

I tried using the LoopExecutor class defined here, which seems like the exact answer to my needs.

event_loop = asyncio.get_event_loop()
loop_executor = LoopExecutor(event_loop)
future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
return_value = future.result()

There too, the returned Future hangs.

I toyed with the idea that I was blocking my original event loop and therefore the scheduled task would never run, so I made a new event loop:

event_loop = asyncio.get_event_loop()
new_event_loop = asyncio.new_event_loop()
print(event_loop == new_event_loop) # sanity check to make sure the new loop is actually different from the existing one - prints False as expected
loop_executor = LoopExecutor(new_event_loop)
future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
return_value = future.result()
return return_value

Still hanging at future.result() and I don't understand why.

What's wrong with asyncio.run_coroutine_threadsafe/the way I'm using it?

cf-

2 Answers

I think there are two problems. The first is that run_coroutine_threadsafe only submits the coroutine to the loop; it does not run the loop for you.

So

event_loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
return_value = future.result()

doesn't work, because you never run this loop.
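
A minimal sketch of that behaviour, assuming a freshly created loop that nothing has started yet (the flag `pending_while_idle` is a hypothetical name used only for illustration). The submitted future stays pending while the loop is idle, and completes once the loop is actually run:

```python
import asyncio
import concurrent.futures

# A loop that exists but is not running yet.
loop = asyncio.new_event_loop()
future = asyncio.run_coroutine_threadsafe(asyncio.sleep(0.1, result=3), loop)

try:
    # Nothing is driving the loop, so the result never arrives.
    future.result(timeout=0.5)
    pending_while_idle = False
except concurrent.futures.TimeoutError:
    pending_while_idle = True

# Run the loop until the submitted work has finished.
loop.run_until_complete(asyncio.wrap_future(future, loop=loop))
value = future.result(timeout=1)
loop.close()

print(pending_while_idle, value)
```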

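To make it work, you might expect asyncio.run(future) to be enough, but it is not: run_coroutine_threadsafe returns a concurrent.futures.Future rather than a coroutine, and asyncio.run only accepts coroutines. The following does work, because it runs the loop the coroutine was submitted to: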

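import asyncio

async def stop():
    # Keeps the loop running long enough for the submitted coroutine to finish.
    await asyncio.sleep(3)

# new_event_loop() is used because get_event_loop() without a running loop
# is deprecated in recent Python versions.
event_loop = asyncio.new_event_loop()
coro = asyncio.sleep(1, result=3)
# Only schedules coro on event_loop; nothing runs yet.
future = asyncio.run_coroutine_threadsafe(coro, event_loop)
# Running the loop is what actually executes the scheduled coroutine.
event_loop.run_until_complete(stop())
print(future.result())  # the future is already done, so this does not block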

The second problem, which I think you have already noticed, is that your structure is reversed. You should run the event loop in the separate thread and submit the task from the main thread. If you submit it from the separate thread instead, you still need to run the event loop in the main thread to actually execute it. In most cases I would suggest simply creating another event loop in the separate thread.
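
A minimal sketch of that arrangement, assuming the async code is free to run on a loop other than the main thread's (a library bound to a specific loop may not allow this). The names `do_work`, `background_loop`, and `call_async` are hypothetical:

```python
import asyncio
import threading


async def do_work(x, y):
    # Hypothetical stand-in for the real async call.
    await asyncio.sleep(0.1)
    return x + y


# A dedicated event loop that runs forever in its own daemon thread.
background_loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=background_loop.run_forever, daemon=True)
loop_thread.start()


def call_async(coro, timeout=5):
    """Submit coro to the background loop and block the caller until it finishes."""
    future = asyncio.run_coroutine_threadsafe(coro, background_loop)
    return future.result(timeout=timeout)


# Called from the main thread, which is not the loop's thread.
result = call_async(do_work(1, 2))
print(result)

# Stop the loop from outside its thread, then clean up.
background_loop.call_soon_threadsafe(background_loop.stop)
loop_thread.join()
background_loop.close()
```

Here only the calling thread blocks in `future.result()`, while the background thread keeps the loop running, so the submitted coroutine can make progress. `call_async` must not be called from the background loop's own thread, or it would block that loop.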

Sraw
  • Maybe the answer should show an example of starting the event loop in a separate thread? The included snippet will work, but is unlikely to be what the OP wanted to achieve. – user4815162342 Jul 28 '19 at 08:03
  • Frustrating. The included snippet is actually exactly what I wanted to achieve, but `asyncio.run` and `event_loop.run_until_complete` are both off limits to me as my main event loop has been monopolized by the async library I'm trying to interact with. Trying to use them will get me `This loop is already running` errors. I also can't use a new event loop in the separate thread because the async library holds a reference to my main thread's event loop and will throw `Future attached to a different loop` errors. – cf- Jul 28 '19 at 17:21
  • Doesn't seem like there's truly a way to have the existing event loop run an async method and return a result unless I convert my entire codebase to `async`/`await`. – cf- Jul 28 '19 at 17:22
  • Both of your problems can be solved. Let me find some spare time to add details. In short: for the first, if you are still using the event loop in the main thread, then all of your snippets are pointless, since they are equivalent to a plain `submit`. For the second, most async libraries let you pass a `loop` parameter. Alternatively, you can create a new event loop and set it as the default event loop in the target thread. – Sraw Jul 28 '19 at 17:40
  • So if I create a new thread with a new event loop, bind that new event loop to the async lib, and then call the async lib's methods *from my main thread's event loop*, it should work? Or am I misunderstanding? I still don't quite see where I can call `run_until_complete` since I think I need it to be the same loop as `run_coroutine_threadsafe` is getting, and that event loop is already in a permanent `run_until_complete` because of the async lib. – cf- Jul 28 '19 at 20:24

I ran into the same issue, and the answer by @Sraw did not help me, because in my case the coroutine also had to run in the main event loop, which was monopolized by the async library.

As a quick (though admittedly hacky) approach, what helped me is the nest_asyncio library, which patches asyncio to allow nested event loops.

With it, the following works for me, even if the event loop is already running:

import asyncio
import nest_asyncio
nest_asyncio.apply()

event_loop = asyncio.get_event_loop()
coro = asyncio.sleep(1, result=3)
print(event_loop.run_until_complete(coro))

This also allows me to create a simple wrapper function that, for the purposes of the question, turns any async function into a synchronous one:

def run_cor(obj):
    if asyncio.iscoroutine(obj):
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(obj)
        return result
    else:
        return obj

run_cor(asyncio.sleep(1, result=3))
# Returns 3

This was useful to me because the async library I'm using (Telethon) makes the same functions either sync or async depending on whether an event loop is running, so I can use the same code in both cases.

Architector 4