A Future object is created here:
future = executor.submit(self.cancel_task)
Then, we run the long task:
result = some_very_long_task()
This call to some_very_long_task() runs to completion before execution moves on to the next line. Whether or not the future has raised an exception by that point, the current thread is unaffected. As documented, you have to call Future.result() explicitly to re-raise any exception that occurred in the submitted callable (here, self.cancel_task).
class concurrent.futures.Future
result(timeout=None)
If the call raised an exception, this method will raise the same exception.
So even if you call it:
future = executor.submit(self.cancel_task)
result = some_very_long_task()
future.result()
future.result() is only reached, and the exception only re-raised, after some_very_long_task() has run to completion. That makes it pointless here, because it never cancels or stops the long task.
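A minimal sketch of this behaviour, using a ThreadPoolExecutor. The names failing_task and long_task are hypothetical stand-ins for self.cancel_task and some_very_long_task, and the sleep is shortened for illustration:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def failing_task():
    # Hypothetical stand-in for self.cancel_task: fails right away.
    raise RuntimeError("cancel requested")


def long_task(log):
    # Hypothetical stand-in for some_very_long_task.
    time.sleep(0.5)
    log.append("long task finished")
    return "done"


def demo():
    log = []
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(failing_task)
        result = long_task(log)  # Runs to completion regardless of the future.
        try:
            future.result()  # The stored exception is re-raised only here.
        except RuntimeError as exc:
            log.append(f"exception surfaced: {exc}")
    return result, log


if __name__ == "__main__":
    print(demo())
```

The exception raised in the worker is stored on the future and has no effect on the calling thread until result() is called, by which time the long task has already finished.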
As a side note, a future can't be cancelled once it has started running, as documented:
cancel()
Attempt to cancel the call. If the call is currently being executed or finished running and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.
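The documented behaviour of cancel() can be checked with a small sketch. It assumes a single-worker ThreadPoolExecutor so that a second submission stays pending; blocking_task and demo_cancel are hypothetical names used only for illustration:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo_cancel():
    started = threading.Event()
    release = threading.Event()

    def blocking_task():
        started.set()    # Signal that the call is now executing.
        release.wait(5)  # Hold the worker until released (5 s at most).
        return "finished"

    with ThreadPoolExecutor(max_workers=1) as executor:
        running = executor.submit(blocking_task)
        started.wait(5)  # Make sure the first call is really running.
        queued = executor.submit(blocking_task)  # Queued behind `running`.

        cancelled_running = running.cancel()  # Already executing.
        cancelled_queued = queued.cancel()    # Still pending.

        release.set()  # Let the running call finish.
        return cancelled_running, cancelled_queued, running.result()


if __name__ == "__main__":
    print(demo_cancel())
```

Only the call that is still waiting in the queue gets cancelled; the one that is already executing keeps going until it returns on its own.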
Even submitting some_very_long_task to the executor and passing the timeout argument to result() wouldn't help. result() raises a TimeoutError once the timeout elapses, but that doesn't stop the task: it keeps running in its worker, and leaving the executor's with block (or calling shutdown() with the default wait=True) still waits for it to finish.
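To make the timeout behaviour concrete, here is a small sketch, assuming a ThreadPoolExecutor used as a context manager. slow_task and demo_timeout are hypothetical names, and the durations are shortened for illustration:

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def slow_task(log):
    # Hypothetical stand-in for some_very_long_task.
    time.sleep(1.0)
    log.append("task finished")
    return "done"


def demo_timeout():
    log = []
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(slow_task, log)
        try:
            future.result(timeout=0.2)  # Gives up waiting after 0.2 s.
        except FutureTimeoutError:
            log.append("timed out")
        # Leaving the with block calls shutdown(wait=True), which blocks
        # until slow_task has returned.
    elapsed = time.monotonic() - start
    return log, elapsed


if __name__ == "__main__":
    print(demo_timeout())
```

The caller stops waiting after the timeout, but the task itself is never interrupted: it still runs for its full duration, and the executor waits for it on shutdown.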
Alternative Solution
Perhaps you'll find a way, but it seems that concurrent.futures isn't the right tool for this job. Consider using multiprocessing instead:
- Spawn a new process for some_very_long_task.
- Run the process in the background.
- While the process is running in the background, check whether it should be cancelled.
- If the process has finished, proceed as usual.
- If the process hasn't finished yet and a signal to cancel it has arrived, terminate the process.
from datetime import datetime, timezone
import multiprocessing
import time


class OperationCancelledException(Exception):
    pass


def some_very_long_task(response):
    print("Start long task")
    time.sleep(10)  # Simulate the long task with a sleep
    print("Finished long task")
    response['response'] = "the response!"


def get_process_status():
    # Let's say the signal to cancel an ongoing process comes from a file.
    with open('status.txt') as status_file:
        # Strip whitespace so a trailing newline doesn't break the comparison.
        return status_file.read().strip()


def execute():
    # A managed dict lets the child process hand its result back to the parent.
    response = multiprocessing.Manager().dict()
    proc = multiprocessing.Process(target=some_very_long_task, args=(response,), kwargs={})
    proc.start()

    # Poll once per second until the task finishes or a cancel signal arrives.
    while proc.is_alive():
        status = get_process_status()
        if status == "CANCELLED":
            proc.terminate()
            proc.join()  # Reap the terminated process
            raise OperationCancelledException()
        time.sleep(1)

    proc.join()
    return response.get('response')


# The guard is needed on platforms that start child processes with "spawn".
if __name__ == "__main__":
    try:
        print(datetime.now(timezone.utc), "Script started")
        result = execute()
    except OperationCancelledException:
        print("Operation cancelled")
    else:
        print(result)
    finally:
        print(datetime.now(timezone.utc), "Script ended")
Output if status.txt contains "PENDING":
$ python3 script.py
2021-09-08 13:17:32.234437+00:00 Script started
Start long task
Finished long task
the response!
2021-09-08 13:17:42.293814+00:00 Script ended
- The task runs to completion (a 10-second sleep) when there is no signal to cancel it.
Output if status.txt contains "PENDING" and is then changed to "CANCELLED" while the script is running:
$ python3 script.py
2021-09-08 13:19:13.370828+00:00 Script started
Start long task
Operation cancelled
2021-09-08 13:19:16.403367+00:00 Script ended
- The task was halted after about 3 seconds (when the file was updated) once the signal to cancel it arrived.