2

I have execute function that calls some very long tasks. I would like to have a possibility of cancelling such tasks. I thought about throwing an exception and catching it outside. The problem is, my exceptions are not visible outside the future. How can I fix this code to cancel correctly?

def cancel_task(self):
   while True:
      process = get_process()
      if process.status == "CANCELLED":
         raise OperationCancelledException()
      time.sleep(1)

def execute(self):
   with ThreadPoolExecutor() as executor:
      future = executor.submit(self.cancel_task)
      result = some_very_long_task()
      future.cancel()

   return result
# somewhere in the code where execute() is called
try:
   execute()
except OperationCancelledException:
   pass
except Exception:
   raise
Nickon
  • 9,652
  • 12
  • 64
  • 119

1 Answers1

2

A future object has been initialized here:

future = executor.submit(self.cancel_task)

Then, we run the long task:

result = some_very_long_task()

This call to some_very_long_task() will run to completion first before moving on to the next lines. Whether the future object raised an exception or not at this point, it wouldn't affect the current execution. Actually as documented, you have to explicitly call Future.result() to re-raise any exception that happened in the submitted process (here is self.cancel_task).

class concurrent.futures.Future

result(timeout=None)

If the call raised an exception, this method will raise the same exception.

So even if you call it:

future = executor.submit(self.cancel_task)
result = some_very_long_task()
future.result()

It will only run and re-raise any exception after the some_very_long_task() runs to completion, thus pointless as it didn't actually cancel/stop the execution of the long task.

Also a side note, future objects can't be cancelled once it already started, as documented:

cancel()

Attempt to cancel the call. If the call is currently being executed or finished running and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

Even submitting some_very_long_task to the executor and setting the timeout argument to result() wouldn't help as it would still wait for the task to finish, only that it would raise a TimeoutError if it exceeds the timeout once its done.

Alternative Solution

Perhaps you'll find a way, but seems like concurrent.futures isn't the tool for this job. You can consider using multiprocessing instead.

  1. Spawn a new process for some_very_long_task
  2. Run the process in the background
  3. While the process is running in the background, check if it must be cancelled already.
  4. If the process finished, then proceed as usual.
  5. But if the process isn't finished yet and we already received a signal to cancel it, terminate the process.
from datetime import datetime, timezone
import multiprocessing
import time

class OperationCancelledException(Exception):
    pass

def some_very_long_task(response):
    print("Start long task")
    time.sleep(10)  # Simulate the long task with a sleep
    print("Finished long task")
    response['response'] = "the response!"

def get_process_status():
    # Let's say the signal to cancel an ongoing process comes from a file.
    with open('status.txt') as status_file:
        return status_file.read()

def execute():
    response = multiprocessing.Manager().dict()

    proc = multiprocessing.Process(target=some_very_long_task, args=(response,), kwargs={})
    proc.start()

    while proc.is_alive():
        status = get_process_status()
        if status == "CANCELLED":
            proc.terminate()
            raise OperationCancelledException()
        time.sleep(1)

    proc.join()
    return response.get('response')

try:
    print(datetime.now(timezone.utc), "Script started")
    result = execute()
except OperationCancelledException:
   print("Operation cancelled")
else:
    print(result)
finally:
    print(datetime.now(timezone.utc), "Script ended")

Output if status.txt contains "PENDING":

$ python3 script.py 
2021-09-08 13:17:32.234437+00:00 Script started
Start long task
Finished long task
the response!
2021-09-08 13:17:42.293814+00:00 Script ended
  • Task will run to completion (10 seconds sleep) if there is no signal to cancel it.

Output if status.txt contains "PENDING" and then changed to "CANCELLED" while script is running:

$ python3 script.py 
2021-09-08 13:19:13.370828+00:00 Script started
Start long task
Operation cancelled
2021-09-08 13:19:16.403367+00:00 Script ended
  • Task was halted just after 3 seconds (the time the file was updated) if there is a signal to cancel it.

Related references: