15

My question is more or less like this, which is really an X-Y problem leading back to this. This is, however, not a duplicate, because my use case is slightly different and the linked threads don't answer my question.

I am porting a set of synchronous programs from Java to Python. These programs interact with an asynchronous library. In Java, I could block and wait for this library's asynchronous functions to return a value and then do things with that value.

Here's a code sample to illustrate the problem.

def do_work_sync_1(arg1, arg2, arg3):
    # won't even run because await has to be called from an async function
    value = await do_work_async(arg1, arg2, arg3)

def do_work_sync_2(arg1, arg2, arg3):
    # throws "Loop already running" error because the async library referenced in do_work_async is already using my event loop
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(do_work_async(arg1, arg2, arg3))

def do_work_sync_3(arg1, arg2, arg3):
    # throws "got Future attached to a different loop" because the do_work_async refers back to the asynchronous library, which is stubbornly attached to my main loop
    thread_pool = ThreadPoolExecutor()
    future = thread_pool.submit(asyncio.run, do_work_async(arg1, arg2, arg3)
    result = future.result()

def do_work_sync_4(arg1, arg2, arg3):
    # just hangs forever
    event_loop = asyncio.get_event_loop()
    future = asyncio.run_coroutine_threadsafe(do_work_async(arg1, arg2, arg3), event_loop)
    return_value = future.result()

async def do_work_async(arg1, arg2, arg3):
    value_1 = await async_lib.do_something(arg1)
    value_2 = await async_lib.do_something_else(arg2, arg3)

    return value_1 + value_2

Python appears to be trying very hard to keep me from blocking anything, anywhere. await can only be used from async def functions, which in their turn must be awaited. There doesn't seem to be a built-in way to keep async def/await from spreading through my code like a virus.

Tasks and Futures don't have any built-in blocking or wait_until_complete mechanisms unless I want to loop on Task.done(), which seems really bad.

I tried asyncio.get_event_loop().run_until_complete(), but that produces an error: This event loop is already running. Apparently I'm not supposed to do that for anything except main().

The second linked question above suggests using a separate thread and wrapping the async function in that. I tested this with a few simple functions and it seems to work as a general concept. The problem here is that my asynchronous library keeps a reference to the main thread's event loop and throws an error when I try to refer to it from the new thread: got Future <Future pending> attached to a different loop.

I considered moving all references to the asynchronous library into a separate thread, but I realized that I still can't block in the new thread, and I'd have to create a third thread for blocking calls, which would bring me back to the Future attached to a different loop error.

I'm pretty much out of ideas here. Is there a way to block and wait for an async function to return, or am I really being forced to convert my entire program to async/await? (If it's the latter, an explanation would be nice. I don't get it.)

cf-
  • 8,598
  • 9
  • 36
  • 58
  • 2
    how many concurrent blocking calls are you expecting? if it's less than like 10000 or so, and you're not as concerned about memory, you can just run blocking calls in thread pools. without seeing a clear example of where you're really running into trouble, it's tough to say what you should do. – acushner Jul 27 '19 at 18:13
  • 1
    As an aside, "There doesn't seem to be a built-in way to keep async def/await from spreading through my code like a virus" is my exact concern with Python's implementation of async, and the reason I haven't adopted it in my own code bases. – Adam Smith Jul 27 '19 at 18:17
  • @acushner Not many. I'm starting with a single one as a testbed. I did try to use a `concurrent.futures.ThreadPoolExecutor()` to spawn a new thread, but since my async library keeps a reference to the main thread's event loop, anything I do within the new thread's new event loop bombs out. – cf- Jul 27 '19 at 18:29
  • That's the `got Future attached to a different loop` bit above, if I wasn't clear. Not sure how to make it less ambiguous without posting code, and the code is a mess after 8 hours of trying every trick I could find to get this working. The basic idea I tried is to run `thread_pool.submit(wrapper_function, original_function, *args, **kwargs)` where `wrapper_function` calls `asyncio.new_event_loop` (`get_event_loop` fails for some reason) and `event_loop.run_until_complete(original_function(*args, **kwargs))`. – cf- Jul 27 '19 at 18:33
  • yeah, combining the 2 is not easy, in the least. the concept is you either want single-threaded cooperative multi-tasking (async/await) OR you want preemptive multi-tasking (threads/processes). occasionally you want to run an event loop on another thread, which is doable using calls like `run_coroutine_threadsafe`, but it becomes a lot to deal with. – acushner Jul 27 '19 at 19:04
  • `do_work_sync_4` looks like the correct approach. It presumably hangs because the event loop is simply not running? In that case, can't you just *keep it running*, in a dedicated thread? After importing the library, but before doing much of anything else, start a thread, and have it execute `asyncio.set_event_loop(that_loop); that_loop.run_forever()`. That looks like it would solve your problems nicely. – user4815162342 Aug 03 '19 at 12:52

1 Answers1

12

It took me some time, but finally I've found the actual question

Is there a way to block and wait for an async function to return, or am I really being forced to convert my entire program to async/await?

There is a high-level function asyncio.run(). It does three things:

  1. create new event loop
  2. run your async function in that event loop
  3. wait for any unfinished tasks and close the loop

Its source code is here: https://github.com/python/cpython/blob/3221a63c69268a9362802371a616f49d522a5c4f/Lib/asyncio/runners.py#L8 You see it uses loop.run_until_complete(main) under the hood.

If you are writing completely asynchronous code, you are supposed to call asyncio.run() somewhere at the end of your main() function, I guess. But that doesn't have to be the case. You can run it wherever you want, as many times you want. Caveats:

  • in given thread, at one time, there can be only one running event loop

  • do not run it from async def function, because, obviously, you have already one event loop running, so you can just call that function using await instead

Example:

import asyncio

async def something_async():
    print('something_async start')
    await asyncio.sleep(1)
    print('something_async done')

for i in range(3):
    asyncio.run(something_async())

You can have multiple threads with their own event loop:

import asyncio
import threading

async def something_async():
    print('something_async start in thread:', threading.current_thread())
    await asyncio.sleep(1)
    print('something_async done in thread:', threading.current_thread())

def main():
    t1 = threading.Thread(target=asyncio.run, args=(something_async(), ))
    t2 = threading.Thread(target=asyncio.run, args=(something_async(), ))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

if __name__ == '__main__':
    main()

If you encounter this error: Future attached to a different loop That may mean two tings:

  1. you are using resources tied to another event loop than you are running right now

  2. you have created some resource before starting an event loop - it uses a "default event loop" in that case - but when you run asyncio.run(), you start a different loop. I've encountered this before: asyncio.Semaphore RuntimeError: Task got Future attached to a different loop

You need to use Python version at least 3.5.3 - explanation here.

Messa
  • 24,321
  • 6
  • 68
  • 92
  • The big problem I'm always going to face is the presence of an existing event loop. My asynchronous lib latches onto the main event loop and keeps a reference to it, and it will throw `Future attached to a different loop` errors if I try to reference it from another Thread. I could move all references to the async lib into a second thread, but then I'd need a *third* thread to call `asyncio.run` from for my coroutines, and I will *still* get `Future attached to a different loop` errors. – cf- Jul 28 '19 at 02:36
  • Somewhere in the question details is the general idea that this async library (which I can't control) has monopolized my event loop and shut down the idea of multithreading because of the `Future attached to a different loop` errors. I am gradually becoming convinced that my only options are `asyncio.create_task` and looping until it's done or accepting my fate and letting `async def`/`await` infect my entire codebase. – cf- Jul 28 '19 at 02:38
  • OK, maybe it would be better to have only one loop for the whole time and reuse it. Like I wrote, `asyncio.run()` creates a new loop every time, so instead you can use `loop,run_until_complete()`. Example: https://gist.github.com/messa/36988d7311f91e71f8b65f521d6b053d I see above you have already tried this? If the error is "loop already running"t hen you should simply `await` instead of starting the loop. – Messa Jul 29 '19 at 09:32
  • Maybe I am starting to understand :) You probably have sync ("blocking") and async code and want it run side by side somehow. Can you run the blocking code in separate thread, and when it needs to run async code, schedule it "in the async thread" using `loop.call_soon_threadsafe()` https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.call_soon_threadsafe()? – Messa Jul 29 '19 at 09:38
  • @computerfreaker Do you have some code sample please? :) – Messa Jul 29 '19 at 09:40
  • That's *exactly* it. My program is designed to be synchronous. The library I'm using is asynchronous. The async lib constructs or takes over my main thread's event loop, and then throws `Loop already running` errors if I try to `event_loop`.`run_until_complete` or `Future attached to a different loop` errors if I try to schedule something using a different thread. I'll try to get a MCVE up shortly. – cf- Jul 29 '19 at 23:28