When using multiprocessing.Pool's apply_async(), what happens when the worker code breaks? I think this mostly means exceptions, but there may be other things that make the worker functions fail.

import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
for f in files:
    pool.apply_async(workerfunct, args=args, callback=callbackfunct)

As I understand it right now, the process/worker fails (all other processes continue) and anything past a thrown error is not executed, EVEN if I catch the error with try/except.

As an example, usually I'd except Errors and put in a default value and/or print out an error message, and the code continues. If my callback function involves writing to file, that's done with default values.

This answerer wrote a little about it:

I suspect the reason you're not seeing anything happen with your example code is because all of your worker function calls are failing. If a worker function fails, callback will never be executed. The failure won't be reported at all unless you try to fetch the result from the AsyncResult object returned by the call to apply_async. However, since you're not saving any of those objects, you'll never know the failures occurred. If I were you, I'd try using pool.apply while you're testing so that you see errors as soon as they occur.
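
As a minimal sketch of what the quoted answer describes: keeping each AsyncResult and calling get() on it re-raises the worker's exception in the parent process. The built-in int stands in for a real worker function here, and collect_outcomes is a hypothetical helper name, not something from the original code.

```python
import multiprocessing as mp


def collect_outcomes(values):
    """Submit int(value) for each value and report how each call ended."""
    outcomes = []
    with mp.Pool(2) as pool:
        # Keep every AsyncResult; discarding them is what hides failures.
        pending = [(v, pool.apply_async(int, (v,))) for v in values]
        for value, async_result in pending:
            try:
                # get() returns the result or re-raises the worker's exception.
                outcomes.append((value, "ok", async_result.get(timeout=30)))
            except ValueError as exc:
                outcomes.append((value, "failed", type(exc).__name__))
    return outcomes


if __name__ == "__main__":
    for outcome in collect_outcomes(["1", "oops", "3"]):
        print(outcome)
```

The other tasks still complete; only the call whose worker raised reports a failure, and only because get() was called on it.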

ehacinom

1 Answer

If you're using Python 3.2+, you can use the error_callback keyword argument to handle exceptions raised in workers.

pool.apply_async(workerfunct, args=args, callback=callbackfunct, error_callback=handle_error)

handle_error will be called with the exception object as an argument.
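
A small self-contained sketch of that, assuming the built-in int as a stand-in worker and plain list.append methods as the two callbacks (run_with_error_callback is a hypothetical name used only for illustration):

```python
import multiprocessing as mp


def run_with_error_callback(values):
    """Route successful results and raised exceptions to separate lists."""
    results, errors = [], []
    with mp.Pool(2) as pool:
        for v in values:
            pool.apply_async(
                int,
                (v,),
                callback=results.append,       # called with the return value
                error_callback=errors.append,  # called with the exception object
            )
        # Stop accepting work, then wait until every task has been handled.
        pool.close()
        pool.join()
    return sorted(results), errors


if __name__ == "__main__":
    ok, failed = run_with_error_callback(["1", "oops", "3"])
    print("results:", ok)
    print("errors:", failed)
```

Both callbacks run in the parent process, so they can append to ordinary lists or write to a file without any extra inter-process plumbing.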

If you're not, you have to wrap all your worker functions in a try/except to ensure your callback is executed. (I think you got the impression that this wouldn't work from my answer in that other question, but that's not the case. Sorry!):

def workerfunct(*args):
    try:
        pass  # Stuff
    except Exception as e:
        # Do something here, maybe return e so the callback still runs?
        return e

pool.apply_async(workerfunct, args=args, callback=callbackfunct)

You could also use a wrapper function if you can't/don't want to change the function you actually want to call:

def wrapper(func, *args):
    try:
        return func(*args)
    except Exception as e:
        return e

pool.apply_async(wrapper, args=(workerfunct, *args), callback=callbackfunct) 
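
With the wrapper approach the callback receives either a normal result or an exception object, so it has to tell them apart. A rough sketch, assuming the built-in int as the wrapped worker; run_wrapped and the two lists are hypothetical names for illustration:

```python
import multiprocessing as mp


def wrapper(func, *args):
    """Same wrapper as above: hand exceptions back as return values."""
    try:
        return func(*args)
    except Exception as e:
        return e


def run_wrapped(values):
    successes, failures = [], []

    def callbackfunct(outcome):
        # Runs in the parent process for every task, failed or not.
        if isinstance(outcome, Exception):
            failures.append(outcome)
        else:
            successes.append(outcome)

    with mp.Pool(2) as pool:
        for v in values:
            pool.apply_async(wrapper, args=(int, v), callback=callbackfunct)
        pool.close()
        pool.join()
    return sorted(successes), failures


if __name__ == "__main__":
    print(run_wrapped(["1", "oops", "3"]))
```

This only works cleanly if the worker never legitimately returns an exception instance as its result; if it might, returning a tagged tuple from the wrapper is one alternative.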
dano
  • Do you have to wrap the entire worker function in a `try`/`except`, or can you do it internally in the worker? For instance, I have try/excepts inside my worker that 1) print an error message (this works) and 2) set default values for things that didn't work, and then the code should continue. Unfortunately, not everything is showing up in my end file; it's entirely probable that this try/except works, but I'm throwing errors elsewhere and need to write more `try`s! – ehacinom Aug 05 '14 at 21:28
  • @rebecca You can use `try`/`except` anywhere you want within the workers to handle errors you're *expecting*. Using the "global" `try`/`except` is useful for handling unexpected errors, if only to ensure that you know they're happening for debugging purposes. You could remove them in favor of more localized `try`/`except` blocks once you've got the bugs worked out if you want, though you're risking your callback mysteriously not being called down the line at some point. – dano Aug 05 '14 at 21:31
  • @dano What should be done if I want to get a full traceback in callbackfunct? – Anurag Sharma Aug 18 '17 at 10:03
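
On the traceback question: traceback objects are not pickled, so one possible approach is to format the traceback inside the worker and return it as text. This is only a sketch under that assumption; wrapper_with_traceback, collect_tracebacks and the ("ok"/"error", payload) tuple convention are hypothetical and not part of the answer above.

```python
import multiprocessing as mp
import traceback


def wrapper_with_traceback(func, *args):
    """Return ("ok", result) or ("error", formatted traceback text)."""
    try:
        return ("ok", func(*args))
    except Exception:
        # Format in the worker, where the traceback object still exists.
        return ("error", traceback.format_exc())


def collect_tracebacks(values):
    results, tracebacks = [], []

    def callbackfunct(outcome):
        status, payload = outcome
        if status == "ok":
            results.append(payload)
        else:
            tracebacks.append(payload)  # full traceback as a string

    with mp.Pool(2) as pool:
        for v in values:
            pool.apply_async(wrapper_with_traceback, args=(int, v),
                             callback=callbackfunct)
        pool.close()
        pool.join()
    return sorted(results), tracebacks


if __name__ == "__main__":
    ok, tbs = collect_tracebacks(["1", "oops"])
    print(ok)
    print(tbs[0])
```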