I'm building a library that uses asyncio internally. Users shouldn't need to be aware of this, so the internal implementation currently wraps the async code with asyncio.run().
However, some users will run this library from a Jupyter notebook, where an event loop is already running and asyncio.run() raises a RuntimeError. I'm struggling to replace asyncio.run() with a wrapper that's safe in either environment.
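For reference, the failure can be reproduced without Jupyter by calling asyncio.run() from code that is already executing inside an event loop, which is the situation in a notebook cell. This is only a minimal sketch; fetch_value, sync_entry_point and simulated_notebook_cell are made-up names, not part of the library.

```python
import asyncio


async def fetch_value():
    # Stand-in for the library's real async work (hypothetical).
    await asyncio.sleep(0)
    return 42


def sync_entry_point():
    # Mirrors what the library does today: wrap the coroutine in asyncio.run().
    coro = fetch_value()
    try:
        return asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return f"failed: {exc}"


async def simulated_notebook_cell():
    # Jupyter executes cells while its own loop is running, like this coroutine.
    return sync_entry_point()


plain_result = sync_entry_point()                        # no loop running: works
nested_result = asyncio.run(simulated_notebook_cell())   # loop running: fails
```

In a plain script the first call returns the value, while the second call hits the RuntimeError that asyncio.run() raises when a loop is already running.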
Here's what I've tried:
    import asyncio

    ASYNC_IO_NO_RUNNING_LOOP_MSG = 'no running event loop'

    def jupyter_safe_run_coroutine(async_coroutine, _test_mode: bool = False):
        try:
            loop = asyncio.get_running_loop()
            task = loop.create_task(async_coroutine)
            result = loop.run_until_complete(task)  # <- fails as loop is already running
            # OR
            asyncio.wait_for(task, timeout=None, loop=loop)  # <- fails as this is an async method
            result = task.result()
            return result
        except RuntimeError as e:
            if _test_mode:
                raise
            if ASYNC_IO_NO_RUNNING_LOOP_MSG in str(e):
                # No loop is running (plain script), so asyncio.run() is safe here.
                return asyncio.run(async_coroutine)
            raise  # don't silently swallow any other RuntimeError
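As a side note on the detection step: asyncio.get_running_loop() raises RuntimeError only when no loop is running, so the check can probably be done without matching on the error message. A minimal sketch, where has_running_loop is a hypothetical helper name and not something the library already has:

```python
import asyncio


def has_running_loop() -> bool:
    """Return True when called from inside a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # Raised by get_running_loop() when no event loop is running.
        return False
    return True


async def _check_inside_loop():
    # Called while asyncio.run() is driving the loop, as in a notebook cell.
    return has_running_loop()


outside = has_running_loop()                 # plain script context
inside = asyncio.run(_check_inside_loop())   # running-loop context
```

This only answers "which environment am I in"; it does not solve the waiting problem itself.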
Requirements
- We use Python 3.8, so we can't use the asyncio.Runner context manager (added in Python 3.11)
- We can't use threading, so the solution suggested here would not work
Problem:
How can I wait/await for the async_coroutine, or the task/future provided by loop.create_task(async_coroutine), to be completed?
Neither of the methods above actually does the waiting, for the reasons stated in the comments.
Update
I've found this nest_asyncio library that's built to solve this problem exactly:
    import asyncio
    import logging

    logger = logging.getLogger(__name__)

    ASYNC_IO_NO_RUNNING_LOOP_MSG = 'no running event loop'
    HAS_BEEN_RUN = False

    def jupyter_safe_run_coroutine(async_coroutine, _test_mode: bool = False):
        global HAS_BEEN_RUN
        if not HAS_BEEN_RUN:
            # Patch only once per process.
            _apply_nested_asyncio_patch()
            HAS_BEEN_RUN = True
        return asyncio.run(async_coroutine)

    def _apply_nested_asyncio_patch():
        try:
            loop = asyncio.get_running_loop()
            logger.info(f'as get_running_loop() returned {loop}, this environment has its own event loop.\n'
                        f'Patching with nest_asyncio')
            import nest_asyncio
            nest_asyncio.apply()
        except RuntimeError as e:
            if ASYNC_IO_NO_RUNNING_LOOP_MSG in str(e):
                logger.info(f'as get_running_loop() raised {e}, this environment does not have its own event loop.\n'
                            f'No patching necessary')
            else:
                raise
Still, I'm facing some issues with it:
- As per this SO answer, there might be starvation issues
- Any logs written in the async_coroutine are not printed in the Jupyter notebook
- The Jupyter notebook kernel occasionally crashes upon completion of the task
Edit
For context, the library internally calls external APIs for data enrichment of a user-provided dataframe:
    # user code using the library
    import pandas as pd
    import my_lib

    df = pd.DataFrame(data=['some data'])
    enriched_df = my_lib.enrich(df)
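To make the shape of the internals concrete, here is a rough sketch of how such a synchronous facade over concurrent API calls might look. Everything in it is an assumption for illustration: the helper names, the use of plain dicts instead of a dataframe, and the fake API call are not the real library code.

```python
import asyncio
from typing import Dict, List


async def _call_enrichment_api(record: Dict[str, str]) -> Dict[str, str]:
    # Hypothetical stand-in for an HTTP request to an external API.
    await asyncio.sleep(0)
    return {**record, "enriched": record["value"].upper()}


async def _enrich_async(records: List[Dict[str, str]]) -> List[Dict[str, str]]:
    # Issue all requests concurrently; gather() returns results in input order.
    return await asyncio.gather(*(_call_enrichment_api(r) for r in records))


def enrich(records: List[Dict[str, str]]) -> List[Dict[str, str]]:
    # Synchronous facade: fine in a plain script, but asyncio.run() raises
    # RuntimeError when a loop is already running (e.g. in Jupyter).
    return asyncio.run(_enrich_async(records))


rows = enrich([{"value": "a"}, {"value": "b"}])
```

The asyncio.run() call in enrich is the part that needs a notebook-safe replacement.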