
I'm building a library that leverages asyncio internally. While the user shouldn't be aware of it, the internal implementation currently wraps the async code with the asyncio.run() porcelain wrapper.

However, some users will be executing this library code from a Jupyter notebook, and I'm struggling to replace asyncio.run() with a wrapper that's safe for either environment.

Here's what I've tried:

import asyncio

ASYNC_IO_NO_RUNNING_LOOP_MSG = 'no running event loop'


def jupyter_safe_run_coroutine(async_coroutine, _test_mode: bool = False):
    try:
        loop = asyncio.get_running_loop()
        task = loop.create_task(async_coroutine)
        result = loop.run_until_complete(task) # <- fails as loop is already running
        # OR
        asyncio.wait_for(task, timeout=None, loop=loop) # <- fails as this is an async method
        result = task.result()
    except RuntimeError as e:
        if _test_mode:
            raise e
        if ASYNC_IO_NO_RUNNING_LOOP_MSG in str(e):
            return asyncio.run(async_coroutine)
    except Exception as e:
        raise e

Requirements

  1. We use Python 3.8, so we can't use the asyncio.Runner context manager (added in Python 3.11)
  2. We can't use threading, so the solution suggested here would not work

Problem:

How can I wait/await for the async_coroutine, or the task/future provided by loop.create_task(async_coroutine) to be completed?

None of the methods above actually does the waiting, for the reasons stated in the comments.
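
To make the failure concrete, here is a minimal sketch. The names work and simulated_notebook_cell are made up for illustration, and the coroutine only stands in for a Jupyter cell, where a loop is already running. It shows that the blocking call is rejected while awaiting the same task succeeds:

```python
import asyncio


async def work():
    # Hypothetical stand-in for the library's internal coroutine.
    await asyncio.sleep(0)
    return 42


async def simulated_notebook_cell():
    # Inside a coroutine a loop is already running, as in a Jupyter cell.
    loop = asyncio.get_running_loop()
    task = loop.create_task(work())
    try:
        loop.run_until_complete(task)  # blocking wait on the running loop
    except RuntimeError as exc:
        message = str(exc)
    else:
        message = None
    # Awaiting hands control back to the loop, so the task can finish.
    result = await task
    return message, result


message, result = asyncio.run(simulated_notebook_cell())
print(message, result)
```

So the only built-in way to wait on the running loop is to await, which is exactly what a synchronous wrapper cannot do.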


Update

I've found this nest_asyncio library that's built to solve this problem exactly:


import asyncio
import logging

logger = logging.getLogger(__name__)

ASYNC_IO_NO_RUNNING_LOOP_MSG = 'no running event loop'

HAS_BEEN_RUN = False


def jupyter_safe_run_coroutine(async_coroutine, _test_mode: bool = False):
    global HAS_BEEN_RUN
    if not HAS_BEEN_RUN:
        _apply_nested_asyncio_patch()
        HAS_BEEN_RUN = True
    return asyncio.run(async_coroutine)


def _apply_nested_asyncio_patch():
    try:
        loop = asyncio.get_running_loop()
        logger.info(f'as get_running_loop() returned {loop}, this environment has its own event loop.\n'
                    f'Patching with nest_asyncio')
        import nest_asyncio
        nest_asyncio.apply()
    except RuntimeError as e:
        if ASYNC_IO_NO_RUNNING_LOOP_MSG in str(e):
            logger.info(f'as get_running_loop() raised {e}, this environment does not have its own event loop.\n'
                        f'No patching necessary')
        else:
            raise e

Still, there are some issues I'm facing with it:

  1. As per this SO answer, there might be starvation issues
  2. Any logs written in the async_coroutine are not printed in the Jupyter notebook
  3. The Jupyter notebook kernel occasionally crashes upon completion of the task

Edit

For context, the library internally calls external APIs for data enrichment of a user-provided dataframe:

# user code using the library
import pandas as pd
import my_lib

df = pd.DataFrame({'some_column': ['some data']})
enriched_df = my_lib.enrich(df)
Joey Baruch
  • I don't understand why you would need to start the event loop yourself? Packages will typically provide asynchronous functions that will be consumed by a user who already has a running event loop. You (most likely) don't need to worry about how the user will start its loop. Nonetheless, I've come across one guy that made such a prototype for fun here https://stackoverflow.com/a/61331974/1720199 – cglacet Dec 08 '22 at 21:44
  • I don't need to start the event loop, I need to register the provided `async_coroutine` as a task with the existing running loop and then wait for its completion. – Joey Baruch Dec 09 '22 at 14:19

2 Answers


It's usually a good idea to expose the asynchronous function. This way you will give your users more flexibility.

If some of your users can't (or don't want to) make asynchronous calls to your functions, they can still call the async function with asyncio.run(your_function()). Or, in the rare situation where they have an event loop running but can't make async calls, they could use the create_task + add_done_callback method described here. (I really have no idea why such a use case would arise, but I included it for the sake of argument.)

Hiding the asynchronous interface from your users is not the best idea because it limits their capabilities. They will probably fork your package to patch it and make the exposed function async, or call the hidden async function directly. Neither is good news for you (harder to document and to track bugs). I would really suggest sticking to the simplest solution and providing the async functions as the main entry points.
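
A minimal sketch of that layout, assuming hypothetical names enrich_async and enrich (neither comes from the question, and the sleep merely stands in for the real API calls). The async function is the main entry point; the synchronous wrapper is an optional convenience that refuses to run inside an existing loop rather than patching it:

```python
import asyncio


async def enrich_async(rows):
    # Hypothetical async entry point; the sleep stands in for API calls.
    await asyncio.sleep(0)
    return [row.upper() for row in rows]


def enrich(rows):
    # Optional sync convenience wrapper for callers with no running loop.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() is safe.
        return asyncio.run(enrich_async(rows))
    # A loop is already running (e.g. in Jupyter): refuse instead of patching.
    raise RuntimeError("event loop already running; use `await enrich_async(rows)`")
```

In a plain script, enrich(["a", "b"]) returns the result directly; in a notebook cell the user would write await enrich_async(["a", "b"]) instead.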

Suppose the following package code, followed by three different ways of using it:

async def package_code():
    return "package"

Client code

Typical clients will probably just use it this way:

async def client_code_a():
    print(await package_code())

# asyncio.run(client_code_a())

For some people, the following might make sense, for example if your package is the only asynchronous thing they will ever use, or if they are not yet comfortable with async code (these you can probably convince to try client_code_a instead):

def client_code_b():
    print(asyncio.run(package_code()))

# client_code_b()

A very few (I'm tempted to say none) will need this:

async def client_code_c():
    # asyncio.run() cannot be called from a running event loop:
    # print(asyncio.run(package_code()))
    loop = asyncio.get_running_loop()
    task = loop.create_task(package_code())
    task.add_done_callback(lambda t: print(t.result()))

# asyncio.run(client_code_c())
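
One caveat with the callback pattern: the synchronous caller gets no result inline; it only arrives once control returns to the loop. A minimal sketch, assuming hypothetical names start_in_background and driver, and a package_code variant that really suspends:

```python
import asyncio


async def package_code():
    await asyncio.sleep(0.01)  # stand-in for real asynchronous work
    return "package"


def start_in_background(results):
    # Plain (non-async) function called while a loop is already running.
    loop = asyncio.get_running_loop()
    task = loop.create_task(package_code())
    task.add_done_callback(lambda t: results.append(t.result()))
    return task  # the result is not available yet at this point


async def driver():
    results = []
    task = start_in_background(results)
    before = list(results)  # snapshot taken before the loop ran the task
    await task  # stand-in for the loop carrying on with other work
    return before, results


print(asyncio.run(driver()))
```

The snapshot taken right after scheduling is empty, and the callback fills results only after the loop has had a chance to run the task.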
cglacet

I'm still not sure I understand what your goal is, but I'll describe with code what I tried to explain in my comment, so you can tell me where your issue lies.

If your package requires the user to call some functions (your_package_function in the example) that take coroutine functions as arguments, then you shouldn't worry about the event loop.

That means the package shouldn't call asyncio.run or loop.run_until_complete. The client should (in almost all cases) be responsible for starting the event loop.

Your package code should assume there is an event loop running. Since I don't know your package's goal, I just made a function that feeds a "test" argument to whatever coroutine function the client passes:

import asyncio

async def your_package_function(coroutine):
    print("- Package internals start")
    task = asyncio.create_task(coroutine("test"))
    await asyncio.sleep(.5) # Simulates slow tasks within your package
    print("- Package internals completed other task")
    x = await task
    print("- Package internals end")
    return x

The client (package user) should then call the following:

async def main():
    x = await your_package_function(return_with_delay)
    print(f"Computed value = {x}")

async def return_with_delay(value):
    print("+ User function start")
    await asyncio.sleep(.2)
    print("+ User function end")
    return value

await main()
# or asyncio.run(main()) if needed

This would print:

- Package internals start
+ User function start
+ User function end
- Package internals completed other task
- Package internals end
Computed value = test
cglacet
  • The client isn't passing any async coroutines; that's an internal implementation detail. Specifically, it makes API calls to get some data enrichment from some vendor. – Joey Baruch Dec 09 '22 at 16:10
  • Ah ok, I see, the client gives a function (not a coroutine) and your package makes calls depending on some results of the input function? When you say "internal implementation detail", does that mean you expose a synchronous interface to your client? If that's the case then you probably want to expose asynchronous functions instead. The client, if they wish to ignore the functionality you are offering, will then be able to decide how to call your asynchronous function. If you really want to limit your functionality then you can do this: https://stackoverflow.com/a/61331974/1720199 – cglacet Dec 09 '22 at 16:42
  • But I really think you should expose the better version of your API (the async one). And if you are worried that some clients will refuse to use async code, then you can document how to call your functions when there is no event loop running (like the `add_done_callback` strategy proposed in the previous link). – cglacet Dec 09 '22 at 16:44
  • Added context to how the library is used to the question – Joey Baruch Dec 09 '22 at 16:50
  • Ok, then why not expose the async function? So the user can call `await my_lib.enrich(df)`. It makes more sense than giving no opportunity to make an async call. Your function makes async calls, don't try to hide it. People using it will soon notice the function is very slow, then understand it makes async calls. At that point they will probably fork your package to patch it and make the function async. If it is async, some (probably rare) users will have a use case where they need to call it synchronously, but for them it will only require using the techniques linked above (no patch needed). – cglacet Dec 10 '22 at 06:46
  • Thank you @cglacet, this is sensible advice – Joey Baruch Dec 11 '22 at 15:45
  • I just added another answer that better fit your actual question (basically my earlier comment in a nicer form). I will soon remove that answer. – cglacet Dec 12 '22 at 15:13
  • I think that is an acceptable answer, why would you want to remove it? – Joey Baruch Dec 12 '22 at 16:36
  • I can leave it as is. But it doesn't really answer your question. I'll leave the decision to reviewers then. – cglacet Dec 12 '22 at 17:51
  • That's a fair point, but sometimes the answer should be: you shouldn't do it, here's why, and here's what you can do instead. – Joey Baruch Dec 12 '22 at 18:08