
I was trying to add a timeout to a function using asyncio. I found out that asyncio's timeout doesn't work on blocking code, so I decided to run the blocking code in a separate thread using asyncio.to_thread, but then I found out that the timeout functionality in asyncio can't force the thread to stop. Here's a simplified example of my problem:

import asyncio

def blocking_task():
    input()

async def long_running_task():
    await asyncio.to_thread(blocking_task)

async def main():
    try:
        async with asyncio.timeout(1):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")

asyncio.run(main())

The TimeoutError exception is thrown, but execution doesn't stop - it just hangs there.
I was wondering if anyone has ever managed to make this work and force the thread to be killed, or force it to stop working on long_running_task? Is it possible to do this? PS: I checked most of the other Stack Overflow questions related to this, and most don't give a valid solution.

  • Does this answer your question? [asyncio: Is it possible to cancel a future been run by an Executor?](https://stackoverflow.com/questions/26413613/asyncio-is-it-possible-to-cancel-a-future-been-run-by-an-executor) Note that `to_thread` simply calls `run_in_executor` under the hood. – Daniil Fajnberg May 15 '23 at 12:45
  • Hey @DaniilFajnberg, thank you for your suggestion. Sadly no, this doesn't work in my case, because the `if event.is_set()` part requires a running loop to check if the timeout was reached, and cancelling the future doesn't stop the thread. I have multiple classes like Algorithm1, Algorithm2, Algorithm3... AlgorithmN, each with a method called solve, and solve might contain blocking code, which is why I have to call it via `asyncio.to_thread(algorithm1.solve)`. I can't go into every one of the `solve` method implementations and add a loop that checks if the event is set; that's not scalable. – TheKingOfRandom May 15 '23 at 13:13
  • I am currently looking for a way to kill the thread altogether when we reach the timeout. – TheKingOfRandom May 15 '23 at 13:15

1 Answer


No - Python provides no mechanism for code running outside a thread to force that thread to do anything.

The only way to achieve this is to instrument the long-running code so that it checks, at given intervals, whether it got a message telling it to cancel itself. You could use a queue (or an event) for that, or run an asyncio loop in the other thread and call `await asyncio.sleep(0)` there, then get a reference to the running task from your main thread and cancel it. But I suppose that, if this were an option, you'd just have kept it as a coroutine in the main thread.
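For illustration, here is a minimal sketch of that cooperative approach, using a threading.Event as the cancellation flag (the cancellable_solve function and its chunked work loop are made up for this example - the point is that the worker code itself has to poll the flag):

import asyncio
import threading
import time

def cancellable_solve(cancel_event):
    # The blocking work must be split into small steps so the
    # cancellation flag can be polled between them.
    for _ in range(100):
        if cancel_event.is_set():
            return None  # cooperative early exit
        time.sleep(0.1)  # stands in for one chunk of real work
    return "done"

async def main():
    cancel_event = threading.Event()
    try:
        async with asyncio.timeout(1):
            print(await asyncio.to_thread(cancellable_solve, cancel_event))
    except TimeoutError:
        cancel_event.set()  # ask the worker thread to stop itself
        print("Timed out; the worker was asked to stop.")

asyncio.run(main())

Without the event, asyncio.run would hang at shutdown waiting for the executor thread, exactly as in the question; with it, the worker exits at its next poll.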

So, the next thing that can be done is to run the code in an external process instead, through multiprocessing or a ProcessPoolExecutor. The startup overhead is greater, and so is the cost of data communication, since all arguments have to be serialized - but then the main process has the option of killing the subprocess through a signal (the .terminate() or .kill() methods of a multiprocessing.Process or subprocess.Popen, or just plain os.kill). The code example at the end of this answer implements a class wrapping a call that does exactly this.

You can then, in the target process, register a signal handler to terminate it gracefully (perhaps returning the data partially processed).
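A rough sketch of that graceful-shutdown idea (Unix-only, since .terminate() delivers SIGTERM there; the worker and its made-up work loop are just for illustration):

import multiprocessing
import signal
import sys
import time

def worker(queue):
    partial = []

    def on_sigterm(signum, frame):
        # Runs in the child when the parent calls .terminate():
        # ship whatever was computed so far, then exit cleanly.
        queue.put(("partial", partial))
        sys.exit(0)

    signal.signal(signal.SIGTERM, on_sigterm)
    for i in range(10):
        partial.append(i * i)
        time.sleep(0.2)
    queue.put(("complete", partial))

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=worker, args=(queue,))
    process.start()
    process.join(timeout=0.5)  # stands in for the asyncio timeout
    if process.is_alive():
        process.terminate()    # sends SIGTERM on Unix
        process.join()
    print(queue.get())         # ("partial", [...]) when terminated early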

When checking the docs on this - https://docs.python.org/3/library/signal.html - you will even notice that there is a signal.pthread_kill call (on Unix) which can send a signal to a specific thread and interrupt it. But signal handlers always run on the main Python thread: it is possible to inspect the running frames of the target thread, yet not to interrupt it - your signal handler can either exit the whole program or let the interrupted thread continue from the point where it was.
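A small Unix-only experiment makes that limitation concrete: the signal is delivered to the worker thread at the OS level, but the Python-level handler still executes in the main thread, and the worker simply resumes its sleep afterwards:

import signal
import threading
import time

def handler(signum, frame):
    # Always executes in the main thread, no matter which
    # thread the signal was directed at.
    print("handler ran in:", threading.current_thread().name)

signal.signal(signal.SIGUSR1, handler)

def worker():
    time.sleep(1)  # the interrupted sleep is simply retried (PEP 475)

thread = threading.Thread(target=worker, name="worker")
thread.start()
time.sleep(0.1)
signal.pthread_kill(thread.ident, signal.SIGUSR1)  # Unix only
thread.join()  # prints "handler ran in: MainThread"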

In Python 3.12 it will be possible to use sub-interpreters from Python code (a provisional implementation of PEP 554 is already on the Python development branch). It will then be possible to run the code in a thread in another Python interpreter, and a call destroying that interpreter could stop the work at the desired moment, with no need for a subprocess.

Now, here follows some sample code that allows one to call a function in a subprocess, wrapped in a cancellable Future. It is not suitable for lots of little tasks, since it uses one subprocess per task rather than a process pool - nonetheless it works as asked, and has the boilerplate needed to get a return value back from the function run in the subprocess, which a raw call would not feature. (I wrote the Future subclass to experiment with the signal.pthread_kill call above before concluding it was not suitable for this case; then I decided to polish it as a wrapper for a subprocess call. Also, I will likely be using this in extrainterpreters - a library I am working on to allow practical use of Python sub-interpreters.)

import multiprocessing
import asyncio
import time
import queue as threading_queue

# Polling interval (in seconds) for checking whether the subprocess finished:
resolution = 0.02



class CancellableProcess(asyncio.Future):
    # We inherit from Future - but accept a completely different
    # set of arguments. Liskov would not like it - but this is "real world"
    def __init__(self, target_func, args=(), kwargs=None, *, loop=None):
        if kwargs is None:
            kwargs = {}

        if loop is None:
            loop = asyncio.get_running_loop()
        super().__init__(loop=loop)
        self.func = target_func
        self.args = args
        self.kwargs = kwargs
        self.process_runner()

    @staticmethod
    def in_process_runner(queue, func, args, kwargs):
        result = func(*args, **kwargs)
        queue.put(result)

    def process_runner(self):
        self.queue = multiprocessing.Queue()
        self.process = multiprocessing.Process(
            target=self.in_process_runner,
            args=(self.queue, self.func, self.args, self.kwargs),
        )
        self.process.start()
        self._loop.call_later(resolution, self.get_remote_result)

    def get_remote_result(self):
        try:
            result = self.queue.get(block=False)
        except threading_queue.Empty:
            if not self.done():
                self._loop.call_later(resolution, self.get_remote_result)
            return
        if not self.done():
            self.set_result(result)

    def cancel(self, *args):
        # Kill the subprocess before marking the Future as cancelled:
        self.process.terminate()
        return super().cancel(*args)

#--------------------

# "result" is a global variable featuring a value that is set
# in the subprocess. It is not needed for things to work, just
# another example of data-exchanging with the subprocess:
result = multiprocessing.Value("i")

def blocking_task():
    # Final "result" value if the task
    # is cancelled during the sleep call below:
    result.value = 23
    time.sleep(0.5)
    # final value if cancelling the task
    # fails to stop the process:
    result.value = 42
    # Returned value if task does not timeout:
    return 55

async def long_running_task():
    f = await CancellableProcess(blocking_task)
    return f

async def main():
    try:
        async with asyncio.timeout(0.25):
            print(await long_running_task())
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")
    time.sleep(1)  # wait for process to complete
    print(f"result = {result.value}. Process was{'' if result.value == 23 else ' not'} cancelled correctly")

# Guard "if" needed due to the default
# method of starting sub-processes in Windows and MacOS:
if __name__ == "__main__":
    asyncio.run(main())
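For reference: with the 0.25 s timeout above and the default "fork" start method on Linux, this should print the timeout message, then "This statement will run regardless.", and finally `result = 23. Process was cancelled correctly`, since the subprocess is terminated while still inside the `time.sleep(0.5)` call.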


  • Hey @jsbueno, this is a great solution, and it works in most cases - except when your code contains a lot of lambda functions, which the multiprocessing library can't serialize. – TheKingOfRandom May 17 '23 at 13:36
  • Yes. Does yours? I know "dill", a 3rd-party package, can pickle usually unpicklable things - so I just made a quick search: there is a multiprocessing fork using dill, named Pathos. Check if it fits your needs: https://stackoverflow.com/questions/19984152/what-can-multiprocessing-and-dill-do-together – jsbueno May 17 '23 at 13:43