1

I'm using some threads to compute a task in a faster way. I've seen that if one of the threads I launch raises an exception, all the other threads continue to work and the code doesn't raise that exception.

I'd like that as soon as one thread fails, all the other threads are killed and the main file raises the same exception of the thread.

My thread file is this:

from threading import Thread

class myThread(Thread):
    def __init__(self, ...):
        Thread.__init__(self)
        self.my_variables = ...

    def run(self):
        # some code that can raise Exception

My main is

import MyThread

threads = []
my_list = ["a_string", "another_string", "..."]

for idx in range(len(my_list)):
    threads.append(MyThread(idx = idx, ... )

for t in threads:
    t.start()
for t in threads:
    t.join()

I know that there are some methods to propagate the exception between the parent and the child thread as here: https://stackoverflow.com/a/2830127/12569908. But in this discussion, there is only 1 thread while I've many. In addition, I don't want to wait for all of them to end if one of them fails at the beginning. I tried to adapt that code to my case, but I still have problems.

How can I do?

Paolo Magnani
  • 549
  • 4
  • 14
  • Re, "I'd like that...all the other threads are killed." There is almost never a good idea for a program to kill its threads. Threads _co-operate_ with each other by mutating shared state (i.e., shared objects and/or global variables.) Unless your program is totally [_lock free_](https://en.wikipedia.org/wiki/Non-blocking_algorithm), then its threads will sometimes have to make that shared state temporarily invalid in order to make progress. If you kill a thread that has invalidated the state before it has a chance to fix things up again, it's usually "game over" for the program. – Solomon Slow Nov 18 '21 at 17:21
  • 1
    Also, as you have noticed, an exception is raised and caught entirely within a single thread. If you want some action to be performed when any thread raises an exception, then you will have to provide a handler (a `try...except...` block) in _every_ thread, and have the handler that catches the exception perform the action before the thread terminates. In your case, the "action" would be to _ask_ the other threads to _cleanly_ shut down. That means, you'll also need to provide some way for all of the threads to periodically check to see if they have been asked to shut down. – Solomon Slow Nov 18 '21 at 17:27
  • 1
    Thank @SolomonSlow. Indeed I'm doing exactly what you said. I put a `try...except` in all the threads and I pass them a shared bucket. When a thread fails, it adds an exception in the bucket. Doing that, in the main I see an exception in the bucket and I re-raise the exception and simultaneously I set a variable `stopped = True` in the threads. Inside them, I periodically check this variable to exit safely with a join. The behavior is good, but not perfect (the threads don't end immediately, but only when the check of `stopped` return True – Paolo Magnani Nov 22 '21 at 17:01

1 Answers1

0

You can use PyThreadState_SetAsyncExc function from here. Also look at this link. We can raise an exception in target thread with ctypes.pythonapi.PyThreadState_SetAsyncExc() function, target thread can catch this exception and do some work.

When you look at the below code, you will see that f and g function work in seperate threads. We raise ThreadKill exception in f when ZeroDivisionError occured, and then we catch this exception in myThread class then we killing other thread/threads using PyThreadState_SetAsyncExc function.

Note: If target thread has no controling over the interpreter(like syscall, time.sleep(), I/O blocking operation) then target thread will not get killed until it has controling over the interpreter

I modified your code a little.

import threading
import time,ctypes

class ThreadKill(Exception): # this is our special exception class like ZeroDivisionError
    pass

def f():
    try:
        for i in range(20):
            print("hello")
            time.sleep(1)
            if i==2:
                4/0
    except ZeroDivisionError:
        # your cleanup, close the file, flush the buffer etc.
        raise ThreadKill # We do that because we will catch this in myThread class

def g():
    try:
        for i in range(20):
            print("world")
            time.sleep(1)
    except ThreadKill:
        # your cleanup, close the file, flush the buffer
        print("i am killing")


class myThread(threading.Thread):
    def __init__(self,func):
        threading.Thread.__init__(self)
        self.func=func

    def run(self):
        try:
            self.func()

        except Exception as e: # catch "raise ThreadKill" exception
            for thread in threads:
                my_ident=threading.get_ident()
                if thread.ident!=my_ident: # don't kill yourself without all other threads signaled
                    ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(thread.ident),
                                                               ctypes.py_object(ThreadKill))

            ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(threading._main_thread.ident),
                                                       ctypes.py_object(ThreadKill))

threads=[]
threads.append(myThread(func=f))
threads.append(myThread(func=g))

try:
    for t in threads:
        t.start()

    for t in threads:
        t.join()
except ThreadKill:
    print("ThreadKill Exception")
Veysel Olgun
  • 552
  • 1
  • 3
  • 15