I was looking for a way to spawn different threads (in my actual program the number of threads can change during execution) to perform a endless-running operation which would block my whole application for (at worst) a couple of seconds during their run.
Because of this, I'm using the standard thread class and asyncio
(because other parts of my program are using it).
This seems to work good and according to this thread it seems to be okay, however when searching for asynchronous threading and asyncio I'm often stumbling across the suggestion of using ProcessPoolExecutor
(e. g. in this stackoverflow post).
Now I'm wondering, if the following way is really good practice (or even dangerous)?
class Scanner:
def __init__(self):
# Start a new Scanning Thread
self.scan_thread = Thread(target=self.doScan, args=())
self.scan_thread.start()
def doScan(self):
print("Started scanning")
loop = asyncio.new_event_loop()
loop.run_until_complete(self.connection())
print("Stopped scanning")
list_of_scanner = []
list_of_scanner.append(Scanner())
list_of_scanner.append(Scanner())
Background: I started questioning this myself, because my program started crashing when spawning threads, mostly with the error message
RuntimeError: Task <Task pending ...> attached to a different loop
. I know that this is not directly linked to the example I gave you, but I guess I started messing up my asyncio coroutines by using these threads.
Edit
For clarification I want to add, why I'm using this weird construct of asyncio
and threads
.
- I'm using this parts of the project hbldh/bleak
The part which would run as a thread is basically this:async def connection(): x = await client.is_connected() async with BleakClient(address, loop=loop) as client: while x: x = await client.is_connected() log.info("Connected: {0}".format(x))
- What is
endlessScan()
doing? The name is a bit misleading and it's called different in my code (I've now changed that now). The new name isconnection()
The whole purpose is to establish a link to Bluetooth Devices and basically listen to incoming data (like we would do when using sockets) This means thatloop.run_until_complete(self.connection())
will NEVER exit, unless the Bluetooth devices disconnects. - Why can't I create one single event loop?
As said, when established a link, this function runs endlessly. Each connected device runs such an endless loop. I want to do this in background. My main application should never have to wait for the routine to finish and must be responsive under all circumstances. This for me justified the usage ofthreads
in combination withasyncio
Edit 2: Added my testing code based on @user4815162342 suggestion. The execution seems to work fine.
import asyncio
from threading import Thread, Event, Lock
import random
class Scanner:
def __init__(self, id, loop):
print("INIT'D %s" % id)
self.id = id
self.submit_async(self.update_raw_data(), loop)
self.raw_data = ""
self.event = Event()
self.data_lock = Lock()
@property
def raw_data(self):
with self.data_lock:
return self._raw_data
@raw_data.setter
def raw_data(self, raw_data):
self._raw_data = raw_data
def submit_async(self, awaitable, loop):
return asyncio.run_coroutine_threadsafe(awaitable, loop)
async def update_raw_data(self):
while True:
with self.data_lock:
self._raw_data = random.random()
print("Waken up %s with %s" % (self.id, self._raw_data))
await asyncio.sleep(self.id)
def _start_async():
loop = asyncio.new_event_loop()
t = Thread(target=loop.run_forever)
t.daemon = True
t.start()
return loop
_loop = _start_async()
def stop_async():
_loop.call_soon_threadsafe(_loop.stop)
ble_devices = [Scanner(1, _loop), Scanner(2, _loop), Scanner(4, _loop)]
# This code never executes...
for dev in ble_devices:
print(dev.raw_data)