2

I'm working with asynchronous programming and wrote a small wrapper class for thread-safe execution of co-routines based on some ideas from this thread here: python asyncio, how to create and cancel tasks from another thread. After some debugging, I found that it hangs when calling the Thread class's join() function (I overrode it only for testing). Thinking I made a mistake, I basically copied the code that the OP said he used and tested it to find the same issue.

His mildly altered code:

import threading
import asyncio
from concurrent.futures import Future
import functools

class EventLoopOwner(threading.Thread):
    class __Properties:
        def __init__(self, loop, thread, evt_start):
            self.loop = loop
            self.thread = thread
            self.evt_start = evt_start

    def __init__(self):
        threading.Thread.__init__(self)
        self.__elo = self.__Properties(None, None, threading.Event())

    def run(self):
        self.__elo.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.__elo.loop)
        self.__elo.thread = threading.current_thread()
        self.__elo.loop.call_soon_threadsafe(self.__elo.evt_start.set)
        self.__elo.loop.run_forever()

    def stop(self):
        self.__elo.loop.call_soon_threadsafe(self.__elo.loop.stop)

    def _add_task(self, future, coro):
        task = self.__elo.loop.create_task(coro)
        future.set_result(task)

    def add_task(self, coro):
        self.__elo.evt_start.wait()
        future = Future()
        p = functools.partial(self._add_task, future, coro)
        self.__elo.loop.call_soon_threadsafe(p)
        return future.result()  # block until result is available

    def cancel(self, task):
        self.__elo.loop.call_soon_threadsafe(task.cancel)

async def foo(i):
    return 2 * i

async def main():
    elo = EventLoopOwner()
    elo.start()

    task = elo.add_task(foo(10))
    x = await task

    print(x)
    elo.stop(); print("Stopped")
    elo.join(); print("Joined")  # note: giving it a timeout does not fix it

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    assert isinstance(loop, asyncio.AbstractEventLoop)
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

About 50% of the time when I run it, It simply stalls and says "Stopped" but not "Joined". I've done some debugging and found that it is correlated to when the Task itself sent an exception. This doesn't happen every time, but since it occurs when I'm calling threading.Thread.join(), I have to assume it is related to the destruction of the loop. What could possibly be causing this?

The exception is simply: "cannot join current thread" which tells me that the .join() is sometimes being run on the thread from which I called it and sometimes from the ELO thread.

What is happening and how can I fix it?

I'm using Python 3.5.1 for this.

Note: This is not replicated on IDE One: http://ideone.com/0LO2D9

enter image description here

Community
  • 1
  • 1
Goodies
  • 4,439
  • 3
  • 31
  • 57
  • What does "(wrapper class for) thread-safe execution of co-routines" means? What are you trying to achieve? Coroutines already supply concurrency, without threads! – Udi Dec 25 '16 at 02:19

0 Answers0