90

It seems that when an exception is raised from a multiprocessing.Pool process, there is no stack trace or any other indication that it has failed. Example:

from multiprocessing import Pool 

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go)
p.close()
p.join()

prints 1 and stops silently. Interestingly, raising a BaseException instead works. Is there any way to make the behavior for all exceptions the same as BaseException?

Rob Lourens
  • 2
    I had the same problem. The cause is as follows: the worker process catches Exception and puts a failure code and the exception on the results queue. Back in the main process, the Pool's result handler thread gets the failure code and just ignores it. Some sort of monkey-patch debug mode might be possible. An alternative would be to ensure your worker function catches any exception, returns it and an error code for your handler to print. – Rupert Nash Oct 06 '11 at 09:58
  • This has been answered here: http://stackoverflow.com/a/26096355/512111 – j08lue Feb 01 '17 at 09:04

9 Answers

65

Maybe I'm missing something, but isn't that what the get method of the Result object returns? See Process Pools.

class multiprocessing.pool.AsyncResult

The class of the result returned by Pool.apply_async() and Pool.map_async().

get([timeout])
Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get().

So, slightly modifying your example, one can do

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()
p.close()
p.join()

This gives:

1
Traceback (most recent call last):
  File "rob.py", line 10, in <module>
    x.get()
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get
    raise self._value
Exception: foobar

This is not completely satisfactory, since it does not print the traceback, but is better than nothing.

UPDATE: This bug has been fixed in Python 3.4, courtesy of Richard Oudkerk. See the issue "get method of multiprocessing.pool.Async should return full traceback".
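To illustrate the update: on Python 3.4 and later, the exception re-raised by get() carries the worker-side traceback text as its __cause__, so formatting the exception chain shows where the failure happened. The following is only a minimal sketch assuming Python 3.4+; describe_failure is a hypothetical helper name, not part of multiprocessing.

```python
import traceback
from multiprocessing import Pool


def go():
    raise Exception("foobar")


def describe_failure(exc):
    """Format an exception together with any chained cause.

    On Python 3.4+, Pool chains the worker's traceback text onto the
    re-raised exception as __cause__, so it appears in this output.
    """
    return "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )


if __name__ == "__main__":
    with Pool(1) as p:
        result = p.apply_async(go)
        try:
            result.get()
        except Exception as exc:
            # Prints the remote traceback first, then the local one.
            print(describe_failure(exc))
```

On older versions the same code runs, but only the local traceback from get() is available.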

Faheem Mitha
  • Let me know if you figure out why it doesn't return the traceback. Since it is able to return the error value, it should be able to return the traceback too. I may ask on some suitable forum - perhaps some Python development list. BTW, as you may have guessed, I came across your question while trying to find out the same thing. :-) – Faheem Mitha Jan 06 '12 at 05:43
  • 4
    Note: to do this for a bunch of simultaneously running tasks, you should save all the results on a list, then iterate through each result with get(), possibly surrounded by try/catch if you don't want to crap out on the first error. – dfrankow Feb 28 '13 at 00:58
  • @dfrankow That's a great suggestion. Would you care to suggest a possible implementation in a new answer? I'm betting it would be very useful. ;) – JoErNanO Oct 29 '14 at 16:44
  • Sadly after over a year, I've completely forgotten all of this. – dfrankow Oct 29 '14 at 20:59
  • 2
    The code as it is in the answer will wait on the `x.get()`, which ruins the point of applying a task asynchronously. The comment by @dfrankow about saving the results to a list and then `get`ting them at the end is a better solution. – gozzilli Feb 22 '15 at 17:00
  • @gozzilli feel free to post a new/modified answer. It's been a few years. I've forgotten the details. – Faheem Mitha Feb 22 '15 at 17:08
32

I have a reasonable solution for the problem, at least for debugging purposes. I do not currently have a solution that will raise the exception back in the main process. My first thought was to use a decorator, but you can only pickle functions defined at the top level of a module, so that's right out.

Instead, I use a simple wrapper class and a Pool subclass that applies it in apply_async (and hence apply). I'll leave map_async as an exercise for the reader.

import traceback
from multiprocessing.pool import Pool
import multiprocessing

# Shortcut to multiprocessing's logger
def error(msg, *args):
    return multiprocessing.get_logger().error(msg, *args)

class LogExceptions(object):
    def __init__(self, callable):
        self.__callable = callable

    def __call__(self, *args, **kwargs):
        try:
            result = self.__callable(*args, **kwargs)

        except Exception as e:
            # Here we add some debugging help. If multiprocessing's
            # debugging is on, it will arrange to log the traceback
            error(traceback.format_exc())
            # Re-raise the original exception so the Pool worker can
            # clean up
            raise

        # It was fine, give a normal answer
        return result

class LoggingPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)

def go():
    print(1)
    raise Exception()
    print(2)

multiprocessing.log_to_stderr()
p = LoggingPool(processes=1)

p.apply_async(go)
p.close()
p.join()

This gives me:

1
[ERROR/PoolWorker-1] Traceback (most recent call last):
  File "mpdebug.py", line 24, in __call__
    result = self.__callable(*args, **kwargs)
  File "mpdebug.py", line 44, in go
    raise Exception()
Exception
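The map_async case left as an exercise can probably be handled the same way. This is a sketch rather than a tested drop-in: it assumes the Python 3 signatures of Pool.apply_async and Pool.map_async (both accept error_callback there; drop that parameter on Python 2), and square_or_fail is a hypothetical worker used only for the demo.

```python
import multiprocessing
import traceback
from multiprocessing.pool import Pool


class LogExceptions(object):
    """Picklable wrapper that logs a traceback before re-raising."""

    def __init__(self, func):
        self._func = func

    def __call__(self, *args, **kwargs):
        try:
            return self._func(*args, **kwargs)
        except Exception:
            multiprocessing.get_logger().error(traceback.format_exc())
            raise  # let the Pool worker record the failure as usual


class LoggingPool(Pool):
    # Assumption: Python 3 signatures with error_callback.
    def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
        return Pool.apply_async(self, LogExceptions(func), args, kwds,
                                callback, error_callback)

    def map_async(self, func, iterable, chunksize=None, callback=None,
                  error_callback=None):
        return Pool.map_async(self, LogExceptions(func), iterable,
                              chunksize, callback, error_callback)


def square_or_fail(x):
    # Hypothetical worker: fails for one input so a traceback is logged.
    if x == 2:
        raise ValueError("bad input: %d" % x)
    return x * x


if __name__ == "__main__":
    multiprocessing.log_to_stderr()
    p = LoggingPool(processes=2)
    result = p.map_async(square_or_fail, range(4))
    p.close()
    p.join()
    print(result.successful())
```

Because the wrapper re-raises, the result object still reports the failure; the log line is purely additional debugging output.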
Alex
Rupert Nash
  • It's too bad there isn't a simpler solution (or a mistake on my part) but this will get the job done- thanks! – Rob Lourens Oct 07 '11 at 03:16
  • 5
    I've realised that decorators CAN be used, if you use `@functools.wraps(func)` to decorate your wrapper. This makes your decorated function look like a function defined at the top level of a module. – Rupert Nash Oct 07 '11 at 09:24
  • 1
    The solution in [this answer](http://stackoverflow.com/a/26096355/512111) is simpler **and** supports re-raising the error in the main process! – j08lue Feb 01 '17 at 09:04
  • 1
    @j08lue - that answer is nice but comes with 3 downsides: 1) extra dependency 2) have to wrap your worker function with a try/except and the logic to return a wrapper object 3) have to sniff the return type and re-raise. On the plus side, getting the actual traceback in your main thread is nicer, I agree. – Rupert Nash Feb 02 '17 at 10:11
  • 1
    @RupertNash I actually meant a usage more like in [this new answer](http://stackoverflow.com/a/42000305/512111). That resolves downside 3. – j08lue Feb 03 '17 at 09:00
  • Worked perfect for me without the `LoggingPool`. Simply wrapped my function and passed the wrapped one to `map_async`. Helped me to print from where the error came but continue normal execution of the main program. Thanks alot! – Tomerikoo Jun 20 '19 at 12:00
  • You could use a queue to serialize the errors and pass them back to the calling method – Peter Kahn Jul 19 '19 at 14:22
25

The solution with the most votes at the time of writing has a problem:

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()  ## waiting here for go() to complete...
p.close()
p.join()

As @dfrankow noted, it will wait on x.get(), which ruins the point of running a task asynchronously. So, for better efficiency (in particular if your worker function go takes a long time) I would change it to:

from multiprocessing import Pool

def go(x):
    print(1)
    # task_that_takes_a_long_time()
    raise Exception("Can't go anywhere.")
    print(2)
    return x**2

p = Pool()
results = []
for x in range(1000):
    results.append(p.apply_async(go, [x]))

p.close()

for r in results:
    r.get()

Advantages: the worker function is run asynchronously, so if for example you are running many tasks on several cores, it will be a lot more efficient than the original solution.

Disadvantages: in an earlier version of this answer, an exception in the worker function was only raised after the pool had completed all the tasks, which may or may not be the desirable behaviour. The code above was EDITED according to @colinfang's comment: calling get() right after close(), without join(), raises as soon as a failed result is reached.
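If one failing task should not stop you from collecting the others, each get() can be wrapped in try/except, as suggested in the comments. A minimal sketch, where collect is a hypothetical helper name and go fails for some inputs only so that both outcomes show up:

```python
from multiprocessing import Pool


def go(x):
    if x % 3 == 0:
        raise ValueError("cannot process %d" % x)
    return x ** 2


def collect(pool, func, inputs):
    """Submit every input first, then gather values and errors separately."""
    pending = [(x, pool.apply_async(func, [x])) for x in inputs]
    values, errors = {}, {}
    for x, async_result in pending:
        try:
            values[x] = async_result.get()
        except Exception as exc:  # keep going after a failed task
            errors[x] = exc
    return values, errors


if __name__ == "__main__":
    p = Pool()
    values, errors = collect(p, go, range(10))
    p.close()
    p.join()
    print(sorted(values))
    for x, exc in sorted(errors.items()):
        print("task %d failed: %r" % (x, exc))
```

All tasks are submitted before the first get(), so the work still runs in parallel; only the collection step is sequential.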

gozzilli
  • Good effort. However, since your example is predicated on the assumption that there are multiple results, maybe expand it a bit so that there are, in fact, multiple results? Also, you write: "in particular if you worker function". That should be "your". – Faheem Mitha Feb 22 '15 at 17:44
  • You are right, thanks. I've expanded the example a bit. – gozzilli Feb 23 '15 at 12:30
  • 1
    Cool. Also, you might want to try/except, depending on how you want to tolerate errors in the fetch. – dfrankow Feb 23 '15 at 18:31
  • @gozzilli can you put `for r in ... r.get()` between `p.close()` and `p.join()`, so you exit as soon as you hit an exception – colinfang Jun 11 '15 at 18:21
  • @colinfang I believe that would `return null` because the computation hasn't occurred yet--it doesn't wait on it unless you `join()`. – gozzilli Jun 11 '15 at 22:59
  • @gozzilli I think it would wait on `get`, till the result is available, then wait for the next. Doesn't `get` call `wait` implicitly? – colinfang Jun 12 '15 at 09:44
11

Since there are already decent answers for multiprocessing.Pool available, I will provide a solution using a different approach for completeness.

For Python >= 3.2, the following solution seems to be the simplest:

from concurrent.futures import ProcessPoolExecutor

def go():
    print(1)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor() as p:
    for i in range(10):
        futures.append(p.submit(go))

results = [f.result() for f in futures]

Advantages:

  • very little code
  • raises an exception in the main process
  • provides a stack trace
  • no external dependencies

For more info about the API, see the concurrent.futures documentation.

Additionally, if you are submitting a large number of tasks and you would like your main process to fail as soon as one of your tasks fails, you can use the following snippet:

from concurrent.futures import ProcessPoolExecutor, as_completed
import time


def go():
    print(1)
    time.sleep(0.3)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor(1) as p:
    for i in range(10):
        futures.append(p.submit(go))

    for f in as_completed(futures):
        if f.exception() is not None:
            # A task failed: cancel everything that has not started yet.
            for pending in futures:
                pending.cancel()
            break

[f.result() for f in futures]

All of the other answers fail only once all tasks have been executed.
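An alternative to looping over as_completed is wait() with return_when=FIRST_EXCEPTION, which returns as soon as any future raises. This is a sketch under the assumption that cancelling the not-yet-started futures is acceptable; run_until_first_error is a hypothetical helper name.

```python
from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor, wait


def go(i):
    if i == 3:
        raise ValueError("task %d failed" % i)
    return i


def run_until_first_error(executor, func, inputs):
    """Return all results, or re-raise the first exception that was seen."""
    futures = [executor.submit(func, i) for i in inputs]
    # wait() returns once one future raises, or when all have finished.
    done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
    for f in not_done:
        f.cancel()  # only futures that have not started can be cancelled
    for f in done:
        f.result()  # re-raises the task's exception, if any
    return [f.result() for f in futures]


if __name__ == "__main__":
    with ProcessPoolExecutor(2) as p:
        try:
            print(run_until_first_error(p, go, range(10)))
        except ValueError as exc:
            print("stopped early:", exc)
```

Tasks that are already running cannot be cancelled, so the executor still waits for them when the with block exits.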

10 Rep
Vladimirs
10

I've had success logging exceptions with this decorator:

import traceback, functools, multiprocessing

def trace_unhandled_exceptions(func):
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            func(*args, **kwargs)
        except:
            print('Exception in ' + func.__name__)
            traceback.print_exc()
    return wrapped_func

With the code in the question, it becomes:

@trace_unhandled_exceptions
def go():
    print(1)
    raise Exception()
    print(2)

p = multiprocessing.Pool(1)

p.apply_async(go)
p.close()
p.join()

Simply decorate the function you pass to your process pool. The key to this working is @functools.wraps(func); otherwise multiprocessing throws a PicklingError.

The code above gives:

1
Exception in go
Traceback (most recent call last):
  File "<stdin>", line 5, in wrapped_func
  File "<stdin>", line 4, in go
Exception
Mark
Mark Foreman
  • This doesn't work if the function being run in parallel -- go() in this case -- returns a value. The decorator doesn't pass the return value through. Other than that I like this solution. – MD004 Mar 22 '16 at 18:25
  • To pass return values through, just modify wrapped_func like this: ` def wrapped_func(*args, **kwargs): result = None try: result = func(*args, **kwargs) except: print ('Exception in '+func.__name__) traceback.print_exc() return result ` Works like a charm ;) – MoTSCHIGGE Aug 30 '16 at 16:47
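Laid out as a full example, a variant of the decorator that passes return values through might look like the sketch below. It assumes Python 3, and the worker go(x) here is a hypothetical variant that returns a value; a failed call yields None, which cannot be told apart from a worker that legitimately returns None.

```python
import functools
import multiprocessing
import traceback


def trace_unhandled_exceptions(func):
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            print("Exception in " + func.__name__)
            traceback.print_exc()
            return None  # explicit: a failed call yields None
    return wrapped_func


@trace_unhandled_exceptions
def go(x):
    if x < 0:
        raise ValueError("negative input")
    return x * 2


if __name__ == "__main__":
    p = multiprocessing.Pool(1)
    ok = p.apply_async(go, (21,))
    bad = p.apply_async(go, (-1,))
    p.close()
    p.join()
    print(ok.get(), bad.get())
```

Since the exception is swallowed inside the worker, get() never raises here; the traceback is only printed by the worker process.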
5

Since you have used apply_async, I guess the use case is running some tasks asynchronously. Using callbacks to handle the outcome is another option. Please note that the error_callback parameter is available only in Python 3.2 and above, not in Python 2.7.

from multiprocessing import Pool

def callback(result):
    print('success', result)

def callback_error(result):
    print('error', result)

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go, callback=callback, error_callback=callback_error)

# You can do other things here

p.close()
p.join()
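Building on this, the callbacks can collect outcomes instead of printing them. A sketch assuming Python 3.2+ for error_callback; run_with_callbacks is a hypothetical helper name, and the note about callback ordering reflects CPython's implementation rather than a documented guarantee.

```python
import traceback
from multiprocessing import Pool


def go(x):
    if x == 2:
        raise ValueError("bad input %d" % x)
    return x * 10


def run_with_callbacks(pool, func, inputs):
    """Submit tasks and sort outcomes into successes and failures."""
    successes, failures = [], []
    pending = [
        pool.apply_async(func, (x,),
                         callback=successes.append,
                         error_callback=failures.append)
        for x in inputs
    ]
    for r in pending:
        # In CPython the callback runs before the result is marked ready.
        r.wait()
    return successes, failures


if __name__ == "__main__":
    p = Pool(2)
    successes, failures = run_with_callbacks(p, go, range(4))
    p.close()
    p.join()
    print(sorted(successes))
    for exc in failures:
        # On Python 3.4+ this also prints the worker-side traceback,
        # which Pool chains onto the exception as __cause__.
        traceback.print_exception(type(exc), exc, exc.__traceback__)
```

The callbacks run in a helper thread of the main process, so they should return quickly.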
Sanju
Asoul
  • There is no such `error_callback` for the `apply_async` method; refer to https://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.apply_async – Sanju Aug 11 '17 at 04:00
  • 2
    for the later version: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.apply_async – Asoul Aug 11 '17 at 13:52
4

import logging
from multiprocessing import Pool

def proc_wrapper(func, *args, **kwargs):
    """Print exception because multiprocessing lib doesn't return them right."""
    try:
        return func(*args, **kwargs)
    except Exception as e:
        logging.exception(e)
        raise

def go(x):
    print(x)
    raise Exception("foobar")

p = Pool()
p.apply_async(proc_wrapper, (go, 5))
p.close()
p.join()
erezarnon
1

I created a module, RemoteException.py, that shows the full traceback of an exception raised in a worker process. It targets Python 2. Download it and add this to your code:

import RemoteException

@RemoteException.showError
def go():
    raise Exception('Error!')

if __name__ == '__main__':
    import multiprocessing
    p = multiprocessing.Pool(processes = 1)
    r = p.apply(go) # full traceback is shown here
User
0

I'd try using pdb:

import pdb
import sys

def handler(exc_type, value, tb):
    pdb.pm()

sys.excepthook = handler
Clarus