26

I've been looking around a good implementation of a simple python thread pool pattern and really can't find anything that suits my needs. I'm using python 2.7 and all the modules I have found either don't work, or don't handle exceptions in the workers properly. I was wondering if someone knew of a library that could offer the type of functionality I'm searching for. Help greatly appreciated.

Multiprocessing

My first attempt was with the built-in multiprocessing module, but as this doesn't use threads but subprocesses instead we run into the problem that objects cannot be pickled. No go here.

from multiprocessing import Pool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = Pool(processes=8)
for s in samples: pool.apply_async(s.compute_fib, [20])
pool.join()
for s in samples: print s.fib

# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

Futures

So I see there is a back port of some of the cool concurrent features of python 3.2 here. This seems perfect and simple to use. The problem is that when you get an exception in one of the workers, you only get the type of the exception such as "ZeroDivisionError" but no traceback and thus no indication of which line caused the exception. Code becomes impossible to debug. No go.

from concurrent import futures

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = futures.ThreadPoolExecutor(max_workers=8)
threads = [pool.submit(s.compute_fib, 20) for s in samples]
futures.wait(threads, return_when=futures.FIRST_EXCEPTION)
for t in threads: t.result()
for s in samples: print s.fib


#    futures-2.1.3-py2.7.egg/concurrent/futures/_base.pyc in __get_result(self)
#    354     def __get_result(self):
#    355         if self._exception:
#--> 356             raise self._exception
#    357         else:
#    358             return self._result
#
# ZeroDivisionError: integer division or modulo by zero

Workerpool

I found an other implementation of this pattern here. This time when an exception occurs it is printed, but then my ipython interactive interpreter is left in a hanging state and needs to be killed from an other shell. No go.

import workerpool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = workerpool.WorkerPool(size=8)
for s in samples: pool.map(s.compute_fib, [20])
pool.wait()
for s in samples: print s.fib

# ZeroDivisionError: integer division or modulo by zero
# ^C^C^C^C^C^C^C^C^D^D
# $ kill 1783

Threadpool

Yet an other implementation here. This time when an exception occurs, it is printed to the stderr but the script is not interrupted and instead continues executing, which defies the purpose of the exception and can make things unsafe. Still not usable.

import threadpool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = threadpool.ThreadPool(8)
requests = [threadpool.makeRequests(s.compute_fib, [20]) for s in samples]
requests = [y for x in requests for y in x]
for r in requests: pool.putRequest(r)
pool.wait()
for s in samples: print s.fib

# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
#---> 17 for s in samples: print s.fib
#
#AttributeError: 'Sample' object has no attribute 'fib'

- Update -

It appears that concerning the futures library, the behavior of python 3 is not the same as python 2.

futures_exceptions.py:

from concurrent.futures import ThreadPoolExecutor, as_completed

def div_zero(x):
    return x / 0

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = executor.map(div_zero, range(4))
    for future in as_completed(futures): print(future)

Python 2.7.6 output:

Traceback (most recent call last):
  File "...futures_exceptions.py", line 12, in <module>
    for future in as_completed(futures):
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 198, in as_completed
    with _AcquireFutures(fs):
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 147, in __init__
    self.futures = sorted(futures, key=id)
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 549, in map
    yield future.result()
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 397, in result
    return self.__get_result()
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 356, in __get_result
    raise self._exception
ZeroDivisionError: integer division or modulo by zero

Python 3.3.2 output:

Traceback (most recent call last):
  File "...futures_exceptions.py", line 11, in <module>
    for future in as_completed(futures):
  File "...python3.3/concurrent/futures/_base.py", line 193, in as_completed
    with _AcquireFutures(fs):
  File "...python3.3/concurrent/futures/_base.py", line 142, in __init__
    self.futures = sorted(futures, key=id)
  File "...python3.3/concurrent/futures/_base.py", line 546, in result_iterator
    yield future.result()
  File "...python3.3/concurrent/futures/_base.py", line 392, in result
    return self.__get_result()
  File "...python3.3/concurrent/futures/_base.py", line 351, in __get_result
    raise self._exception
  File "...python3.3/concurrent/futures/thread.py", line 54, in run
    result = self.fn(*self.args, **self.kwargs)
  File "...futures_exceptions.py", line 7, in div_zero
    return x / 0
ZeroDivisionError: division by zero
kindall
  • 178,883
  • 35
  • 278
  • 309
xApple
  • 6,150
  • 9
  • 48
  • 49
  • It doesn't completely solve the problem but one trick I have often used in debugging these problems is temporarily replace the call to `pool.map` with a call to the builtin `map`. – Kamil Kisiel Jan 03 '14 at 07:03

5 Answers5

5

I personally use concurrent.futures as the interface is very simple. For the traceback issue I found a workaround to preserve it. Checkout my answer to this other question:

Getting original line number for exception in concurrent.futures

mike rodent
  • 14,126
  • 11
  • 103
  • 157
se7entyse7en
  • 4,310
  • 7
  • 33
  • 50
3

If you want to get inforamtion about unhandled exception in threads and you use ThreadPoolExecutor, you can do like this:

import time
import traceback

from concurrent.futures import ThreadPoolExecutor


def worker():
    a = 2 / 0


def worker_callbacks(f):
    e = f.exception()

    if e is None:
        return

    trace = []
    tb = e.__traceback__
    while tb is not None:
        trace.append({
            "filename": tb.tb_frame.f_code.co_filename,
            "name": tb.tb_frame.f_code.co_name,
            "lineno": tb.tb_lineno
        })
        tb = tb.tb_next
    print(str({
        'type': type(e).__name__,
        'message': str(e),
        'trace': trace
    }))


executor = ThreadPoolExecutor(max_workers=1)
executor.submit(worker).add_done_callback(worker_callbacks)
The Nomadic Coder
  • 580
  • 1
  • 4
  • 16
Alexander
  • 33
  • 2
1

Easy solution: use whatever alternative suits you best, and implement your own try-except block in your workers. Surround the root call if you must.

I wouldn't say these libraries handle exceptions "incorrectly". They have a default behavior, however primitive. You are expected to handle this yourself if defaults don't suit you.

salezica
  • 74,081
  • 25
  • 105
  • 166
  • Adding a `try-execpt` block cannot solve any of the problems. In the case of `concurrent`, I still can't get to the original traceback after catching the new exception. In the case of `workerpool`, I never get to the except block as the interpreter crashes before. In the case of `threadpool` I never get to the except block as no exceptions are raised at all. – xApple Mar 12 '13 at 11:06
  • 2
    You're thinking of a `try` block in the main thread or process. I'm saying you use a `try` block around the function worker processes run. If you expect to `raise` an exception in a worker thread/process and have it sent to your main script, you need to first catch it where it occurred. – salezica Mar 12 '13 at 11:15
  • Well I'm not going to write error handling for every one of the functions I want to run. So what you are saying is that I should write my own global error handling. Yes, I could just chose one of the libraries and start editing the source code to add functionality, but that's what I wanted to avoid : ) – xApple Mar 12 '13 at 12:00
  • 1
    It's Python: write a high-level error handler and decorate your functions with it -- or even better, subclass the `worker` class and implement your `try-except` there. This is not editing source code, much less adding functionality. It's programming. You won't get a library that does this for you, because you are expected (and should be glad to) do it yourself. No offense intended, at all. – salezica Mar 12 '13 at 23:32
  • No offense taken. I will probably write something myself. It's just that it appeared to me this something pretty basic, and am surprised it has not been done my someone else in the past. It didn't appear to me as a particular design that would require custom making. This is concurrency 101... – xApple Mar 13 '13 at 08:44
  • 1
    I agree: it is pretty basic. However, it's also very difficult to come up with a default exception handling behavior that works all-around for everyone. Most people would end up overriding it anyway, so implementations leave it for users. – salezica Mar 13 '13 at 19:48
0

For those like me who came across this question and were using threading.ThreadPool: If you want to be able to handle exceptions, apply_async and map_async both have an error_callback keyword argument to which you can pass a function for handling the exceptions that occur.

Just in case it helps anyone!

MarcTheSpark
  • 473
  • 5
  • 14
0

Inspired by this answer from se7entyse7en here:

class NonSilentThreadPoolExecutor(ThreadPoolExecutor):
    def submit(self, fn, *args, **kwargs):
        # Submits the wrapped function instead of `fn`
        return super().submit(self._function_wrapper, fn, *args, **kwargs)

    def _function_wrapper(self, fn, *args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except BaseException as e:
            logger.exception(e) # or your way of dealing with any exceptions... 
            # is the following really necessary? 
            # raise sys.exc_info()[0](traceback.format_exc())
            raise e

... personally I don't quite see what the problem is with the line numbers (which se7entyse7en was attempting to address in his answer): logger.exception(e) seems to print out the precise line numbers where the exception was raised, with the rest of the stack trace seemingly OK.

In his answer se7entyse7en rightly talks about using a Future from concurrent.futures. But very often you find yourself in a situation where in fact the calling thread (e.g. Gui thread in PyQt5) can't wait for the result from the Future. So if you wanted to wait for that result you'd have to submit another task to that (or another) executor and wait for the result from the Future there ... but that defeats the object entirely as, once again, any exception raised whilst waiting for the result would also be swallowed silently.

se7entyse7en's answer in that other question obviously swallows the raised result if you can't wait for the result from the Future.

But at the very least you'd normally want to log any exceptions raised: when unhandled exceptions occur outside this concurrent context you always as standard have a stack trace printed to console. In a way, therefore, I do think the ThreadPoolExecutor class should allow you at least to specify "print stack trace on exception" as an optional parameter of submit, or better still include an optional "on_exception" callback parameter, like so:

class ExceptionAwareThreadPoolExecutor(ThreadPoolExecutor):
    def submit(self, fn, *args, on_exception='console', **kwargs):
        self.on_exception = on_exception
        return super().submit(self._function_wrapper, fn, *args, **kwargs)
        
    def _function_wrapper(self, fn, *args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except BaseException as e:
            if self.on_exception == 'console':
                # print stack to console:
                logging.error(f'Exception class {e.__class__.__name__} raised', exc_info=True)
            elif self.on_exception != None:
                self.on_exception(e)
            raise e

Then call like this to achieve the same result as above:

executor.submit(task_to_be_performed, on_exception=logger.exception)

or this so that the stack trace of an exception will just be printed to console (NB also making this a drop-in for ThreadPoolExecutor with no code changes required):

executor.submit(task_to_be_performed)

or suppress stack trace output:

executor.submit(task_to_be_performed, on_exception=None)

(NB logging.error works even if you haven't set up a logger as such)

mike rodent
  • 14,126
  • 11
  • 103
  • 157