293

I'm very new to Python and multithreaded programming in general. Basically, I have a script that will copy files to another location. I would like this to be placed in another thread so I can output .... to indicate that the script is still running.

The problem that I am having is that if the files cannot be copied it will throw an exception. This is OK if running in the main thread; however, having the following code does not work:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

In the thread class itself, I tried to re-throw the exception, but it does not work. I have seen people on here ask similar questions, but they all seem to be doing something more specific than what I am trying to do (and I don't quite understand the solutions offered). I have seen people mention the usage of sys.exc_info(), however I do not know where or how to use it.

Edit: The code for the thread class is below:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder
    
    def run(self):
        try:
           shutil.copytree(self.sourceFolder, self.destFolder)
        except:
           raise
martineau
Phanto
  • Can you provide more insight into what is happening inside of `TheThread`? Code sample perhaps? – jathanism May 13 '10 at 18:43
  • Sure. I'll edit my response above to include some details. – Phanto May 13 '10 at 18:44
  • 1
    Have you considered switching it round so the main Thread is the bit that does stuff and the progress indicator is in the spawned Thread? – Dan Head May 13 '10 at 19:00
  • 1
    Dan Head, are you referring to the main thread first spawning the "..." function and then running the copy function? That could work and avoid the exception issue. But, I'd still like to learn how to properly thread in python. – Phanto May 13 '10 at 19:28
  • Two key issues here. 1. Thread is async, so start() function is returning nothing. 2. Error is handled within sub-thread, not the main thread. That's why you don't catch exception in the main thread. You need to throw the exception to the main thread. Check ArtOfWarfare's solution – Diansheng May 03 '21 at 04:18

20 Answers

147

The problem is that thread_obj.start() returns immediately. The child thread that you spawned executes in its own context, with its own stack. Any exception that occurs there is in the context of the child thread, and it is in its own stack. One way I can think of right now to communicate this information to the parent thread is by using some sort of message passing, so you might look into that.

Try this on for size:

import sys
import threading
import queue


class ExcThread(threading.Thread):

    def __init__(self, bucket):
        threading.Thread.__init__(self)
        self.bucket = bucket

    def run(self):
        try:
            raise Exception('An error occurred here.')
        except Exception:
            self.bucket.put(sys.exc_info())


def main():
    bucket = queue.Queue()
    thread_obj = ExcThread(bucket)
    thread_obj.start()

    while True:
        try:
            exc = bucket.get(block=False)
        except queue.Empty:
            pass
        else:
            exc_type, exc_obj, exc_trace = exc
            # deal with the exception
            print(exc_type, exc_obj)
            print(exc_trace)

        thread_obj.join(0.1)
        if thread_obj.is_alive():
            continue
        else:
            break


if __name__ == '__main__':
    main()
ErikusMaximus
Santa
  • 12
    Why not joining the thread instead of this ugly while loop? See the `multiprocessing` equivalent: https://gist.github.com/2311116 – schlamar Dec 11 '12 at 14:03
  • 3
    Why not using the EventHook pattern http://stackoverflow.com/questions/1092531/event-system-in-python/1094423#1094423 based on @Lasse answer? Rather than the loop thing? – Andre Miras Jan 22 '14 at 23:01
  • 4
    Queue is not the best vehicle to communicate an error back, unless you want to have a full queue of them. A much better construct is threading.Event() – Muposat May 19 '16 at 17:33
  • 3
    This seems unsafe to me. What happens when the thread raises an exception right after the `bucket.get()` raises `Queue.Empty`? Then the thread `join(0.1)` will complete and `isAlive() is False`, and you miss your exception. – Steve Aug 09 '16 at 18:41
  • 4
    `Queue` is unnecessary in this simple case -- you can just store the exception info as a property of the `ExcThread` as long as you make sure that `run()` completes right after the exception (which it does in this simple example). Then you simply re-raise the exception after (or during) `t.join()`. There are no synchronization problems because `join()` makes sure the thread has completed. See answer by Rok Strniša below https://stackoverflow.com/a/12223550/126362 – ejm Oct 06 '17 at 07:46
113

There are a lot of really weirdly complicated answers to this question. Am I oversimplifying this? This seems sufficient for most things to me.

from threading import Thread

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self, timeout=None):
        super(PropagatingThread, self).join(timeout)
        if self.exc:
            raise self.exc
        return self.ret

If you're certain you'll only ever be running on one or the other version of Python, you could reduce the run() method down to just the mangled version (if you'll only be running on versions of Python before 3), or just the clean version (if you'll only be running on versions of Python starting with 3).

Example usage:

def f(*args, **kwargs):
    print(args)
    print(kwargs)
    raise Exception('I suck at this')

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()

And you'll see the exception raised on the other thread when you join.

If you are using six or on Python 3 only, you can improve the stack trace information you get when the exception is re-raised. Instead of only the stack at the point of the join, you can wrap the inner exception in a new outer exception, and get both stack traces with

six.raise_from(RuntimeError('Exception in thread'),self.exc)

or

raise RuntimeError('Exception in thread') from self.exc
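
To make the chaining concrete, here is a minimal Python 3 sketch of a join() that wraps the stored exception. The names ChainedThread and fail are made up for illustration, and this variant delegates to Thread.run() instead of reading the private attributes, so it does not capture the return value.

```python
import threading


class ChainedThread(threading.Thread):
    """Hypothetical variant that chains the worker's exception on join()."""

    # Default, so join() also works if run() has not stored anything yet.
    exc = None

    def run(self):
        try:
            # Thread.run() calls the target passed to the constructor.
            super().run()
        except BaseException as e:
            self.exc = e

    def join(self, timeout=None):
        super().join(timeout)
        if self.exc is not None:
            # The original exception stays reachable as __cause__.
            raise RuntimeError('Exception in thread') from self.exc


def fail():
    raise ValueError('boom')


t = ChainedThread(target=fail)
t.start()
try:
    t.join()
except RuntimeError as err:
    caught = err
    print(repr(err.__cause__))
```

When the RuntimeError is left uncaught, the traceback shows the worker's stack first and then the stack at the join() call.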
ArtOfWarfare
  • 2
    I'm not certain why this answer isn't more popular either. There are others here that do simple propagation too, but require extending a class and overriding. This one just does what many would expect, and requires only changing from Thread to PropagatingThread. And 4 space tabs so my copy/paste was trivial :-) ... the only improvement I'd suggest is using six.raise_from() so that you get a nice nested set of stack traces, instead of just the stack for the site of the reraise. – aggieNick02 Oct 07 '19 at 19:35
  • Thank you very much. Very simple solution. – sonulohani Mar 17 '20 at 09:38
  • 2
    My problem is that I have multiple child threads. The joins are executed in sequence, and the exception might be raised from the later joined threads. Is there a simple solution to my problem? run the join concurrently? – chuan Apr 06 '20 at 09:01
  • Thanks, that works perfectly! Not sure why it's not handled directly by python tho… – GG. Apr 22 '20 at 18:01
  • This is definitely the most useful answer; this solution is much more general than others yet simple. Will use it in a project! – Konstantin Sekeresh Jun 03 '20 at 09:12
  • 4
    In order to be a drop in replacement for thread, join should include a timeout argument. `def join(self, timeout=None): super(PropagatingThread, self).join(timeout) if self.exc: raise self.exc return self.ret` – saranova Oct 10 '20 at 01:11
  • 2
    If Thread is called with daemon=True it doesn't seem to work. I need to emit a message (with flask socketio) in the thread when the code in the thread fails. Is there a way to handle that? – Arpad Horvath -- Слава Україні Jul 06 '21 at 17:19
  • 3
    I suggest this simpler implementation for the run method: `super(PropagatingThread, self).run()` – Simon Ndunda Jul 16 '21 at 06:38
  • 2
    @SimonNdunda run() doesn't return a value so that will only work on functions that don't return anything. – Gareth Davidson Nov 10 '22 at 21:42
  • According to the official documents, you should not override the `join` function. – anishtain4 May 02 '23 at 15:27
  • @anishtain4 You should be able to do something equivalent while not overriding join by doing `thread.join(); if thread.exc: raise thread.exc`. But, I don't see how this could possibly make a difference. – VRehnberg May 26 '23 at 06:38
  • @VRehnberg, exactly, we can't see how it can affect it and it can have some side effects in the future because the documentation says it should not be overwritten. – anishtain4 Jun 08 '23 at 16:29
58

The concurrent.futures module makes it simple to do work in separate threads (or processes) and handle any resulting exceptions:

import concurrent.futures
import shutil

def copytree_with_dots(src_path, dst_path):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Execute the copy on a separate thread,
        # creating a future object to track progress.
        future = executor.submit(shutil.copytree, src_path, dst_path)

        while future.running():
            # Print pretty dots here.
            pass

        # Return whatever shutil.copytree() returns (the destination path on current Python 3 versions).
        # Raise any exceptions raised during the copy process.
        return future.result()

concurrent.futures is included with Python 3.2, and is available as the backported futures module for earlier versions.
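
For the "print dots while copying" part of the question, something along these lines might work. It is only a sketch: run_with_dots and slow_failing_copy are hypothetical names, and the latter stands in for shutil.copytree so the example runs without touching the file system. It polls with concurrent.futures.wait() and a timeout rather than spinning in a tight loop.

```python
import concurrent.futures
import time


def run_with_dots(func, *args, interval=0.5):
    """Run func(*args) on a worker thread, printing a dot while waiting."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(func, *args)
        while True:
            # Block for at most `interval` seconds waiting for the future.
            done, _ = concurrent.futures.wait([future], timeout=interval)
            if done:
                break
            print('.', end='', flush=True)
        print()
        # result() returns func's value or re-raises its exception
        # here, in the calling thread.
        return future.result()


def slow_failing_copy(src, dst):
    # Stand-in for shutil.copytree(src, dst) that fails after a short delay.
    time.sleep(0.3)
    raise OSError('cannot copy {} to {}'.format(src, dst))


try:
    run_with_dots(slow_failing_copy, 'src_dir', 'dst_dir', interval=0.1)
except OSError as err:
    caught = err
    print('Caught in the main thread:', err)
```

With a real copy, the call would be run_with_dots(shutil.copytree, src_path, dst_path).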

Jon-Eric
  • 10
    While this does not do exactly what the OP asked, it is exactly the hint I needed. Thank you. – Mad Physicist Dec 04 '15 at 17:49
  • 2
    And with `concurrent.futures.as_completed`, you can get immediately notified as exceptions are raised: https://stackoverflow.com/questions/2829329/catch-a-threads-exception-in-the-caller-thread-in-python/53779560#53779560 – Ciro Santilli OurBigBook.com Dec 14 '18 at 12:14
  • 5
    This code is blocking the main thread. How do you do this asynchronously? – Nikolay Shindarov Nov 28 '19 at 17:12
  • @MadPhysicist I think this actually is what the OP asked, it's just not obvious. If you look at the answer `PropagatingThread`, it's actually doing the same stuff as future: saving the result or exception and propagating that when calling `.join()` (same as future `.result()` method). – Arthur Tacca Aug 02 '22 at 11:54
  • @NikolayShindarov Only `future.result()` blocks. You can check `future.running()` without blocking and if it's `True` continue to do something else, as shown in this answer. Much better than that, you can run other work in a different future and then call `concurrent.futures.wait()` or `as_completed()` (as the other comment says) to wait until one or all are finished. – Arthur Tacca Aug 02 '22 at 11:58
30

Although it is not possible to directly catch an exception thrown in a different thread, here is some code that provides something very close to this functionality quite transparently. Your child thread must subclass the ExThread class instead of threading.Thread, and the parent thread must call the child_thread.join_with_exception() method instead of child_thread.join() when waiting for the thread to finish its job.

Technical details of this implementation: when the child thread throws an exception, it is passed to the parent through a Queue and thrown again in the parent thread. Notice that there's no busy waiting in this approach.

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overridden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overridden."""
        try:
            self.run_with_exception()
        except BaseException:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    def run_with_exception(self):
        thread_name = threading.current_thread().name
        raise MyException("An error in thread '{}'.".format(thread_name))

def main():
    t = MyThread()
    t.start()
    try:
        t.join_with_exception()
    except MyException as ex:
        thread_name = threading.current_thread().name
        print "Caught a MyException in thread '{}': {}".format(thread_name, ex)

if __name__ == '__main__':
    main()
Mateusz Kobos
  • 1
    Wouldn't you want to catch `BaseException`, not `Exception`? All you're doing is propagating the exception out of one `Thread` and into another. Right now, IE, a `KeyboardInterrupt`, would be silently ignored if it was raised in a background thread. – ArtOfWarfare Jul 24 '15 at 14:00
  • `join_with_exception` hangs indefinitely if called a second time on a dead thread. Fix: https://github.com/fraserharris/threading-extensions/blob/master/threading_extensions.py#L28 – Fraser Harris May 11 '16 at 20:05
  • I don't think `Queue` is necessary; see my comment to @Santa's answer. You can simplify it down to something like Rok Strniša's answer below https://stackoverflow.com/a/12223550/126362 – ejm Oct 06 '17 at 07:53
24

If an exception occurs in a thread, the best way is to re-raise it in the caller thread during join. You can get information about the exception currently being handled using the sys.exc_info() function. This information can simply be stored as a property of the thread object until join is called, at which point it can be re-raised.

Note that a Queue.Queue (as suggested in other answers) is not necessary in this simple case where the thread throws at most 1 exception and completes right after throwing an exception. We avoid race conditions by simply waiting for the thread to complete.

For example, extend ExcThread (below), overriding excRun (instead of run).

Python 2.x:

import threading

class ExcThread(threading.Thread):
  def excRun(self):
    pass

  def run(self):
    self.exc = None
    try:
      # Possibly throws an exception
      self.excRun()
    except:
      import sys
      self.exc = sys.exc_info()
      # Save details of the exception thrown but don't rethrow,
      # just complete the function

  def join(self):
    threading.Thread.join(self)
    if self.exc:
      msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
      new_exc = Exception(msg)
      raise new_exc.__class__, new_exc, self.exc[2]

Python 3.x:

The 3 argument form for raise is gone in Python 3, so change the last line to:

raise new_exc.with_traceback(self.exc[2])
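
Put together, a Python 3 version might look like the sketch below. The FailingThread subclass and the thread name are made up for the demo, and the timeout parameter on join() is an addition that the Python 2 code above does not have.

```python
import sys
import threading


class ExcThread(threading.Thread):
    def excRun(self):
        pass

    def run(self):
        self.exc = None
        try:
            # Possibly throws an exception
            self.excRun()
        except BaseException:
            # Save the details but don't rethrow; just let run() finish.
            self.exc = sys.exc_info()

    def join(self, timeout=None):
        threading.Thread.join(self, timeout)
        if self.exc:
            msg = "Thread '%s' threw an exception: %s" % (self.name, self.exc[1])
            new_exc = Exception(msg)
            # Attach the worker's traceback to the new exception.
            raise new_exc.with_traceback(self.exc[2])


class FailingThread(ExcThread):  # hypothetical subclass for the demo
    def excRun(self):
        raise ValueError('bad value')


t = FailingThread(name='worker-1')
t.start()
try:
    t.join()
except Exception as err:
    message = str(err)
    print(message)
```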
ejm
Rok Strniša
  • 2
    Why do you use threading.Thread.join(self) instead of super(ExcThread, self).join()? – Richard Möhn Jul 28 '18 at 23:44
  • 2
    The [docs](https://docs.python.org/3/library/threading.html#thread-objects) explicitly state that only the `__init__` and `run` methods of `threading.Thread` should be overridden, although they don't explain why this is recommended. – snakecharmerb May 24 '21 at 14:54
19

concurrent.futures.as_completed

https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

The following solution:

  • returns to the main thread immediately when an exception is raised
  • requires no extra user-defined classes because it does not need:
    • an explicit Queue
    • a try/except/else wrapped around your worker code

Source:

#!/usr/bin/env python3

import concurrent.futures
import time

def func_that_raises(do_raise):
    for i in range(3):
        print(i)
        time.sleep(0.1)
    if do_raise:
        raise Exception()
    for i in range(3):
        print(i)
        time.sleep(0.1)
    
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    futures.append(executor.submit(func_that_raises, False))
    futures.append(executor.submit(func_that_raises, True))
    for future in concurrent.futures.as_completed(futures):
        print(repr(future.exception()))

Possible output:

0
0
1
1
2
2
0
Exception()
1
2
None

It is unfortunately not possible to kill futures to cancel the others as one fails:

If you do something like:

for future in concurrent.futures.as_completed(futures):
    if future.exception() is not None:
        raise future.exception()

then the with catches it, and waits for the second thread to finish before continuing. The following behaves similarly:

for future in concurrent.futures.as_completed(futures):
    future.result()

since future.result() re-raises the exception if one occurred.

If you want to quit the entire Python process, you might get away with os._exit(0), but this likely means you need a refactor.
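
A partial workaround, sketched here under assumptions: concurrent.futures.wait() with return_when=FIRST_EXCEPTION returns as soon as one future fails, and Future.cancel() can then drop tasks that have not started yet. It cannot stop a call that is already running. The work function and the pool size are invented for the example.

```python
import concurrent.futures
import time


def work(n):
    # Hypothetical task: task 1 fails quickly, the others just sleep.
    time.sleep(0.1 * n)
    if n == 1:
        raise ValueError('task {} failed'.format(n))
    return n


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(work, n) for n in range(1, 6)]
    done, not_done = concurrent.futures.wait(
        futures, return_when=concurrent.futures.FIRST_EXCEPTION)
    # cancel() only succeeds for futures that have not started running.
    cancelled = [f for f in not_done if f.cancel()]

failed = [f for f in done if f.exception() is not None]
print('failed:', [repr(f.exception()) for f in failed])
print('cancelled before starting:', len(cancelled))
```

The with block still waits for tasks that were already running when the failure was noticed.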

Custom class with perfect exception semantics

I ended up coding the perfect interface for myself at: The right way to limit maximum number of threads running at once? section "Queue example with error handling". That class aims to be both convenient, and give you total control over submission and result / error handling.

Tested on Python 3.6.7, Ubuntu 18.04.

Rohit Banga
Ciro Santilli OurBigBook.com
11

Since Python 3.8, you can set threading.excepthook to handle uncaught exceptions raised in any child thread. For example, with thread_exception_handler being a function that takes a single argument:

threading.excepthook = thread_exception_handler

Reference: https://stackoverflow.com/a/60002752/5093308
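
A minimal sketch of what such a handler could look like (Python 3.8+). The seen list and the fail function are made up for the demo. The handler runs in the thread that raised, not in the main thread, so here it only records the error for the main thread to inspect after join().

```python
import threading

seen = []


def thread_exception_handler(args):
    # args carries exc_type, exc_value, exc_traceback and thread.
    # This runs in the failing thread, not in the main thread.
    seen.append((args.thread.name, args.exc_type, str(args.exc_value)))


threading.excepthook = thread_exception_handler


def fail():
    raise RuntimeError('boom in worker')


t = threading.Thread(target=fail, name='worker-1')
t.start()
t.join()
print(seen)
```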

Zhou Hongbo
  • 4
    This doesn't help as the handler still runs within the child thread and not the main thread – Elad Avron Feb 14 '21 at 10:51
  • Just adding my code as an example: `def thread_exception_handler(args): print("In excepthook") @app.before_first_request def before_first_request(): threading.Thread(target=update_load).start() threading.excepthook = thread_exception_handler` – Duarte Nov 23 '21 at 17:30
  • Looked promising, but I agree with @EladAvron - the handler will run in the child process. – icemtel Nov 02 '22 at 10:24
10

What I'm doing is simply overriding the run and join methods of Thread:

import threading
import time

class RaisingThread(threading.Thread):
  def run(self):
    self._exc = None
    try:
      super().run()
    except Exception as e:
      self._exc = e

  def join(self, timeout=None):
    super().join(timeout=timeout)
    if self._exc:
      raise self._exc

Used as follows:

def foo():
  time.sleep(2)
  print('hi, from foo!')
  raise Exception('exception from foo')    

t = RaisingThread(target=foo)
t.start()
try:
  t.join()
except Exception as e:
  print(e)

Result:

hi, from foo!
exception from foo
MoRe
cebor
  • 2
    Bravo! I read most of the answers here but didn't notice this one until I had come up with a solution that works, after trying various things (including threading.excepthook). And what I ended up with is almost identical to this solution :) It deserves some upvotes – works like a dream. – Kristjan Jonasson Feb 16 '22 at 13:48
7

This was a nasty little problem, and I'd like to throw my solution in. Some other solutions I found (asyncio, for example) looked promising but also presented a bit of a black box. The queue / event loop approach sort of ties you to a certain implementation. The concurrent.futures source code, however, is only around 1000 lines and easy to comprehend. It allowed me to easily solve my problem: create ad-hoc worker threads without much setup, and be able to catch exceptions in the main thread.

My solution uses the concurrent futures API and threading API. It allows you to create a worker which gives you both the thread and the future. That way, you can join the thread to wait for the result:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())

...or you can let the worker just send a callback when done:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))

...or you can loop until the event completes:

worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)           
        break
    time.sleep(0.25)

Here's the code:

from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        # Daemon thread: it will not keep the process alive once the main thread
        # exits, so Ctrl+C or killing the process still works.
        thread.daemon = True
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

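        if self._cb:
            # Pass the future itself: calling result() here would re-raise
            # a stored exception inside the worker thread.
            self._cb(self.future)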

...and the test function:

def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')
Calvin Froedge
5

I know I'm a bit late to the party, but I had a very similar problem that involved tkinter as a GUI, where the mainloop made it impossible to use any of the solutions that depend on .join(). I therefore adapted the solution given in the EDIT of the original question and made it more general so that it is easier for others to understand.

Here is the new thread class in action:

import threading
import traceback
import logging


class ExceptionThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except Exception:
            logging.error(traceback.format_exc())


def test_function_1(input):
    raise IndexError(input)


if __name__ == "__main__":
    input = 'useful'

    t1 = ExceptionThread(target=test_function_1, args=[input])
    t1.start()

Of course, you can always have it handle the exception in some way other than logging, such as printing it out to the console.

This allows you to use the ExceptionThread class exactly like you would the Thread class, without any special modifications.

Firo
4

I use this version; it's minimal and it works well.

import threading
import traceback

class SafeThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        super(SafeThread, self).__init__(*args, **kwargs)
        self.exception = None

    def run(self) -> None:
        try:
            super(SafeThread, self).run()
        except Exception as ex:
            self.exception = ex
            traceback.print_exc()

    def join(self, *args, **kwargs) -> None:
        super(SafeThread, self).join(*args, **kwargs)
        if self.exception:
            raise self.exception

To use it, simply replace threading.Thread with SafeThread, e.g.:

t = SafeThread(target = some_function, args = (some, args,))
t.start()
# do something else here if you want as the thread runs in the background
t.join()
Simon Ndunda
3

This is similar to RickardSjogren's approach, without Queue, sys, etc., but also without signal listeners: the thread directly calls an exception handler, which plays the role of an except block.

#!/usr/bin/env python3

import threading

class ExceptionThread(threading.Thread):

    def __init__(self, callback=None, *args, **kwargs):
        """
        Redirect exceptions of thread to an exception handler.

        :param callback: function to handle an exception that occurred
        :type callback: function(thread, exception)
        :param args: arguments for threading.Thread()
        :type args: tuple
        :param kwargs: keyword arguments for threading.Thread()
        :type kwargs: dict
        """
        self._callback = callback
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except BaseException as e:
            if self._callback is None:
                raise e
            else:
                self._callback(self, e)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs, self._callback

Only self._callback and the except block in run() are additions to the normal threading.Thread.
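
A possible usage, as a sketch: the class is repeated in condensed form so the example runs on its own, and on_error, divide and the errors list are made up for illustration. The callback runs in the worker thread, so the main thread only reads errors after join().

```python
import threading


class ExceptionThread(threading.Thread):
    # Condensed copy of the class above, so that this sketch runs on its own.
    def __init__(self, callback=None, *args, **kwargs):
        self._callback = callback
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except BaseException as e:
            if self._callback is None:
                raise
            self._callback(self, e)
        finally:
            del self._target, self._args, self._kwargs, self._callback


errors = []


def on_error(thread, exc):
    # Hypothetical handler: record which thread failed and why.
    errors.append((thread.name, exc))


def divide(a, b):
    return a / b


t = ExceptionThread(on_error, target=divide, args=(1, 0), name='divider')
t.start()
t.join()
print(errors)
```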

Chickenmarkus
2

As a noobie to Threading, it took me a long time to understand how to implement Mateusz Kobos's code (above). Here's a clarified version to help understand how to use it.

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overridden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overridden."""
        try:
            self.run_with_exception()
        except Exception:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    # This overrides the "run_with_exception" from class "ExThread"
    # Note, this is where the actual thread to be run lives. The thread
    # to be run could also call a method or be passed in as an object
    def run_with_exception(self):
        # Code will function until the int
        print "sleeping 5 seconds"
        import time
        for i in 1, 2, 3, 4, 5:
            print i
            time.sleep(1) 
        # Thread should break here
        int("str")
# I'm honestly not sure why these appear here? So, I removed them. 
# Perhaps Mateusz can clarify?        
#         thread_name = threading.current_thread().name
#         raise MyException("An error in thread '{}'.".format(thread_name))

if __name__ == '__main__':
    # The code lives in MyThread in this example. So creating the MyThread 
    # object set the code to be run (but does not start it yet)
    t = MyThread()
    # This actually starts the thread
    t.start()
    print
    print ("Notice 't.start()' is considered to have completed, although"
           " the countdown continues in its new thread. So your code "
           "can continue into new processing.")
    # Now that the thread is running, the join allows for monitoring of it
    try:
        t.join_with_exception()
    # It should be possible to replace "Exception" with a more specific error (untested)
    except Exception as e:
        print
        print "Exception was caught and control passed back to the main thread"
        print "Do some handling here...or raise a custom exception "
        thread_name = threading.current_thread().name
        e = ("Caught a MyException in thread: '" + 
             str(thread_name) + 
             "' [" + str(e) + "]")
        raise Exception(e) # Or custom class of exception, such as MyException
Marco Leogrande
RightmireM
1

One method I am fond of is based on the observer pattern. I define a signal class which my thread uses to emit exceptions to listeners. It can also be used to return values from threads. Example:

import threading

class Signal:
    def __init__(self):
        self._subscribers = list()

    def emit(self, *args, **kwargs):
        for func in self._subscribers:
            func(*args, **kwargs)

    def connect(self, func):
        self._subscribers.append(func)

    def disconnect(self, func):
        try:
            self._subscribers.remove(func)
        except ValueError:
            raise ValueError('Function {0} not removed from {1}'.format(func, self))


class WorkerThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.Exception = Signal()
        self.Result = Signal()

    def run(self):
        if self._Thread__target is not None:
            try:
                self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            except Exception as e:
                self.Exception.emit(e)
            else:
                self.Result.emit(self._return_value)

if __name__ == '__main__':
    import time

    def handle_exception(exc):
        print exc.message

    def handle_result(res):
        print res

    def a():
        time.sleep(1)
        raise IOError('a failed')

    def b():
        time.sleep(2)
        return 'b returns'

    t = WorkerThread(target=a)
    t2 = WorkerThread(target=b)
    t.Exception.connect(handle_exception)
    t2.Result.connect(handle_result)
    t.start()
    t2.start()

    print 'Threads started'

    t.join()
    t2.join()
    print 'Done'

I do not have enough experience of working with threads to claim that this is a completely safe method. But it has worked for me and I like the flexibility.

RickardSjogren
1

A simple way of catching a thread's exception and communicating it back to the caller is to pass a dictionary or a list to the worker method.

Example (passing dictionary to worker method):

import threading

def my_method(throw_me):
    raise Exception(throw_me)

def worker(shared_obj, *args, **kwargs):
    try:
        shared_obj['target'](*args, **kwargs)
    except Exception as err:
        shared_obj['err'] = err

shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"

th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()

if shared_obj['err']:
    print(">>%s" % shared_obj['err'])
1

Wrap Thread with exception storage.

import threading
import sys
class ExcThread(threading.Thread):

    def __init__(self, target, args = None):
        self.args = args if args else []
        self.target = target
        self.exc = None
        threading.Thread.__init__(self)

    def run(self):
        try:
            self.target(*self.args)
            raise Exception('An error occurred here.')
        except Exception:
            self.exc=sys.exc_info()

def main():
    def hello(name):
        print(!"Hello, {name}!")
    thread_obj = ExcThread(target=hello, args=("Jack"))
    thread_obj.start()

    thread_obj.join()
    exc = thread_obj.exc
    if exc:
        exc_type, exc_obj, exc_trace = exc
        print(exc_type, ':',exc_obj, ":", exc_trace)

main()
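If the caller should handle the failure with an ordinary try/except rather than just print it, the stored sys.exc_info() tuple can be re-raised after join(). The following is a rough sketch for Python 3 only; it uses a condensed variant of the ExcThread idea so that it runs on its own, and `fail` and `caught` are hypothetical names.

```python
import sys
import threading
import traceback

class ExcThread(threading.Thread):
    """Condensed variant of the exception-storing thread, for illustration."""

    def __init__(self, target, args=None):
        super().__init__()
        self.target = target
        self.args = args if args else []
        self.exc = None

    def run(self):
        try:
            self.target(*self.args)
        except Exception:
            self.exc = sys.exc_info()

def fail(name):
    # Hypothetical target that always fails.
    raise RuntimeError("could not greet %s" % name)

thread_obj = ExcThread(target=fail, args=("Jack",))
thread_obj.start()
thread_obj.join()

caught = None
if thread_obj.exc:
    exc_type, exc_obj, exc_trace = thread_obj.exc
    try:
        # Re-raise in the calling thread, keeping the worker's traceback.
        raise exc_obj.with_traceback(exc_trace)
    except RuntimeError as err:
        caught = err
        # The formatted traceback still includes the frame inside fail().
        print("".join(traceback.format_exception(exc_type, exc_obj, exc_trace)))
```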
ahuigo
1

I like this class:

https://gist.github.com/earonesty/b88d60cb256b71443e42c4f1d949163e

import threading
from typing import Any


class PropagatingThread(threading.Thread):
    """A Threading Class that raises errors it caught, and returns the return value of the target on join."""

    def __init__(self, *args, **kwargs):
        self._target = None
        self._args = ()
        self._kwargs = {}
        super().__init__(*args, **kwargs)
        self.exception = None
        self.return_value = None
        assert self._target

    def run(self):
        """Don't override this if you want the behavior of this class, use target instead."""
        try:
            if self._target:
                self.return_value = self._target(*self._args, **self._kwargs)
        except Exception as e:
            self.exception = e
        finally:
            # see super().run() for why this is necessary
            del self._target, self._args, self._kwargs

    def join(self, timeout=None) -> Any:
        super().join(timeout)
        if self.exception:
            raise self.exception
        return self.return_value
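A possible usage, sketched against the question's scenario: the worker does the copy, and the main thread gets either the return value or the exception from join(). The class is repeated in condensed form so the sketch runs standalone; `copy_files` is a hypothetical stand-in for the real shutil.copytree call, and the code relies on the private `_target`, `_args` and `_kwargs` attributes of threading.Thread in Python 3, just as the class above does.

```python
import threading

class PropagatingThread(threading.Thread):
    """Condensed variant of the class above, for illustration."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.exception = None
        self.return_value = None

    def run(self):
        try:
            if self._target:
                self.return_value = self._target(*self._args, **self._kwargs)
        except Exception as e:
            self.exception = e

    def join(self, timeout=None):
        super().join(timeout)
        if self.exception:
            raise self.exception
        return self.return_value

def copy_files(should_fail):
    # Hypothetical stand-in for shutil.copytree(source, dest).
    if should_fail:
        raise OSError("copy failed")
    return "copied"

ok = PropagatingThread(target=copy_files, args=(False,))
ok.start()
result = ok.join()  # returns the target's return value

bad = PropagatingThread(target=copy_files, args=(True,))
bad.start()
try:
    bad.join()  # re-raises the worker's exception in the main thread
    message = None
except OSError as err:
    message = "Caught an exception: %s" % err

print(result, "|", message)
```

The exception surfaces at join(), not at start(), so the try/except has to wrap the join() call.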
Erik Aronesty
0

Using naked excepts is not a good practice because you usually catch more than you bargain for.

I would suggest modifying the except to catch ONLY the exception that you would like to handle. I don't think that raising it has the desired effect, because when you go to instantiate TheThread in the outer try, if it raises an exception, the assignment is never going to happen.

Instead you might want to just alert on it and move on, such as:

def run(self):
    try:
       shutil.copytree(self.sourceFolder, self.destFolder)
    except OSError as err:
       print(err)

When that exception is caught, you can handle it right there. If the outer try then catches an exception from TheThread, you know it is not the one you already handled, which helps you isolate your process flow.

Eliezer Miron
jathanism
  • 1
    Well, if there is an error in that thread at all, I want the full program to inform the user that there was an issue and gracefully end. For that reason, I want the main thread to catch and handle all the exceptions. However, the problem still exists where if TheThread throws an exception, the main thread's try/except still won't catch it. I could have the thread detect the exception and return a false indicating that the operation was unsuccessful. That would achieve the same desired result, but I would still like to know how to properly catch a sub-thread exception. – Phanto May 13 '10 at 19:35
0

I think the other solutions are somewhat complex if all you want is to actually see the exception somewhere instead of remaining completely unaware of it.

The solution is to create a custom Thread that takes a logger from the main thread and logs any exceptions.

import threading

class ThreadWithLoggedException(threading.Thread):
    """
    Similar to Thread but will log exceptions to passed logger.

    Args:
        logger: Logger instance used to log any exception in child thread

    Exception is also reachable via <thread>.exception from the main thread.
    """

    def __init__(self, *args, **kwargs):
        try:
            self.logger = kwargs.pop("logger")
        except KeyError:
            raise Exception("Missing 'logger' in kwargs")
        super().__init__(*args, **kwargs)
        self.exception = None

    def run(self):
        try:
            if self._target is not None:
                self._target(*self._args, **self._kwargs)
        except Exception as exception:
            thread = threading.current_thread()
            self.exception = exception
            self.logger.exception(f"Exception in child thread {thread}: {exception}")
        finally:
            del self._target, self._args, self._kwargs

Example:

import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

def serve():
    raise Exception("Earth exploded.")

th = ThreadWithLoggedException(target=serve, logger=logger)
th.start()

Output in main thread:

Exception in child thread <ThreadWithLoggedException(Thread-1, started 139922384414464)>: Earth exploded.
Traceback (most recent call last):
  File "/core/utils.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/myapp.py", line 105, in serve
    raise Exception("Earth exploded.")
Exception: Earth exploded.
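On Python 3.8 and later, the standard library offers a related mechanism that needs no Thread subclass: threading.excepthook is called for any exception that escapes Thread.run(). Note that this is a different technique from the class above. The sketch below routes such exceptions to a logger; the logger name, `log_thread_exception` and `seen` are hypothetical names chosen for illustration.

```python
import logging
import threading

logger = logging.getLogger("thread-errors")  # hypothetical logger name
logger.addHandler(logging.StreamHandler())

seen = []  # illustration only: lets the main thread inspect what was logged

def log_thread_exception(args):
    # args carries exc_type, exc_value, exc_traceback and thread.
    seen.append(args.exc_value)
    logger.error(
        "Exception in child thread %s",
        args.thread.name if args.thread else "<unknown>",
        exc_info=(args.exc_type, args.exc_value, args.exc_traceback),
    )

# Replaces the default hook for all threads (requires Python 3.8+).
threading.excepthook = log_thread_exception

def serve():
    raise Exception("Earth exploded.")

th = threading.Thread(target=serve)
th.start()
th.join()
```

The hook runs in the failing thread before it terminates, so after join() the main thread can rely on `seen` being filled in.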

Pithikos
-1

pygolang provides sync.WorkGroup which, in particular, propagates exceptions from spawned worker threads to the main thread. For example:

#!/usr/bin/env python
"""This program demonstrates how, with sync.WorkGroup, an exception raised in
a spawned thread is propagated into the main thread which spawned the worker."""

from __future__ import print_function
from golang import sync, context

def T1(ctx, *argv):
    print('T1: run ... %r' % (argv,))
    raise RuntimeError('T1: problem')

def T2(ctx):
    print('T2: ran ok')

def main():
    wg = sync.WorkGroup(context.background())
    wg.go(T1, [1,2,3])
    wg.go(T2)

    try:
        wg.wait()
    except Exception as e:
        print('Tmain: caught exception: %r\n' %e)
        # reraising to see full traceback
        raise

if __name__ == '__main__':
    main()

gives the following when run:

T1: run ... ([1, 2, 3],)
T2: ran ok
Tmain: caught exception: RuntimeError('T1: problem',)

Traceback (most recent call last):
  File "./x.py", line 28, in <module>
    main()
  File "./x.py", line 21, in main
    wg.wait()
  File "golang/_sync.pyx", line 198, in golang._sync.PyWorkGroup.wait
    pyerr_reraise(pyerr)
  File "golang/_sync.pyx", line 178, in golang._sync.PyWorkGroup.go.pyrunf
    f(pywg._pyctx, *argv, **kw)
  File "./x.py", line 10, in T1
    raise RuntimeError('T1: problem')
RuntimeError: T1: problem

The original code from the question would be just:

    wg = sync.WorkGroup(context.background())

    def _(ctx):
        shutil.copytree(sourceFolder, destFolder)
    wg.go(_)

    # waits for spawned worker to complete and, on error, reraises
    # its exception on the main thread.
    wg.wait()
kirr