
I run several threads concurrently using concurrent.futures. All of them must complete successfully for the later steps in the code to succeed.

I can surface any exceptions by calling .result() once all threads have finished, but ideally an exception raised in any single thread would immediately stop all the others. This would help me identify bugs in a task sooner, rather than waiting until all the long-running tasks complete.

Is this possible?

canary_in_the_data_mine
  • Can you not wrap your task function (or functions) with a `try:..except:..` that signals the other tasks to abort? – Solomon Slow Feb 11 '22 at 01:31
  • I'm not sure I understand @SolomonSlow -- how would you signal all the other tasks to abort? – canary_in_the_data_mine Mar 18 '22 at 12:39
  • I probably would have each of the tasks periodically test a global boolean variable or a field in a mutable, shared object with a name like `abort`, and I would have them stop whatever they were doing and exit if `abort` became `True`. – Solomon Slow Mar 18 '22 at 12:49
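
The shared-flag idea from the comment above could look roughly like the sketch below. It is only an illustration under assumptions: the names `abort`, `task`, and `run_all` are hypothetical, a `threading.Event` stands in for the "global boolean", and each task has to be written so that it checks the flag between small units of work. A thread that is blocked in one long call will not notice the flag until that call returns.

```python
import concurrent.futures
import threading
import time

# Shared flag that every task polls; the name `abort` is an assumption.
abort = threading.Event()


def task(i, steps=20):
    """Hypothetical long-running task that checks the flag between steps."""
    try:
        for _ in range(steps):
            if abort.is_set():
                return f"task {i} aborted"
            if i == 1:
                raise ValueError(f"task {i} failed")  # simulated bug
            time.sleep(0.05)  # stand-in for one unit of real work
        return f"task {i} finished"
    except Exception:
        abort.set()  # tell the other tasks to stop, then re-raise
        raise


def run_all(n=4):
    abort.clear()
    with concurrent.futures.ThreadPoolExecutor(max_workers=n) as pool:
        futures = [pool.submit(task, i) for i in range(n)]
    # Leaving the with-block waits for every task to return or raise.
    results, errors = [], []
    for future in futures:
        err = future.exception()
        if err is None:
            results.append(future.result())
        else:
            errors.append(err)
    return results, errors


if __name__ == "__main__":
    results, errors = run_all()
    print(results)
    print(errors)
```

With this layout the failing task sets the flag and the remaining tasks return at their next check instead of running all their steps.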

2 Answers


It's possible to stop after the first exception and cancel any jobs that have not started yet. However, once a job has started running it can't be cancelled; you have to wait for it to finish (or time out). See this question for details. Here's a short example that cancels the pending jobs once the first exception occurs, but still waits for the jobs that are already running. It uses the `FIRST_EXCEPTION` option of `concurrent.futures.wait`, which is listed in the concurrent.futures docs.

import time
import concurrent.futures

def example(i):
    print(i)
    assert i != 1  # job 1 fails on purpose
    time.sleep(i)
    return i

if __name__ == "__main__":
    # Use fewer workers than jobs so that some jobs are still pending
    # (and therefore cancellable) when the first exception occurs.
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(example, number) for number in range(5)]

        # wait() returns two sets: the finished futures and the unfinished ones.
        done, not_done = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_EXCEPTION
        )
        for future in done:
            if future.exception() is not None:
                for pending in not_done:
                    # True only for jobs that have not started running yet
                    print(pending.cancel())
                raise future.exception()
SNygard

I saw the answer by SNygard. The future that raised the exception ends up in the set of completed tasks returned by wait(), not in the set of tasks that are still running. Take this example:

import concurrent.futures
import time


def job(i):
    if i == 1:
        raise ValueError(i)

    time.sleep(2)


if __name__ == "__main__":
    with concurrent.futures.ThreadPoolExecutor(2) as pool:
        tasks = [pool.submit(job, i) for i in range(6)]
        done, not_done = concurrent.futures.wait(tasks, return_when=concurrent.futures.FIRST_EXCEPTION)
        for task in done:
            err = task.exception()
            if err is not None:
                print("exception in done")
                raise RuntimeError(str(err))

Running this gives

exception in done
Traceback (most recent call last):
  File "/some/path/tmp.py", line 20, in <module>
    raise RuntimeError(str(err))
RuntimeError: 1
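
Note that raising inside the with-block does not skip the queued jobs: on exit the executor still waits for everything that was submitted. A possible variation, sketched here under assumptions, cancels the futures in the not-done set before raising. The function name `run_fail_fast` and the shorter sleep are my own choices for illustration; `Future.cancel()` only succeeds for jobs that have not started, so jobs that are already running still finish.

```python
import concurrent.futures
import time


def job(i):
    if i == 1:
        raise ValueError(i)
    time.sleep(0.2)  # shortened from 2 seconds to keep the demo quick
    return i


def run_fail_fast(n_jobs=6, workers=2):
    """Hypothetical helper: cancel pending jobs after the first failure."""
    with concurrent.futures.ThreadPoolExecutor(workers) as pool:
        tasks = [pool.submit(job, i) for i in range(n_jobs)]
        done, not_done = concurrent.futures.wait(
            tasks, return_when=concurrent.futures.FIRST_EXCEPTION
        )
        failed = [t for t in done if t.exception() is not None]
        if failed:
            # cancel() returns True only for jobs that have not started yet.
            cancelled = [t for t in not_done if t.cancel()]
            raise RuntimeError(
                f"job failed; cancelled {len(cancelled)} pending job(s)"
            ) from failed[0].exception()
        return [t.result() for t in tasks]


if __name__ == "__main__":
    run_fail_fast()
```

On Python 3.9 or later, calling `pool.shutdown(cancel_futures=True)` before raising should have a similar effect on the queued jobs.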