I have two Python threads, A and B, that share some state. At one point, A submits a callback to be run by B on its loop with something like:
# This line is executed by A
loop.call_soon_threadsafe(callback)
After this I want to continue doing something else, but first I want to make sure that callback has been run by B. Is there any way (besides standard threading synchronization primitives) to make A wait for the completion of the callback? I know call_soon_threadsafe returns an asyncio.Handle object that can be used to cancel the callback, but I am not sure whether it can be used for waiting (I still don't know much about asyncio).
In this case, the callback calls loop.stop() and cancels the remaining tasks; after that, in B, loop.run_forever() returns and is followed by a loop.close(). So for this use case in particular, a thread-safe mechanism that lets me know from A when the loop has actually been closed would also work for me. Again, without involving a mutex, condition variable, etc.
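For reference, the kind of workaround I am trying to avoid would look roughly like the sketch below. It uses a threading.Event to signal that the callback has run and Thread.join() to wait until B has closed the loop. The names run_loop, done and main are only for illustration and are not part of my real code, and the 5-second timeouts are arbitrary.

```python
import asyncio
import threading


def run_loop(loop):
    # Thread B: run the loop until stop() is called, then close it.
    loop.run_forever()
    loop.close()


def callback(loop, done):
    # Executed by thread B from inside the loop.
    loop.stop()
    done.set()  # Signal thread A that the callback has run.


def main():
    # Thread A: create the loop, hand it to B, then schedule the callback.
    loop = asyncio.new_event_loop()
    thread_b = threading.Thread(target=run_loop, args=(loop,))
    thread_b.start()

    done = threading.Event()
    loop.call_soon_threadsafe(callback, loop, done)

    # Block until the callback has run (arbitrary 5-second timeout).
    callback_ran = done.wait(timeout=5)
    # Block until B has left run_forever() and closed the loop.
    thread_b.join(timeout=5)
    return callback_ran, loop.is_closed()


if __name__ == "__main__":
    print(main())
```

This does what I need, but it relies on exactly the threading primitives I was hoping asyncio would let me do without.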
I know that asyncio is not meant to be thread-safe, with very few exceptions, but I wanted to know if a convenient way to achieve this is provided.
Here is a very small snippet of what I mean in case it helps.
import asyncio
import threading
import time

def thread_A():
    print('Thread A')
    loop = asyncio.new_event_loop()
    threading.Thread(target=thread_B, args=(loop,)).start()
    time.sleep(1)
    handle = loop.call_soon_threadsafe(callback, loop)
    # How do I wait for the callback to complete before continuing?
    print('Thread A out')

def thread_B(loop):
    print('Thread B')
    asyncio.set_event_loop(loop)
    loop.run_forever()
    loop.close()
    print('Thread B out')

def callback(loop):
    print('Stopping loop')
    loop.stop()

thread_A()
I have tried this variation with asyncio.run_coroutine_threadsafe, but it does not work; instead, thread A hangs forever. I am not sure if I am doing something wrong or if it is because I am stopping the loop.
import asyncio
import threading
import time

def thread_A():
    global future
    print('Thread A')
    loop = asyncio.new_event_loop()
    threading.Thread(target=thread_B, args=(loop,)).start()
    time.sleep(1)
    future = asyncio.run_coroutine_threadsafe(callback(loop), loop)
    future.result()  # Hangs here
    print('Thread A out')

def thread_B(loop):
    print('Thread B')
    asyncio.set_event_loop(loop)
    loop.run_forever()
    loop.close()
    print('Thread B out')

async def callback(loop):
    print('Stopping loop')
    loop.stop()

thread_A()