Imagine an async
scenario with a function calling a blocking read()
in a loop which for that reason gets executed in a ThreadPoolExecutor
:
def read_from_io(loop, event_queue):
while True:
event = some_io_resource.read()
asyncio.run_coroutine_threadsafe(event_queue.put(event), loop=loop)
event_queue = asyncio.Queue()
asyncio.set_event_loop(loop := asyncio.new_event_loop())
with concurrent.futures.ThreadPoolExecutor() as pool:
reader_task = loop.run_in_executor(pool, read_from_io, loop, event_queue)
try:
loop.run_forever()
except KeyboardInterrupt:
#
# how to signal reader_task to stop?
Is there a clean way to stop this task? From what I read in tutorials and documentation neither reader_task.cancel()
nor pool.shutdown()
has an effect in this (threaded) scenario.
The next common answer you find involves a threading.Event()
being fired in the main thread - but that doesn't work with the blocking function in read_from_io
, right?
The only generic way to terminate a blocking function I came up with was to "send" an exception to that thread, like described here or here. But that looks ugly:
def raise_exception(thread, exception_type):
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
thread.get_id(),
ctypes.py_object(exception_type))
if res > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.get_id(), 0)
print('Exception raise failure')
With Python3.11+, asyncio
, concurrent
and stuff - is there a more elegant way to stop a blocking task created for exactly that reason?