3

I have two Python threads that share some state, A and B. At one point, A submits a callback to be run by B on its loop with something like:

# This line is executed by A
loop.call_soon_threadsafe(callback)

After this I want to continue doing something else, but I want to make sure that callback has been run by B before doing so. Is there any way (besides standard threading synchronization primitives) to make A wait for the completion of the callback? I know call_soon_threadsafe returns a asyncio.Handle object that can cancel the task, but I am not sure whether this can be used for waiting (I still don't know much about asyncio).

In this case, this callback calls loop.close() and cancels the remaining tasks, and after that, in B, after loop.run_forever() there is a loop.close(). So for this use case in particular a thread-safe mechanism that allows me to know from A when the loop has been effectively closed would also work for me - again, not involving a mutex/condition variable/etc.

I know that asyncio is not meant to be thread-safe, with very few exceptions, but I wanted to know if a convenient way to achieve this is provided.


Here is a very small snippet of what I mean in case it helps.

import asyncio
import threading
import time

def thread_A():
    print('Thread A')
    loop = asyncio.new_event_loop()
    threading.Thread(target=thread_B, args=(loop,)).start()
    time.sleep(1)
    handle = loop.call_soon_threadsafe(callback, loop)
    # How do I wait for the callback to complete before continuing?
    print('Thread A out')

def thread_B(loop):
    print('Thread B')
    asyncio.set_event_loop(loop)
    loop.run_forever()
    loop.close()
    print('Thread B out')

def callback(loop):
    print('Stopping loop')
    loop.stop()

thread_A()

I have tried this variation with asyncio.run_coroutine_threadsafe but it does not work, instead thread A hangs forever. Not sure if I am doing something wrong or it is because I am stopping the loop.

import asyncio
import threading
import time

def thread_A():
    global future
    print('Thread A')
    loop = asyncio.new_event_loop()
    threading.Thread(target=thread_B, args=(loop,)).start()
    time.sleep(1)
    future = asyncio.run_coroutine_threadsafe(callback(loop), loop)
    future.result()  # Hangs here
    print('Thread A out')

def thread_B(loop):
    print('Thread B')
    asyncio.set_event_loop(loop)
    loop.run_forever()
    loop.close()
    print('Thread B out')

async def callback(loop):
    print('Stopping loop')
    loop.stop()

thread_A()
jdehesa
  • 58,456
  • 7
  • 77
  • 121
  • Feels like a duplicate. Possibly related: https://stackoverflow.com/q/6800984/1531971, https://stackoverflow.com/q/35550501/1531971, https://stackoverflow.com/q/7836143/1531971 –  Nov 01 '18 at 18:16
  • @jdv: none of those are using asyncio. – Martijn Pieters Nov 01 '18 at 18:17
  • If you are using threads, are you actually also using `asyncio` to run tasks? Or are you only using `loop.call_soon_threadsafe(callback)` to handle some kind of thread-related task? `loop.call_soon_threadsafe(callback)` is really only meant for other threads to *signal to `asyncio`-loop-managed code*. – Martijn Pieters Nov 01 '18 at 18:20
  • Ok, how about https://stackoverflow.com/q/29475007/1531971, https://stackoverflow.com/q/44345139/1531971 –  Nov 01 '18 at 18:23
  • @jdv Thank you, but those are not for multithreaded code. – jdehesa Nov 01 '18 at 18:24

2 Answers2

4

Callbacks are set and (mostly) forget. They are not intended to be used for something you need to get a result back from. This is why the handle produced only lets you cancel a callback (this callback is no longer needed), nothing more.

If you need to wait for a result from an asyncio-managed coroutine in another thread, use a coroutine and schedule it as a task with asyncio.run_coroutine_threadsafe(); this gives you a Future() instance, which you can then wait for to be done.

However, stopping the loop with run_coroutine_threadsafe() does require the loop to handle one more round of callbacks than it'll actually be able to run; the Future returned by run_coroutine_threadsafe() would otherwise not be notified of the state change of the task it scheduled. You can remedy this by running asyncio.sleep(0) through loop.run_until_complete() in thread B before closing the loop:

def thread_A():
    # ... 
    # when done, schedule the asyncio loop to exit
    future = asyncio.run_coroutine_threadsafe(shutdown_loop(loop), loop)
    future.result()  # wait for the shutdown to complete
    print("Thread A out")

def thread_B(loop):
    print("Thread B")
    asyncio.set_event_loop(loop)
    loop.run_forever()
    # run one last noop task in the loop to clear remaining callbacks
    loop.run_until_complete(asyncio.sleep(0))
    loop.close()
    print("Thread B out")

async def shutdown_loop(loop):
    print("Stopping loop")
    loop.stop()

This is, of course, slightly hacky and depends on the internals of callback management and cross-threading task scheduling to not change. As the default asyncio implementation stands, running a single noop task is plenty for several rounds of callbacks creating more callbacks being handled, but alternative loop implementations may handle this differently.

So for shutting down the loop, you may be better off using thread-based coordination:

def thread_A():
    # ...
    callback_event = threading.Event()
    loop.call_soon_threadsafe(callback, loop, callback_event)
    callback_event.wait()  # wait for the shutdown to complete
    print("Thread A out")

def thread_B(loop):
    print("Thread B")
    asyncio.set_event_loop(loop)
    loop.run_forever()
    loop.close()
    print("Thread B out")

def callback(loop, callback_event):
    print("Stopping loop")
    loop.stop()
    callback_event.set()
Martijn Pieters
  • 1,048,767
  • 296
  • 4,058
  • 3,343
  • I was just now reading about `run_coroutine_threadsafe()`. So this would give me a future that I can safely wait on, even from another thread, and even if that task actually stops the loop? – jdehesa Nov 01 '18 at 18:26
  • I have tried with `while fut.running()`, but that seems to make thread `A` finish instantly. Even if I add a `time.sleep(1)` (for testing only! I know you're supposed to use `asyncio.sleep`) in `callback`, `future.running()` is always false... – jdehesa Nov 01 '18 at 18:36
  • @jdehesa: the issue I see is that stopping the loop also almost guarantees that the callbacks that pass the signals from the task managing the `shutdown_loop()` coroutine back to the `Future()` that thread B holds are also cancelled. The connection is severed here. Shutting down the loop needs a bit more.. handling. – Martijn Pieters Nov 01 '18 at 19:01
  • 1
    @jdehesa: sorry about that, I've worked out what callbacks where being wiped out, and all it takes is a single run through the loop. Scheduling `asyncio.sleep(0)`, a virtual noop coroutine, with `asyncio.run_until_complete()`, is enough to have those callbacks cleared out. Run that on Thread B before closing the loop. – Martijn Pieters Nov 01 '18 at 19:15
  • Yep that works! I may end up using thread-based coordination, like you say, for clarity and reliability, but kudos for working out a way to do it without it. – jdehesa Nov 02 '18 at 10:15
1

Is there any way (besides standard threading synchronization primitives) to make A wait for the completion of the callback?

Normally you'd use run_coroutine_threadsafe, as Martijn initially suggested. But your use of loop.stop() makes the callback somewhat specific. Given that, you are probably best off using the standard thread synchronization primitives, which are in this case very straightforward and can be completely decoupled from the callback implementation and the rest of your code. For example:

def submit_and_wait(loop, fn, *args):
    "Submit fn(*args) to loop, and wait until the callback executes."
    done = threading.Event()
    def wrap_fn():
        try:
            fn(*args)
        finally:
            done.set()
    loop.call_soon_threadsafe(wrap_fn)
    done.wait()

Instead of using loop.call_soon_threadsafe(callback), use submit_and_wait(loop, callback). The threading synchronization is there, but completely hidden inside submit_and_wait.

user4815162342
  • 141,790
  • 18
  • 296
  • 355
  • Yes, this works nicely and is really straightforward. I accepted the other answer because it includes a way to solve it without explicit thread synchronization, as proposed in the question, but may end up using something like this too. – jdehesa Nov 02 '18 at 10:17