2

I have a program with one main thread where I spawn a second thread that uses asyncio. Are there any tools provided to synchronize these two threads? If everything was asyncio, I could do it with its synchronization primitives, eg:

import asyncio

async def taskA(lst, evt):
    print(f'Appending 1')
    lst.append(1)
    evt.set()

async def taskB(lst, evt):
    await evt.wait()
    print('Retrieved:', lst.pop())

lst = []
evt = asyncio.Event()
asyncio.get_event_loop().run_until_complete(asyncio.gather(
    taskA(lst, evt),
    taskB(lst, evt),
))

However, this does not work with multiple threads. If I just use a threading.Event then it will block the asyncio thread. I figured out I could defer the wait to an executor:

import asyncio
import threading

def taskA(lst, evt):
    print(f'Appending 1')
    lst.append(1)
    evt.set()

async def taskB(lst, evt):
    asyncio.get_event_loop().run_in_executor(None, evt.wait)
    print('Retrieved:', lst.pop())

def targetA(lst, evt):
    taskA(lst, evt)

def targetB(lst, evt):
    asyncio.set_event_loop(asyncio.new_event_loop())
    asyncio.get_event_loop().run_until_complete(taskB(lst, evt))

lst = []
evt = threading.Event()
threadA = threading.Thread(target=targetA, args=(lst, evt))
threadB = threading.Thread(target=targetB, args=(lst, evt))
threadA.start()
threadB.start()
threadA.join()
threadB.join()

However, having an executor thread only to wait for a mutex seems unnatural. Is this the way this is supposed to be done? Or is there any other way to wait for synchronization between OS threads asynchronously?

jdehesa
  • 58,456
  • 7
  • 77
  • 121
  • 1
    Await an `asyncio.Event` in taskB, and set it from taskA using `loop.call_soon_threadsafe`. – user4815162342 Nov 05 '18 at 16:29
  • @user4815162342 That's a reasonable option. It feels a bit like I'm kind of inverting the logic, and ideally I'd like the main thread not to have to deal with the asyncio loop of the other thread directly, but yes this may work for my case I think. – jdehesa Nov 05 '18 at 16:33
  • @user4815162342 I solved my problem using your suggestion. Since I wanted to have "function semantics" (process this data in the main thread and return some result), I used a `queue.Queue` to send futures from the asyncio thread to the main thread and `call_soon_threadsafe` to set their results from the main thread. Feel free to post it as an answer to accept. – jdehesa Nov 05 '18 at 17:22
  • Using futures for this purpose is definitely the way to go - ideally taskB would create two futures, one `concurrent.futures` and one `asyncio`, then connect them, send the concurrent one to taskA, and await the asyncio one. `run_in_executor` implements fairly generic future chaining that could be reused for this, but I'm not sure if any of that is public. – user4815162342 Nov 05 '18 at 19:33
  • I've now posted the answer with the original comment, but also an alternative approach which should remove the need for an explicit queue. – user4815162342 Nov 06 '18 at 06:48
  • Might be a duplicate of https://stackoverflow.com/q/28492103. – Lonami May 31 '20 at 06:52

1 Answers1

2

A simple way to synchronize an asyncio coroutine with an event coming from another thread is to await an asyncio.Event in taskB, and set it from taskA using loop.call_soon_threadsafe.

To be able to pass values and exceptions between the two, you can use futures; however then you are inventing much of run_in_executor. If the only job of taskA is to take tasks off a queue, you might as well make a single-worker "pool" and use it as your worker thread. Then you can use run_in_executor as intended:

worker = concurrent.futures.ThreadPoolExecutor(max_workers=1)

async def taskB(lst):
    loop = asyncio.get_event_loop()
    # or result = await ..., if taskA has a useful return value
    # This will also propagate exceptions raised by taskA
    await loop.run_in_executor(worker, taskA, lst)
    print('Retrieved:', lst.pop())

The semantics are the same as in your version with an explicit queue - the queue is still there, it's just inside the ThreadPoolExecutor.

user4815162342
  • 141,790
  • 18
  • 296
  • 355
  • Thanks, this is a good idea too and it's surely cleaner. In my case however I would probably have to write my own executor... The context for this is a Python script for Unreal Engine, which has a main thread that "ticks" objects on every frame, and I want to have an asyncio thread that sometimes sends stuff to be executed on the next UE "tick" (because some stuff needs to be done on the main thread). Anyway I got to solve it with your suggestions and the idea of this solution is good too. – jdehesa Nov 06 '18 at 10:21
  • @jdehesa Fair enough. Thanks for the interesting question! – user4815162342 Nov 06 '18 at 14:57