32

I have a python multi-threaded application. I want to run an asyncio loop in a thread and post calbacks and coroutines to it from another thread. Should be easy but I cannot get my head around the asyncio stuff.

I came up to the following solution which does half of what I want, feel free to comment on anything:

import asyncio
from threading import Thread

class B(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.loop = None

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop) #why do I need that??
        self.loop.run_forever()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    def add_task(self, coro):
        """this method should return a task object, that I
          can cancel, not a handle"""
        f = functools.partial(self.loop.create_task, coro)
        return self.loop.call_soon_threadsafe(f)

    def cancel_task(self, xx):
        #no idea

@asyncio.coroutine
def test():
    while True:
        print("running")
        yield from asyncio.sleep(1)

b.start()
time.sleep(1) #need to wait for loop to start
t = b.add_task(test())
time.sleep(10)
#here the program runs fine but how can I cancel the task?

b.stop()

So starting and stoping the loop works fine. I thought about creating task using create_task, but that method is not threadsafe so I wrapped it in call_soon_threadsafe. But I would like to be able to get the task object in order to be able to cancel the task. I could do a complicated stuff using Future and Condition, but there must be a simplier way, isnt'it?

songololo
  • 4,724
  • 5
  • 35
  • 49
Olivier RD
  • 679
  • 1
  • 5
  • 11

4 Answers4

21

I think you may need to make your add_task method aware of whether or not its being called from a thread other than the event loop's. That way, if it's being called from the same thread, you can just call asyncio.async directly, otherwise, it can do some extra work to pass the task from the loop's thread to the calling thread. Here's an example:

import time
import asyncio
import functools
from threading import Thread, current_thread, Event
from concurrent.futures import Future

class B(Thread):
    def __init__(self, start_event):
        Thread.__init__(self)
        self.loop = None
        self.tid = None
        self.event = start_event

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.tid = current_thread()
        self.loop.call_soon(self.event.set)
        self.loop.run_forever()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    def add_task(self, coro):
        """this method should return a task object, that I
          can cancel, not a handle"""
        def _async_add(func, fut):
            try:
                ret = func()
                fut.set_result(ret)
            except Exception as e:
                fut.set_exception(e)

        f = functools.partial(asyncio.async, coro, loop=self.loop)
        if current_thread() == self.tid:
            return f() # We can call directly if we're not going between threads.
        else:
            # We're in a non-event loop thread so we use a Future
            # to get the task from the event loop thread once
            # it's ready.
            fut = Future()
            self.loop.call_soon_threadsafe(_async_add, f, fut)
            return fut.result()

    def cancel_task(self, task):
        self.loop.call_soon_threadsafe(task.cancel)


@asyncio.coroutine
def test():
    while True:
        print("running")
        yield from asyncio.sleep(1)

event = Event()
b = B(event)
b.start()
event.wait() # Let the loop's thread signal us, rather than sleeping
t = b.add_task(test()) # This is a real task
time.sleep(10)
b.stop()

First, we save the thread id of the event loop in the run method, so we can figure out if calls to add_task are coming from other threads later. If add_task is called from a non-event loop thread, we use call_soon_threadsafe to call a function that will both schedule the coroutine, and then use a concurrent.futures.Future to pass the task back to the calling thread, which waits on the result of the Future.

A note on cancelling a task: You when you call cancel on a Task, a CancelledError will be raised in the coroutine the next time the event loop runs. This means that the coroutine that the Task is wrapping will aborted due to the exception the next time it hit a yield point - unless the coroutine catches the CancelledError and prevents itself from aborting. Also note that this only works if the function being wrapped is actually an interruptible coroutine; an asyncio.Future returned by BaseEventLoop.run_in_executor, for example, can't really be cancelled, because it's actually wrapped around a concurrent.futures.Future, and those can't be cancelled once their underlying function actually starts executing. In those cases, the asyncio.Future will say its cancelled, but the function actually running in the executor will continue to run.

Edit: Updated the first example to use concurrent.futures.Future, instead of a queue.Queue, per Andrew Svetlov's suggestion.

Note: asyncio.async is deprecated since version 3.4.4 use asyncio.ensure_future instead.

dano
  • 91,354
  • 19
  • 222
  • 219
  • Thanks for the example it helped me fix several issues I had. Btw I also hadd to instanciate Future with Future(loop=self.loop), otherwise in some cases future would take wrong loop – Olivier RD Mar 27 '15 at 20:43
  • @OlivierRD You should be using `concurrent.futures.Future`, not `asyncio.Future`. `concurrent.futures.Future` doesn't take a `loop` keyword arugment. – dano Mar 27 '15 at 20:44
  • the documentation seems to say that it does: https://docs.python.org/3/library/asyncio-task.html#asyncio.Future – Olivier RD Mar 27 '15 at 21:02
  • btw task.cancel really seems to cancel the running task. I just ran a few tests. the task seems to stop at the first yield statement – Olivier RD Mar 27 '15 at 21:05
  • @OlivierRD That's the docs for `asyncio.Future`, not [`concurrent.futures.Future`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future) – dano Mar 27 '15 at 21:10
  • @OlivierRD re: `Task.cancel`, yes, I was thinking of `concurrent.futures.Future` when I said that. According to do docs for [`asyncio.Task.cancel`](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel), it injects a `CancelledError` into the coroutine the next time the event loop gets a chance to run, which lines up with what you're seeing. As long as the coroutine doesn't catch the `CancelledError`, it should abort as soon as it hits its next yield point. – dano Mar 27 '15 at 21:16
  • @OlivierRD I updated my answer to be more accurate about task cancelling. Thanks for correcting me! – dano Mar 27 '15 at 21:30
  • This triggers a `Task was destroyed but it is pending!` warning. – Matthias Urlichs Oct 07 '15 at 03:01
7

You do everything right. For task stopping make method

class B(Thread):
    # ...
    def cancel(self, task):
        self.loop.call_soon_threadsafe(task.cancel)

BTW you have to setup an event loop for the created thread explicitly by

self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

because asyncio creates implicit event loop only for main thread.

songololo
  • 4,724
  • 5
  • 35
  • 49
Andrew Svetlov
  • 16,730
  • 8
  • 66
  • 69
  • The missing piece here is how to get the handle to the `task` in the first place. Because the OP needs to use `call_soon_threadsafe(self.loop.create_task)` in the `add_task` method, he doesn't actually have a handle to the task after adding it to the loop. – dano Mar 27 '15 at 13:39
  • 1
    Got it. You are right. @dano BTW you may use concurrent.futures.Future instead of Queue in your answer. I think it's cleaner. – Andrew Svetlov Mar 27 '15 at 16:32
  • Yes, I agree that using a `Future` is nicer than a `Queue`. I've updated my answer to reflect that. Thanks! – dano Mar 27 '15 at 17:21
6

just for reference here it the code I finally implemented based on the the help I got on this site, it is simpler since I did not need all features. thanks again!

import asyncio
from threading import Thread
from concurrent.futures import Future
import functools

class B(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.loop = None

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    def _add_task(self, future, coro):
        task = self.loop.create_task(coro)
        future.set_result(task)

    def add_task(self, coro):
        future = Future()
        p = functools.partial(self._add_task, future, coro)
        self.loop.call_soon_threadsafe(p)
        return future.result() #block until result is available

    def cancel(self, task):
        self.loop.call_soon_threadsafe(task.cancel)
Adam Parkin
  • 17,891
  • 17
  • 66
  • 87
Olivier RD
  • 679
  • 1
  • 5
  • 11
  • Does this still work with the async/await coroutines in Python 3.5? The docs for future.result() don't seem to indicate that result() blocks (rather times out, see https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future.result ), and in the caller of add_task the value I get back appears to be a Task rather than the concrete value returned from the coroutine. Furthermore https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future.set_result seems to indicate that set_result shouldn't be used – Adam Parkin Aug 01 '18 at 22:25
  • Gist of what I tried with this: https://gist.github.com/pzelnip/7230b32dc9a27f6e78d9cd78b619245a The return from add_task when the cororoutine appears to be a Task, and also never appears to terminate. – Adam Parkin Aug 01 '18 at 22:27
4

Since version 3.4.4 asyncio provides a function called run_coroutine_threadsafe to submit a coroutine object from a thread to an event loop. It returns a concurrent.futures.Future to access the result or cancel the task.

Using your example:

@asyncio.coroutine
def test(loop):
    try:
        while True:
            print("Running")
            yield from asyncio.sleep(1, loop=loop)
    except asyncio.CancelledError:
        print("Cancelled")
        loop.stop()
        raise

loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever)
future = asyncio.run_coroutine_threadsafe(test(loop), loop)

thread.start()
time.sleep(5)
future.cancel()
thread.join()
Vincent
  • 12,919
  • 1
  • 42
  • 64
  • To preventing from a race condition or deadlock, don't call `future.cancel()` directly. Use `loop.call_soon_threadsafe(future.cancel)` instead. See [here](https://docs.python.org/3.4/library/asyncio-dev.html#concurrency-and-multithreading). – Johann Chang Apr 19 '16 at 18:45
  • 5
    @ChangYu-heng This is true for [asyncio.Future](https://docs.python.org/3.4/library/asyncio-task.html#asyncio.Future) futures, but [run_coroutine_threadsafe](https://docs.python.org/3.4/library/asyncio-task.html#asyncio.run_coroutine_threadsafe) returns a [concurrent.futures.Future](https://docs.python.org/3.4/library/concurrent.futures.html#concurrent.futures.Future) which is thread-safe and doesn't depend on any event loop. – Vincent Apr 19 '16 at 19:07
  • @Vicent Sorry I didn't read the original question carefully. So an additional comment for that would be: use `loop.call_soon_threadsafe(future.cancel)` if you are going to execute `future.cancel()` from the thread which is not the event loop living in. – Johann Chang Apr 19 '16 at 19:17