
Imagine an asyncio program in which a function calls a blocking read() in a loop and is therefore run in a ThreadPoolExecutor:

import asyncio
import concurrent.futures

def read_from_io(loop, event_queue):
    while True:
        # blocking call; some_io_resource is a placeholder for the real resource
        event = some_io_resource.read()
        asyncio.run_coroutine_threadsafe(event_queue.put(event), loop=loop)

event_queue = asyncio.Queue()
asyncio.set_event_loop(loop := asyncio.new_event_loop())

with concurrent.futures.ThreadPoolExecutor() as pool:
    reader_task = loop.run_in_executor(pool, read_from_io, loop, event_queue)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # how to signal reader_task to stop?
        pass

Is there a clean way to stop this task? From what I have read in tutorials and the documentation, neither reader_task.cancel() nor pool.shutdown() has any effect on a thread that is already running. The next common answer involves setting a threading.Event() in the main thread, but that doesn't help while read_from_io is stuck inside the blocking call, right?
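To illustrate the cancel() part, here is a minimal sketch using plain concurrent.futures. The names blocking_worker, started and release are made up for this example, and release.wait() merely stands in for the blocking read(). It only shows that cancel() on a future whose function is already running is refused:

```python
import concurrent.futures
import threading

started = threading.Event()   # set once the worker is actually running
release = threading.Event()   # lets the example end; a real read() has no such hook

def blocking_worker():
    started.set()
    release.wait()            # stand-in for some_io_resource.read()
    return "finished"

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(blocking_worker)
    started.wait()                    # make sure the call is in progress
    cancelled = future.cancel()       # refused: the call is already running
    still_running = future.running()
    release.set()                     # unblock the worker so the pool can shut down
    result = future.result()

print(cancelled, still_running, result)  # False True finished
```

Without the release.set() line, leaving the with block would hang, because the pool waits for the worker thread on exit.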

The only generic way I have found to terminate a blocking function is to "send" an exception to its thread, as described here or here. But that looks ugly:

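import ctypes

def raise_exception(thread, exception_type):
    # thread is a threading.Thread; this relies on a CPython-specific C API
    thread_id = ctypes.c_ulong(thread.ident)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        thread_id, ctypes.py_object(exception_type))
    if res > 1:
        # more than one thread state was affected: clear the pending exception again
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
        print('Exception raise failure')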
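For reference, a minimal sketch of how such an injection behaves on CPython. StopWorker, raise_in_thread and worker are hypothetical names for this example only. The worker polls with short time.sleep() calls, so it returns to Python bytecode regularly. As far as I understand, the exception is only delivered while the target thread executes Python code, so a thread stuck inside a long blocking C call would not see it until that call returns:

```python
import ctypes
import threading
import time

class StopWorker(Exception):
    """Hypothetical exception type, used only in this sketch."""

def raise_in_thread(thread, exception_type):
    # CPython-specific: schedule exception_type to be raised in the given thread.
    # Returns the number of thread states modified (1 on success, 0 if the id is unknown).
    return ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread.ident), ctypes.py_object(exception_type))

outcome = []

def worker():
    try:
        while True:
            time.sleep(0.01)   # short sleeps: control returns to Python often
    except StopWorker:
        outcome.append("stopped")

t = threading.Thread(target=worker, daemon=True)
t.start()
time.sleep(0.05)               # give the worker time to enter its loop
modified = raise_in_thread(t, StopWorker)
t.join(timeout=5)

print(modified, t.is_alive(), outcome)
```

This works for a polling loop, which is exactly the case where a threading.Event would do the same job more cleanly.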

With Python 3.11+, asyncio and concurrent.futures, is there a more elegant way to stop a blocking task that was moved to a thread for exactly that reason?

frans