No - Python has no mechanism by which code running outside a thread can force that thread to do anything.
The only way to achieve this is to instrument the long-running code so that it checks, at given intervals, whether it got a message telling it to cancel - you could use a queue for that, or run an asyncio loop in the other thread and call `await asyncio.sleep(0)` there - then get a reference to the running task from your main thread and cancel it. But I suppose that if this were an option, you'd just have kept it as a coroutine in the main thread.
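For illustration, here is a minimal sketch of that cooperative-cancellation pattern, using a plain `queue.Queue` as the message channel (the `long_running` and `control` names are just placeholders for this example):

import queue
import threading
import time

control = queue.Queue()

def long_running(control):
    partial = 0
    for i in range(100):
        # The check the long-running code must be instrumented with:
        try:
            if control.get_nowait() == "cancel":
                return partial  # bail out with the partially processed data
        except queue.Empty:
            pass
        partial += i
        time.sleep(0.05)  # stands in for a chunk of real work
    return partial

worker = threading.Thread(target=long_running, args=(control,))
worker.start()
time.sleep(0.2)
control.put("cancel")  # ask the thread to stop at its next check
worker.join()

The key point is that the cancellation is voluntary: the thread only stops because its own code polls for the message.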
So, the next thing that can be done is to run the code in an external process instead, through `multiprocessing` instead of `threading`, or through a `ProcessPoolExecutor` - the starting overhead is greater, and so is the cost of data communication, as all arguments have to be serialized - but then you have the option, from the main process, of killing the subprocess with a signal (the `.kill` method of a process handle from the `subprocess` or `multiprocessing` modules, or just plain `os.kill`). The code example at the end of this answer implements a nice class wrapping a call in this way.
You can then, in the target process, register a signal handler to terminate it gracefully (perhaps returning the partially processed data), as in the sketch below.
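A minimal, Unix-oriented sketch of that graceful path (the `worker` and `on_terminate` names are illustrative; on POSIX, `Process.terminate()` delivers SIGTERM, which the child can catch):

import multiprocessing
import signal
import sys
import time

def worker(progress):
    def on_terminate(signum, frame):
        # Leave whatever was computed so far where the parent can
        # see it, then exit cleanly:
        sys.exit(0)

    signal.signal(signal.SIGTERM, on_terminate)
    for i in range(100):
        progress.value = i  # partial result, visible to the parent
        time.sleep(0.05)

if __name__ == "__main__":
    progress = multiprocessing.Value("i", 0)
    proc = multiprocessing.Process(target=worker, args=(progress,))
    proc.start()
    time.sleep(0.3)
    proc.terminate()  # sends SIGTERM, caught by the handler above
    proc.join()
    print("partial result:", progress.value)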
When checking the docs on this - https://docs.python.org/3/library/signal.html - you will even notice that there is a `signal.pthread_kill` call (on Unix), which can send a signal to a specific thread - but the signal handling will always take place in the main Python thread. It is possible to see the running frames of the target thread, but not to interrupt them: your signal handler can either exit the whole program, or let execution continue from the point where the interrupted thread currently is.
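A small Unix-only demonstration of that limitation (it assumes SIGUSR1 is free for our use): even though the signal is aimed at the worker thread, the Python-level handler reports it runs in the main thread:

import signal
import threading
import time

def handler(signum, frame):
    # CPython runs Python-level signal handlers in the main thread,
    # no matter which thread the signal was directed at:
    print("handler ran in:", threading.current_thread().name)

signal.signal(signal.SIGUSR1, handler)

def worker():
    time.sleep(0.5)  # this sleep is not interrupted by the signal

t = threading.Thread(target=worker, name="worker")
t.start()
signal.pthread_kill(t.ident, signal.SIGUSR1)  # aim the signal at the worker
t.join()
# -> prints "handler ran in: MainThread" once the main thread resumes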
In Python 3.12 there will be the possibility to use sub-interpreters from Python code (a provisional implementation of PEP 554 is already in the Python development branch). It will then be possible to run the code in a thread in another Python interpreter - a call to destroy that interpreter could then stop it at the desired moment, with no need for a subprocess.
Now, here follows some sample code that allows one to call a function in a subprocess, wrapped in a cancellable Future. It is not suitable for lots of small tasks, since it uses one subprocess per task rather than a process pool - nonetheless, it works as asked, and it has the boilerplate needed to get a return value back from the function run in the subprocess, which a raw call would not feature.
(I wrote the Future subclass to experiment with the `signal.pthread_kill` call above, before concluding it was not suitable for this case - then I decided to polish it as a wrapper for a subprocess call. I will also likely be using this in extrainterpreters - a library I am working on to allow practical use of Python sub-interpreters.)
import multiprocessing
import asyncio
import time
import queue as threading_queue

# Interval, in seconds, at which the event loop polls the subprocess for a result:
resolution = 0.02


class CancellableProcess(asyncio.Future):
    # We inherit from Future - but accept a completely different
    # set of arguments - Liskov would not like it - but this is "real world"
    def __init__(self, target_func, args=(), kwargs=None, *, loop=None):
        if kwargs is None:
            kwargs = {}
        if loop is None:
            loop = asyncio.get_running_loop()
        super().__init__(loop=loop)
        self.func = target_func
        self.args = args
        self.kwargs = kwargs
        self.process_runner()

    @staticmethod
    def in_process_runner(queue, func, args, kwargs):
        # Runs in the subprocess: call the target and ship the result back:
        result = func(*args, **kwargs)
        queue.put(result)

    def process_runner(self):
        self.queue = multiprocessing.Queue()
        self.process = multiprocessing.Process(
            target=self.in_process_runner,
            args=(self.queue, self.func, self.args, self.kwargs),
        )
        self.process.start()
        self._loop.call_later(resolution, self.get_remote_result)

    def get_remote_result(self):
        try:
            result = self.queue.get(block=False)
        except threading_queue.Empty:
            if not self.done():
                # No result yet: re-schedule this poll on the event loop:
                self._loop.call_later(resolution, self.get_remote_result)
            return
        if not self.done():
            self.set_result(result)

    def cancel(self, *args):
        self.process.terminate()
        return super().cancel(*args)


# --------------------
# "result" is a global variable featuring a value that is set
# in the subprocess. It is not needed for things to work, just
# another example of data-exchanging with the subprocess:
result = multiprocessing.Value("i")


def blocking_task():
    # Final "result" value if the task
    # is cancelled during the sleep call below:
    result.value = 23
    time.sleep(0.5)
    # Final value if cancelling the task
    # fails to stop the process:
    result.value = 42
    # Returned value if the task does not time out:
    return 55


async def long_running_task():
    f = await CancellableProcess(blocking_task)
    return f


async def main():
    try:
        async with asyncio.timeout(0.25):
            print(await long_running_task())
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")
    print("This statement will run regardless.")
    time.sleep(1)  # wait for the subprocess to complete
    print(f"result = {result.value}. Process was{'' if result.value == 23 else ' not'} cancelled correctly")


# Guard "if" needed due to the default
# method of starting sub-processes on Windows and macOS:
if __name__ == "__main__":
    asyncio.run(main())