44

How can I asynchronously insert tasks to run in an asyncio event loop running in another thread?

My motivation is to support interactive asynchronous workloads in the interpreter. I can't block the main REPL thread.

Example

My current flawed understanding says that the following should work. Why doesn't it? What is a better way to accomplish goal above?

import asyncio
from threading import Thread

loop = asyncio.new_event_loop()

def f(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

t = Thread(target=f, args=(loop,))
t.start()    

@asyncio.coroutine
def g():
    yield from asyncio.sleep(1)
    print('Hello, world!')

asyncio.async(g(), loop=loop)
MRocklin
  • 55,641
  • 23
  • 163
  • 235

2 Answers2

29

You must use call_soon_threadsafe to schedule callbacks from different threads:

import asyncio
from threading import Thread

loop = asyncio.new_event_loop()

def f(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

t = Thread(target=f, args=(loop,))
t.start()    

@asyncio.coroutine
def g():
    yield from asyncio.sleep(1)
    print('Hello, world!')

loop.call_soon_threadsafe(asyncio.async, g())

See https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading for more information.

EDIT: Example of an interpreter supporting asynchronous workloads

# vim: filetype=python3 tabstop=2 expandtab

import asyncio as aio
import random

@aio.coroutine
def async_eval(input_, sec):
  yield from aio.sleep(sec)
  print("")
  try:
    result = eval(input_)
  except Exception as e:
    print("< {!r} does not compute >".format(input_))
  else:  
    print("< {!r} = {} >".format(input_, result))

@aio.coroutine
def main(loop):
  while True:
    input_ = yield from loop.run_in_executor(None, input, "> ")

    if input_ == "quit":
      break
    elif input_ == "":
      continue
    else:
      sec = random.uniform(5, 10)
      print("< {!r} scheduled for execution in {:.02} sec>".format(input_, sec))
      aio.async(async_eval(input_, sec))

loop = aio.get_event_loop()

loop.run_until_complete(main(loop))
loop.close()
Jashandeep Sohi
  • 4,903
  • 2
  • 23
  • 25
  • Nice. How about if I wanted to get the returned result of `g()`? (if `g` actually returned something.) – MRocklin Aug 19 '15 at 23:34
  • You could pass a Future into `g` and set it's result from within `g` and then you could `yield from that_future` in another event loop in the other thread. You could also just create another coroutine in which you `yield from g()` and then `call_soon_threadsafe` on that new coroutine. – Jashandeep Sohi Aug 20 '15 at 06:18
  • Ideally, I would just re-work the logic to do this without threads. Or even if you must use threads, try to use the `loop.run_in_executor` coroutine, which can call blocking functions in threads and still look `asyncio`ish. See my edit for my interpretation of `support interactive asynchronous workloads in the interpreter` using this approach. – Jashandeep Sohi Aug 20 '15 at 06:29
  • 7
    if you need to run a coroutine on the event loop in the other thread use: ```asyncio.run_coroutine_threadsafe(my_coro(param1), loop)``` – Scott P. Oct 05 '18 at 14:59
  • Excuse my ignorance but, I don't get where we make the reference to the started thread. Correct me if I'm wrong: we start an event-loop in another thread (call it thread1). We return to the main thread (call it thread0) and run an event loop in thread0 with a coroutine or callback to be run in that thread0 (I don't see any reference to thread1 in the assigning of the coroutine). In this case, we are not starting both event loops, each in one different thread and sending a task from one thread to another (from thread0 to thread1 for example), ¿aren't we? ¿How could that be accomplished? – Ezarate11 Feb 22 '19 at 22:28
  • `loop = asyncio.new_event_loop() def f(loop): asyncio.set_event_loop(loop) loop.run_forever()` this is very confusing. Don't use the same variable name for the module level variable.... – MrR Apr 03 '19 at 12:59
  • 3
    What is the valid syntax for 3.7- I'm hitting this too – PhilBot Oct 18 '19 at 15:20
  • 1
    @PhilBot Hopefully you aren't still waiting for an answer but I created a 3.7+ example in a separate answer. – JimmyJames Jan 18 '21 at 19:02
  • Considering this contains `loop = asyncio.new_event_loop()`, is this really adding a task to the existing loop in the other thread? Or is it rather creating a new loop in the main thread? – Alexis.Rolland Sep 13 '21 at 07:47
10

The first example in Jashandeep Sohi's answer does not work for me in 3.7+ and prints warnings about the deprecated annotation. I reworked this into a something that runs under 3.8. I tweaked it a little to meet my needs as well. I am new to multi-threading in Python (but not multithreading in general) so any advice, guidance, etc. is appreciated:

import asyncio
from threading import Thread


loop = asyncio.new_event_loop()
running = True


def evaluate(future):
    global running
    stop = future.result()
    if stop:
        print("press enter to exit...")
        running = False


def side_thread(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


thread = Thread(target=side_thread, args=(loop,), daemon=True)
thread.start()


async def display(text):
    await asyncio.sleep(5)
    print("echo:", text)
    return text == "exit"


while running:
  text = input("enter text: ")
  future = asyncio.run_coroutine_threadsafe(display(text), loop)
  future.add_done_callback(evaluate)


print("exiting")

The echo and other output will conflict with the prompts but it should be good enough to demonstrate it is working.

One thing I am unsure about is setting the global running from one thread and reading it from another. I think maybe the GIL synchronizes the thread cache but I'd love to get confirmation (or not) about that.

JimmyJames
  • 1,356
  • 1
  • 12
  • 24