
I was looking for a way to spawn different threads (in my actual program the number of threads can change during execution) to perform an endlessly running operation which would otherwise block my whole application for (at worst) a couple of seconds at a time.
Because of this, I'm using the standard Thread class together with asyncio (other parts of my program already use asyncio).

This seems to work well, and according to this thread it seems to be okay. However, when searching for asynchronous threading and asyncio, I often stumble across the suggestion to use ProcessPoolExecutor (e.g. in this Stack Overflow post). Now I'm wondering whether the following approach is really good practice (or even dangerous).

import asyncio
from threading import Thread

class Scanner:
    def __init__(self):
        # Start a new scanning thread
        self.scan_thread = Thread(target=self.doScan)
        self.scan_thread.start()

    def doScan(self):
        print("Started scanning")
        # Each Scanner gets its own event loop in its own thread
        loop = asyncio.new_event_loop()
        loop.run_until_complete(self.connection())  # connection() is shown in the edit below
        print("Stopped scanning")

list_of_scanner = []
list_of_scanner.append(Scanner())
list_of_scanner.append(Scanner())

Background: I started questioning this myself because my program started crashing when spawning threads, mostly with the error message RuntimeError: Task <Task pending ...> attached to a different loop. I know this is not directly linked to the example I gave you, but I guess I started messing up my asyncio coroutines by using these threads.
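
To illustrate for myself how that error can come about (a minimal, self-contained sketch, not my actual code): awaiting a future that belongs to one event loop from a task running on a different loop reproduces it:

import asyncio

loop_a = asyncio.new_event_loop()
loop_b = asyncio.new_event_loop()

# This future is created on (and belongs to) loop_a
fut = loop_a.create_future()

async def waiter():
    # Awaiting loop_a's future from a task running on loop_b raises
    # "RuntimeError: Task <...> got Future <...> attached to a different loop"
    await fut

loop_b.run_until_complete(waiter())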


Edit

For clarification, I want to add why I'm using this odd combination of asyncio and threads.

  1. I'm using parts of the project hbldh/bleak.
    The part which would run as a thread is basically this:
    async def connection():
        # address, loop and log are defined elsewhere in my code
        async with BleakClient(address, loop=loop) as client:
            x = await client.is_connected()
            while x:
                x = await client.is_connected()
                log.info("Connected: {0}".format(x))
    
  2. What is endlessScan() doing? The name is a bit misleading and it is called differently in my code (I've changed that now); the new name is connection().
    The whole purpose is to establish a link to Bluetooth devices and basically listen to incoming data (like we would when using sockets). This means that loop.run_until_complete(self.connection()) will NEVER exit unless the Bluetooth device disconnects.
  3. Why can't I create one single event loop?
    As said, once a link is established, this function runs endlessly. Each connected device runs such an endless loop. I want to do this in the background; my main application should never have to wait for the routine to finish and must stay responsive under all circumstances. For me, this justified the use of threads in combination with asyncio.

Edit 2: Added my testing code based on @user4815162342's suggestion. The execution seems to work fine.

    import asyncio
    from threading import Thread, Event, Lock
    import random

    class Scanner:
        def __init__(self, id, loop):
            print("INIT'D %s" % id)
            self.id = id
            self.raw_data = ""
            self.event = Event()
            self.data_lock = Lock()
            # Submit the endless coroutine only after all attributes exist
            self.submit_async(self.update_raw_data(), loop)

        @property
        def raw_data(self):
            with self.data_lock:
                return self._raw_data

        @raw_data.setter
        def raw_data(self, raw_data):
            self._raw_data = raw_data

        def submit_async(self, awaitable, loop):
            return asyncio.run_coroutine_threadsafe(awaitable, loop)

        async def update_raw_data(self):
            while True:
                with self.data_lock:
                    self._raw_data = random.random()
                    print("Waken up %s with %s" % (self.id, self._raw_data))
                await asyncio.sleep(self.id)

    def _start_async():
        loop = asyncio.new_event_loop()
        t = Thread(target=loop.run_forever)
        t.daemon = True
        t.start()
        return loop

    _loop = _start_async()

    def stop_async():
        _loop.call_soon_threadsafe(_loop.stop)

    ble_devices = [Scanner(1, _loop), Scanner(2, _loop), Scanner(4, _loop)]

    # This code never executes...
    for dev in ble_devices:
        print(dev.raw_data)
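
For completeness, this is roughly how I intend to use it from my main thread: poll the shared data while staying responsive, and eventually shut the background loop down again (just a sketch building on the code above, using the stop_async() helper defined there):

    import time

    # The main thread stays responsive and only polls the shared data
    for _ in range(5):
        for dev in ble_devices:
            print("Device %s -> %s" % (dev.id, dev.raw_data))
        time.sleep(1)

    # Ask the background loop to stop; the daemon thread then simply exits
    stop_async()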

agentsmith
  • Your use of asyncio is not idiomatic, and possibly also incorrect, depending on what `endlessScan` actually does. You should create a **single** event loop and create tasks in it using `asyncio.create_task()`. If the rest of your application already uses asyncio, that will be all you need. If you also have non-asyncio threads and you need them to add more scanners, you can use `asyncio.run_coroutine_threadsafe()` to submit additional tasks to a running loop. – user4815162342 Sep 12 '20 at 12:13
  • Thanks for your advice. I understand what you want to say. My application does not really use `asyncio`; only parts of an external project I'm using do (see my updated question). Because of that, I am running the coroutine in a thread. My `Main` and also other parts are not running in a loop. I've added more information; I hope it makes my question a little bit clearer. – agentsmith Sep 12 '20 at 15:31
  • Your argument against creating a single event loop is based on a false premise. Yes, you should use threads, but you can still make a single event loop and use it for multiple endless coroutines. That way you'll use asyncio the way it was designed to be used, you'll use fewer resources, and you'll reduce the possibilities for weird multithreading issues. See my answer for details. – user4815162342 Sep 12 '20 at 16:20

1 Answer


I would recommend creating a single event loop in a background thread and have it service all your async needs. It doesn't matter that your coroutines never end; asyncio is perfectly capable of executing multiple such functions in parallel.

For example:

import asyncio
import threading

def _start_async():
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever).start()
    return loop

_loop = _start_async()

# Submits awaitable to the event loop, but *doesn't* wait for it to
# complete. Returns a concurrent.futures.Future which *may* be used to
# wait for and retrieve the result (or exception, if one was raised)
def submit_async(awaitable):
    return asyncio.run_coroutine_threadsafe(awaitable, _loop)

def stop_async():
    _loop.call_soon_threadsafe(_loop.stop)

With these tools in place (and possibly in a separate module), you can do things like this:

class Scanner:
    def __init__(self):
        submit_async(self.connection())
        # ...

    # ...
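
The concurrent.futures.Future returned by submit_async() also gives you a bridge back into synchronous code when you do need a result. A sketch (read_battery_level() is just a stand-in for any short-lived coroutine):

async def read_battery_level():
    await asyncio.sleep(0.1)   # pretend to talk to the device
    return 87

# From any non-async thread: submit the coroutine and block until it finishes
future = submit_async(read_battery_level())
print(future.result(timeout=5))
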
  • What about the advice to use ProcessPoolExecutor?

That applies to running CPU-bound code in parallel processes to avoid the GIL. If you are actually running async code, you shouldn't care about ProcessPoolExecutor.
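
For comparison, a ProcessPoolExecutor is the right tool only when you have genuinely CPU-bound work to keep off the event loop. A sketch (crunch() is a placeholder for real number-crunching):

import asyncio
from concurrent.futures import ProcessPoolExecutor

def crunch(n):
    # Placeholder for CPU-bound work that would otherwise hog the GIL
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Runs in a separate process, so the event loop stays responsive
        print(await loop.run_in_executor(pool, crunch, 10_000_000))

if __name__ == "__main__":   # required for process pools on Windows/macOS
    asyncio.run(main())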

  • What about the advice to use ThreadPoolExecutor?

A ThreadPoolExecutor is simply a thread pool useful for classic multi-threaded applications. In Python it is used primarily to make the program more responsive, not to make it faster. It allows you to run CPU-bound or blocking code in parallel with interactive code with neither getting starved. It won't make things faster due to the GIL.
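
If all you need is to keep a blocking call from freezing the event loop, run_in_executor with a ThreadPoolExecutor (or the default executor) is enough. A sketch (blocking_read() is a placeholder for e.g. a blocking driver call):

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_read():
    # Placeholder for a blocking call (file I/O, a legacy driver, ...)
    time.sleep(1)
    return "data"

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        # The blocking call runs in a worker thread; the loop keeps running
        print(await loop.run_in_executor(pool, blocking_read))

asyncio.run(main())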

user4815162342
  • Thanks for this snippet and your effort! Appreciate it! I have seen my mistake now. This seems to work perfectly! – agentsmith Sep 13 '20 at 11:35