
I am running a piece of Python code in which multiple threads are run through a ThreadPoolExecutor. Each thread is supposed to perform a task (fetching a webpage, for example). What I want to be able to do is terminate all threads as soon as any one of them fails. For instance:

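from concurrent import futures
from concurrent.futures import ThreadPoolExecutor

def start(*args, **kwargs):
    # fetch the page at kwargs["path"] (fetch_page is a hypothetical helper)
    success = fetch_page(kwargs["path"])
    if success:
        return True
    else:
        # Signal all threads to stop
        return False

with ThreadPoolExecutor(self._num_threads) as executor:
    jobs = []
    for path in paths:
        kw = {"path": path}
        jobs.append(executor.submit(start, **kw))
    for job in futures.as_completed(jobs):
        result = job.result()
        print(result)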

Is it possible to do so? The results returned by the threads are useless to me unless all of them succeed, so if even one of them fails, I would like to save the execution time of the remaining threads and terminate them immediately. The actual code is obviously doing relatively lengthy tasks with a couple of failure points.

Khizar Amin
  • Does this answer your question? [asyncio: Is it possible to cancel a future been run by an Executor?](https://stackoverflow.com/questions/26413613/asyncio-is-it-possible-to-cancel-a-future-been-run-by-an-executor) – Daniel Walker Jun 19 '20 at 16:10
  • Answers my question, but doesn't solve my problem. Thanks though – Khizar Amin Jun 19 '20 at 16:49
  • Don't conflate "threads" with "tasks." Threads are agents in the operating system that do things (i.e., they run code). Tasks are things that need to be done. A thread pool executor creates and manages its _own_ threads (you should _not_ mess with them) to perform the _tasks_ that you `submit(...)` to it. – Solomon Slow Jun 19 '20 at 17:07
  • I am not that familiar with ThreadPoolExecutor, but maybe I can give you a hint based on the way I usually end threads in Python 2.7. – Diego Suarez Jun 19 '20 at 18:27
  • If possible, use a global variable or a class attribute that all worker threads can see, and have each worker set it at the end of its job depending on whether the job succeeded, e.g. PASS=True/False. Then add a checking loop, or, if the work is done inside a loop, add a condition that checks the state of this variable; whenever a thread reads PASS=False, it should return, so that the worker thread ends. – Diego Suarez Jun 19 '20 at 18:49
  • I was thinking of a similar solution, but what I really want to do is kill the threads as soon as one of them fails. The problem with this solution is that a thread would still start the next step of the lengthy job if no thread had failed when it checked the variable. I guess that's the best that can be achieved with threads. I might switch to processes to see if they can be terminated more easily. Thanks for the suggestion though, I really appreciate it. – Khizar Amin Jun 19 '20 at 20:13
  • I seem a little late to the party here, but there seems to be a method in `concurrent.futures` for this [set_running_or_notify_cancel](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future.set_running_or_notify_cancel) – Richard Boyne Aug 17 '21 at 13:31

4 Answers


If you are done with threads and want to look into processes, this piece of code looks very promising and simple: almost the same syntax as threads, but with the multiprocessing module.

When the timeout expires and the process is still running, the process is terminated, which is very convenient.

import multiprocessing

def get_page(*args, **kwargs):
    # your web page downloading code goes here
    pass

def start_get_page(timeout, *args, **kwargs):
    p = multiprocessing.Process(target=get_page, args=args, kwargs=kwargs)
    p.start()
    p.join(timeout)  # wait at most `timeout` seconds
    if p.is_alive():
        # stop the downloading 'thread'
        p.terminate()
        p.join()  # reap the terminated process
        # and then do any post-error processing here

if __name__ == "__main__":
    # example values: a 10-second timeout and one URL
    start_get_page(10, "https://example.com")
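For the question's all-or-nothing case, the same `terminate()` idea can be extended to one process per page: the parent collects results from a queue and, at the first failure, terminates every process that is still running. This is a minimal sketch under stated assumptions: it requests the "fork" start method, so it assumes a POSIX system (Windows does not provide it), and `get_page`, `worker`, `get_all_pages` and the failing "bad" path are hypothetical stand-ins for the real download code.

```python
import multiprocessing
import queue
import time


def get_page(path):
    # Hypothetical download: "bad" fails at once, other paths take 2 s and succeed
    if path == "bad":
        return False
    time.sleep(2)
    return True


def worker(path, results):
    # Runs in a child process and reports (path, success) back to the parent
    results.put((path, get_page(path)))


def get_all_pages(paths, timeout=30):
    # Assumption: POSIX "fork" start method (not available on Windows)
    ctx = multiprocessing.get_context("fork")
    results = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(path, results)) for path in paths]
    for proc in procs:
        proc.start()
    all_ok = True
    try:
        for _ in procs:
            _path, success = results.get(timeout=timeout)
            if not success:
                all_ok = False
                break
    except queue.Empty:
        all_ok = False  # no result arrived within `timeout` seconds
    finally:
        for proc in procs:
            if proc.is_alive():
                proc.terminate()  # stop downloads that are still running
            proc.join()
    return all_ok
```

Here `timeout` bounds the wait for each individual result rather than the whole run, and the queue is never used again after `terminate()`, since terminating a process that is writing to a queue can leave the queue in a broken state.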
Josh Correia
Diego Suarez

I have created an answer for a similar question I had, which I think will work for this question.

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

NUM_REQUESTS = 100


def long_request(request_id):
    sleep(1)

    # Simulate bad response
    if request_id == 10:
        return {"data": {"valid": False}}
    else:
        return {"data": {"valid": True}}


def check_results(results):
    # Every response received so far must be valid
    return all(result["data"]["valid"] for result in results)


def main():
    futures = []
    responses = []
    num_requests = 0

    with ThreadPoolExecutor(max_workers=10) as executor:
        for request_index in range(NUM_REQUESTS):
            future = executor.submit(long_request, request_index)

            # Future list
            futures.append(future)

        for future in as_completed(futures):
            responses.append(future.result())
            num_requests += 1

            # Cancel all pending requests if one is invalid
            if not check_results(responses):
                # cancel_futures requires Python 3.9+; requests that are
                # already running still finish before the with-block exits
                executor.shutdown(wait=False, cancel_futures=True)
                break

    return num_requests


if __name__ == "__main__":
    requests = main()
    print("Num Requests: ", requests)
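Pending futures can also be cancelled explicitly, which works on Python versions before 3.9, where `Executor.shutdown()` has no `cancel_futures` parameter. `Future.cancel()` succeeds only for futures that have not started running, so requests already in progress still complete. A minimal sketch, with a simulated `long_request` in which request 3 is the hypothetical invalid one:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def long_request(request_id):
    # Simulated request: request 3 comes back invalid
    time.sleep(0.2)
    return {"data": {"valid": request_id != 3}}


def main(num_requests=50):
    responses = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(long_request, i) for i in range(num_requests)]
        for future in as_completed(futures):
            response = future.result()
            responses.append(response)
            if not response["data"]["valid"]:
                # cancel() returns False for futures that are already running
                # or finished; only queued requests are actually cancelled
                for pending in futures:
                    pending.cancel()
                break
    num_cancelled = sum(f.cancelled() for f in futures)
    return responses, num_cancelled
```

With 5 workers and 50 requests, most requests are still queued when request 3 fails, so most of them never run.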
Pierre

In my code I used a `multiprocessing` pool, whose worker processes, unlike threads, can be stopped with `terminate()`:

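import multiprocessing as mp

pool = mp.Pool()
for i in range(threadNumber):
    pool.apply_async(publishMessage, args=(map_metrics, connection_parameters...,))

pool.close()      # no more tasks can be submitted to the pool
pool.terminate()  # stop the worker processes immediately, abandoning unfinished tasks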
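Calling `terminate()` right after `close()` kills the workers whether or not their tasks have finished. For the question's all-or-nothing case, a sketch that terminates the pool only when a task fails could iterate over `Pool.imap_unordered()`, which yields each result as soon as its task finishes, and leave the pool's `with` block at the first failure (the pool's context manager calls `terminate()` on exit). `fetch_page`, `fetch_all` and the "bad" path are hypothetical, and requesting the "fork" start method assumes a POSIX system.

```python
import multiprocessing
import time


def fetch_page(path):
    # Hypothetical task: "bad" fails immediately, other paths take 2 s and succeed
    if path == "bad":
        return False
    time.sleep(2)
    return True


def fetch_all(paths, processes=4):
    # Assumption: POSIX "fork" start method (not available on Windows)
    ctx = multiprocessing.get_context("fork")
    with ctx.Pool(processes) as pool:
        # imap_unordered yields results in completion order, not submission order
        for success in pool.imap_unordered(fetch_page, paths):
            if not success:
                # Leaving the with-block calls pool.terminate(), which kills
                # the worker processes and drops any outstanding tasks
                return False
    return True
```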
Ali Ait-Bachir

You could try `StoppableThread` from the func-timeout package, but terminating threads is strongly discouraged, and if you need to kill a thread, you probably have a design problem. Look at alternatives with supported cancellation or termination: asyncio coroutines and multiprocessing.
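As an illustration of the asyncio alternative, here is a minimal sketch: every download runs as a task, `asyncio.wait(..., return_when=asyncio.FIRST_EXCEPTION)` returns as soon as one task raises, and the remaining tasks are cancelled. Cancellation is delivered at a task's current `await`, so it is still cooperative, but every `await` acts as a cancellation point. `fetch` is a hypothetical simulated download in which the "bad" path fails.

```python
import asyncio


async def fetch(path):
    # Hypothetical download: "bad" fails after 0.1 s, other paths succeed after 1 s
    if path == "bad":
        await asyncio.sleep(0.1)
        raise RuntimeError(f"could not fetch {path}")
    await asyncio.sleep(1)
    return path


async def fetch_all(paths):
    tasks = [asyncio.create_task(fetch(path)) for path in paths]
    # Returns when any task raises, or when every task has finished
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for task in pending:
        task.cancel()  # raises CancelledError inside the task at its current await
    # Wait until the cancelled tasks have actually finished unwinding
    await asyncio.gather(*pending, return_exceptions=True)
    for task in done:
        if task.exception() is not None:
            raise task.exception()
    return [task.result() for task in tasks]
```

For example, `asyncio.run(fetch_all(["a", "bad", "c"]))` raises the `RuntimeError` after about 0.1 s instead of waiting for the 1-second downloads.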

alex_noname
  • Could you explain why killing a thread is bad design? Is it simply because threads are not designed to be killed? – Ian Lin Oct 25 '21 at 09:18
  • Not only. Explained [here](https://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread) – alex_noname Oct 25 '21 at 09:20
  • I guess I have a "design problem" in my tests... Thank you – Adonis Mar 30 '22 at 14:51
  • Damned geezers, I am using threads to test my app by spawning a couple of device emulators that run in an infinite loop and never finish. I need to kill the thread and I don't care about all the "good and bad" reasons. Stop telling others what to do when you don't know the use case. Of course, in a server app that should gracefully recover from a failure this is not encouraged; who on earth would like that? – nlhnt Apr 06 '23 at 08:32
  • People could just answer that you need to make the task aware of an Event and observe it. When the Event is set, gracefully end the task. End of topic. https://docs.python.org/3/library/threading.html#event-objects – nlhnt Apr 06 '23 at 08:55
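The `threading.Event` approach can be sketched with the question's `ThreadPoolExecutor` setup: each task checks the event between the steps of its job, and at the first failure the main thread sets the event and cancels the futures that have not started yet. As the comments note, a running task only stops at its next check, not instantly. The multi-step `start` job (10 steps of 0.1 s) and the failing "bad" path are simulated and hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def start(path, stop_event):
    # Hypothetical multi-step job: 10 steps of 0.1 s each; "bad" fails at step 2
    for step in range(10):
        if stop_event.is_set():
            return None  # another task failed, so abandon this one early
        time.sleep(0.1)
        if path == "bad" and step == 2:
            return False
    return True


def run_all(paths, num_threads=4):
    stop_event = threading.Event()
    with ThreadPoolExecutor(num_threads) as executor:
        jobs = [executor.submit(start, path, stop_event) for path in paths]
        for job in as_completed(jobs):
            if job.result() is False:
                stop_event.set()  # running tasks notice this at their next check
                for other in jobs:
                    other.cancel()  # tasks that have not started will never run
                return False  # the with-block still waits for running tasks
    return True
```

With six paths and four threads, a full run would take about 2 s, while `run_all(["a", "bad", "c", "d", "e", "f"])` returns `False` after roughly 0.4 s.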