3

In Python's asyncio, passing an event between tasks is straightforward when everything runs under the same event loop in one thread.

How can you pass an event in from another thread that runs as an ordinary (non-asyncio) thread?

The closest I can find is:

  • In the async domain:

    thing = False # global
    c = asyncio.Condition() # global
    
    def check_thing():
        print("Checking...", thing)
        return thing
    
    async def hello_task():
        print("acquire")
        await c.acquire()
        await c.wait_for(check_thing)
        c.release()
        print("release")
        #...
    
    def hello_notify(): # called from another thread
        global thing
        thing = True
        print("notify")
        c.notify_all()
    
  • In another thread:

    hello_notify()
    

When hello_notify() is called from another ordinary thread, it raises RuntimeError('cannot notify on un-acquired lock').

How can this be solved without converting everything to the asyncio model? I do see "acquire" printed before "notify", but "release" is never printed, so I assume the condition is still locked. Since the condition has already been acquired, does "un-acquired" mean un-acquired in the calling thread?

In general, how can you pass an event from another thread into an async task in Python?

martineau
minghua
  • 2
    it sounds like you are looking for [`loop.call_soon_threadsafe`](https://docs.python.org/3/library/asyncio-dev.html#concurrency-and-multithreading) – gold_cy Nov 02 '20 at 19:21

2 Answers

4

As pointed out in a comment, you need to use call_soon_threadsafe. For example:

import asyncio

thing = asyncio.Event()
# the event loop that will run hello_task()
_loop = asyncio.get_event_loop()

async def hello_task():
    print("waiting for thing")
    await thing.wait()
    thing.clear()
    #...

def hello_notify(): # called from another thread
    print("notify")
    # tell the event loop to call thing.set()
    _loop.call_soon_threadsafe(thing.set)

Note that asyncio doesn't require a mutex to protect the shared resource, because asyncio objects are not thread-safe and may only be modified from the thread that runs the event loop. For the same reason, a full-blown asyncio.Condition is in fact rarely needed.
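For completeness, here is a minimal end-to-end sketch of the same idea that can be run as a script. The names `wait_for_signal`, `notify_from_thread` and `main` are made up for illustration, and a `threading.Timer` stands in for the "other thread"; the only asyncio calls relied on are `asyncio.Event`, `asyncio.get_running_loop` and `loop.call_soon_threadsafe`.

```python
import asyncio
import threading


async def wait_for_signal(event: asyncio.Event) -> str:
    # Suspends this coroutine only; the event loop keeps running.
    await event.wait()
    return "signalled"


def notify_from_thread(loop: asyncio.AbstractEventLoop, event: asyncio.Event) -> None:
    # Runs in an ordinary thread. Do not call event.set() directly here;
    # ask the loop to call it on the loop's own thread instead.
    loop.call_soon_threadsafe(event.set)


async def main() -> str:
    loop = asyncio.get_running_loop()
    event = asyncio.Event()
    # Hypothetical stand-in for the other thread: fires after 0.1 s.
    worker = threading.Timer(0.1, notify_from_thread, args=(loop, event))
    worker.start()
    result = await wait_for_signal(event)
    worker.join()  # the timer thread has already done its work by now
    return result


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Fetching the loop with `asyncio.get_running_loop()` inside a coroutine and handing it to the thread avoids relying on a module-level loop object, which is one possible design choice rather than a requirement.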

user4815162342
2

I wrote the following code, which passes a threading.Event to a thread running an asyncio task in order to stop its work. Is this what you want to achieve?

import asyncio
import time
import threading


async def do_work(e: threading.Event):
    counter = 0
    while True:
        if e.is_set():
            break
        await asyncio.sleep(2)
        print(counter)
        counter += 1
    print("Async do_work was stopped")


def another_thread(e):
    asyncio.run(do_work(e))
    print("Another thread finished its work")


if __name__ == '__main__':
    e = threading.Event()
    another_th = threading.Thread(target=another_thread, args=(e, ), daemon=False)
    another_th.start()
    time.sleep(10)
    print("Send stop Event")
    e.set()
    time.sleep(10)
    print("Main thread finished")

Another way to solve the issue, if we do not want to use threading.Event because waiting on it directly would block the event loop:

import asyncio
import time
import threading


async def print_hello(_e: asyncio.Event):
    """A function to show that the asyncio loop really runs in another thread
    while we're waiting for the event in the main thread"""
    while True:
        print("Hello")
        await asyncio.sleep(0.5)
        if _e.is_set():
            break


async def alert(_e: asyncio.Event):
    """Wait for the event raised from another thread"""
    await _e.wait()  # use the parameter, not the global `event`
    print("ALERT")


async def main_work(_e: asyncio.Event):
    """Wrapper to combine print_hello and alert"""
    # tasks are created so that the coroutines run concurrently
    t1 = asyncio.create_task(print_hello(_e))
    t2 = asyncio.create_task(alert(_e))
    await t1
    await t2


async def stop_loop(_e: asyncio.Event):
    """The coroutine that raises the event for main_work"""
    _e.set()
    print("Stop loop")


def main_async(_loop: asyncio.AbstractEventLoop, _e: asyncio.Event):
    """Run the loop in another thread"""
    try:
        _loop.run_until_complete(main_work(_e))
    finally:
        print("Loop is closed")
        _loop.close()


if __name__ == '__main__':
    # create the loop explicitly; get_event_loop() without a running loop is deprecated
    loop = asyncio.new_event_loop()
    event = asyncio.Event()  # create event
    # pass the loop and the event to another thread
    th = threading.Thread(target=main_async, args=(loop, event), name="ThreadAsync")
    th.start()
    time.sleep(5)
    # alert the loop, which runs in another thread
    asyncio.run_coroutine_threadsafe(stop_loop(event), loop)
    time.sleep(5)
    th.join()
    print("MAIN THREAD IS FINISHED")
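As a side note, `asyncio.run_coroutine_threadsafe` returns a `concurrent.futures.Future`, so the calling thread can also wait for the coroutine's result. Below is a small sketch under assumptions: `set_and_report` and `demo` are hypothetical names, and the loop is simply kept alive with `run_forever` in a background thread.

```python
import asyncio
import threading


async def set_and_report(event: asyncio.Event) -> str:
    # Executes on the loop's thread, so touching the Event here is safe.
    event.set()
    return "event set"


def demo() -> str:
    loop = asyncio.new_event_loop()
    event = asyncio.Event()
    # Keep the loop running in a background thread.
    thread = threading.Thread(target=loop.run_forever, name="LoopThread")
    thread.start()
    try:
        # Schedule the coroutine from this (non-loop) thread ...
        future = asyncio.run_coroutine_threadsafe(set_and_report(event), loop)
        # ... and block this thread, not the loop, until it has run.
        return future.result(timeout=5)
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()


if __name__ == "__main__":
    print(demo())
```

Calling `future.result()` is optional; it is useful when the notifying thread needs confirmation that the loop has actually processed the event.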
Artiom Kozyrev
  • +1, thanks for the solution. Though if you could change `if e.is_set()` into `await e` and remove the `await asyncio.sleep(2)`, it would be a perfectly valid solution for me. I don't like the `await asyncio.sleep(2)` because I already know how to achieve the same thing without the event. I'm looking for a way to pass an event without incurring extra delay. – minghua Nov 17 '20 at 04:04
  • 1
    @minghua Unfortunately you cannot wait for `threading.Event` directly in an `asyncio` loop, since it would block the whole loop. I developed another solution in case you do not want to poll the `threading.Event` state in one of the coroutines. Please check it and share your opinion – Artiom Kozyrev Nov 17 '20 at 09:27
  • 1
    Great! Thanks. It looks more like what I wanted. So if the two tasks run in infinite loops, you'll need to clear the event in the alert task? – minghua Nov 19 '20 at 15:42
  • @minghua Yes, to stop an infinite loop in one task you have to "raise" the event in another task. On the other hand, `await event.wait()` blocks all code after it in the coroutine; if you do not need to do any work in the coroutine while waiting for the event, you can use it. – Artiom Kozyrev Nov 20 '20 at 08:13
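Regarding the polling delay discussed in these comments: one commonly used workaround, not part of the answer above, is to run the blocking `threading.Event.wait` in the loop's default thread pool via `loop.run_in_executor`, so the coroutine can await it without blocking the loop and without polling. A minimal sketch, with `wait_for_thread_event` and `main` as hypothetical names and a `threading.Timer` standing in for the other thread:

```python
import asyncio
import threading


async def wait_for_thread_event(e: threading.Event) -> str:
    loop = asyncio.get_running_loop()
    # e.wait() blocks, so it runs in a worker thread of the default
    # executor; this coroutine is suspended while the loop stays free.
    await loop.run_in_executor(None, e.wait)
    return "stopped"


async def main() -> str:
    e = threading.Event()
    # Hypothetical stand-in for the other thread: sets the event after 0.1 s.
    threading.Timer(0.1, e.set).start()
    return await wait_for_thread_event(e)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The trade-off is that each pending wait occupies one executor thread, and if the event is never set that thread stays blocked, which can delay shutdown of the loop.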