As a followup to my previous question about calling an async function from a synchronous one, I've discovered asyncio.run_coroutine_threadsafe
.
On paper, this looks ideal. Based on the comments in this StackOverflow question, this looks ideal. I can create a new Thread, get a reference to my original event loop, and schedule the async function to run inside the original event loop while only blocking the new Thread.
class _AsyncBridge:
def call_async_method(self, function, *args, **kwargs):
print(f"call_async_method {threading.get_ident()}")
event_loop = asyncio.get_event_loop()
thread_pool = ThreadPoolExecutor()
return thread_pool.submit(asyncio.run, self._async_wrapper(event_loop, function, *args, **kwargs)).result()
async def _async_wrapper(self, event_loop, function, *args, **kwargs):
print(f"async_wrapper {threading.get_ident()}")
future = asyncio.run_coroutine_threadsafe(function(*args, **kwargs), event_loop)
return future.result()
This doesn't error, but it doesn't ever return, either. The Futures just hang and the async call is never hit. It doesn't seem to matter whether I use a Future in call_async_method
, _async_wrapper
, or both; wherever I use a Future, it hangs.
I experimented with putting the run_coroutine_threadsafe
call directly in my main event loop:
event_loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
return_value = future.result()
Here too, the Future hangs.
I tried using the LoopExecutor
class defined here, which seems like the exact answer to my needs.
event_loop = asyncio.get_event_loop()
loop_executor = LoopExecutor(event_loop)
future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
return_value = future.result()
There too, the returned Future hangs.
I toyed with the idea that I was blocking my original event loop and therefore the scheduled task would never run, so I made a new event loop:
event_loop = asyncio.get_event_loop()
new_event_loop = asyncio.new_event_loop()
print(event_loop == new_event_loop) # sanity check to make sure the new loop is actually different from the existing one - prints False as expected
loop_executor = LoopExecutor(new_event_loop)
future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
return_value = future.result()
return return_value
Still hanging at future.result()
and I don't understand why.
What's wrong with asyncio.run_coroutine_threadsafe
/the way I'm using it?