1

The scenario is that I have sync code executing in a couroutine that I don't have control over. At a given time, I'd like to be able to "force cancel" that coroutine. I'm running the sync code in a ThreadPoolExecutor and trying to cancel the task. For example:

def execute():
    # Some sync code
    for i in range(5):
        print(f'Hello {i}')
        time.sleep(1)

    print('Done executing sync task')


async def main(executor):
    loop = asyncio.get_running_loop()
    t = loop.run_in_executor(executor, execute)

    # Simulate a cancel request coming in
    await asyncio.sleep(1)
    print('Cancelling')
    t.cancel()
    print('Cancelled')

    await t
    

executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
asyncio.run(main(executor), debug=True)
    

The output is as follows:

Hello 0
Hello 1
Cancelling
... asyncio.exceptions.CancelledError stack trace ...
Cancelled
Hello 2
Hello 3
Hello 4

Ideally, I'd like to somehow terminate the running task. Is this possible? I'm also not married to using asyncio if something else if more appropriate here. Thanks for the suggestions.

naivedeveloper
  • 2,814
  • 8
  • 34
  • 43
  • Cancel only cancels the awaitable that's awaiting the result of the function, which is executing in another thread. Essentially what you are asking for is how to kill a running thread, because that's what you have to do. See this answer https://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread. Your situation is slightly worse thanks to the presence of asyncio, but still the best answer is that "you probably shouldn't do that." – Paul Cornelius Mar 23 '23 at 10:50

1 Answers1

0

The design of ThreadPoolExecutor doesn't allow to stop task that have already started running in the pool (albeit pending tasks, which are still waiting for their execution turn can be killed).

In fact loop.run_in_executor inside its implementation just submits task into pool and wraps the resulting concurrent.futures.Future object by asyncio.futures.Future so that when the first one completes, so does the second. Since concurrent.futures.Future doesn't provide an opportunity for cancelling running tasks (see doc), a wrapper future has the same limitation as well.

In theory you can try to find some other thread pool implementations, which support "force cancel" for futures and tasks inside the pool, but this third-party implementation should be inherited from concurrent.futures.Future because of the following check in asyncio source code.

Taking into account all of the above, may be it's easier to support the opportunity for "graceful"-shutdown inside the sync code (for example by checking a special flag that func should terminate):

from threading import Event

def execute(stop: Event):
    # Some sync code
    for i in range(5):
        if stop.is_set():
            break
        print(f'Hello {i}')
        time.sleep(1)

    print('Done executing sync task')


async def main(executor):
    loop = asyncio.get_running_loop()

    stop = Event()
    t = loop.run_in_executor(executor, execute, stop)

    # Simulate a cancel request coming in
    await asyncio.sleep(1)
    print('Cancelling')
    stop.set()
    print('Cancelled')

    await t
    

executor = ThreadPoolExecutor(max_workers=2)
asyncio.run(main(executor), debug=True)

P.S. It turns out that the described external implementation of a thread pool exists. Try to replace ThreadPoolExecutor on ThreadPool from the module pebble. It should work for your code. But to be honest I have never used this lib in production...