
How can I catch exceptions from a process that was executed using multiprocessing.Process()?

Consider the following Python script, which executes a simple failFunction() (that immediately raises a RuntimeError) inside a child process using multiprocessing.Process():

#!/usr/bin/env python3
import multiprocessing, time

# this function will be executed in a child process asynchronously
def failFunction():
    raise RuntimeError('trust fall, catch me!')

# execute failFunction() in a child process in the background
process = multiprocessing.Process(
    target=failFunction,
)
process.start()

# <this is where async stuff would happen>
time.sleep(1)

# try (and fail) to catch the exception
try:
    process.join()
except Exception as e:
    print( "This won't catch the exception" )

As you can see from the following execution, wrapping the call to .join() in a try..except block does not actually catch the exception:

user@host:~$ python3 example.py 
Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "example4.py", line 6, in failFunction
    raise RuntimeError('trust fall, catch me!')
RuntimeError: trust fall, catch me!
user@host:~$ 

How can I update the above script to actually catch the exception from the function that was executed inside of a child process using multiprocessing.Process()?

Michael Altfield
    I believe you'd have to have the processes communicate yourself in such a case, as once you do the `fork`, the child process lives on its own – I don't believe such "exception propagation" is possible at the OS level. Here's a very reasonable approach using a Queue: https://stackoverflow.com/a/19929767/5430833 – Marek Piotrowski Sep 05 '20 at 21:49

2 Answers


This can be achieved by overriding the run() method of the multiprocessing.Process class with a try..except block and setting up a Pipe() to send any exception raised in the child process back to the parent, where it is stored in an instance field and exposed through an exception property:

#!/usr/bin/env python3
import multiprocessing, traceback, time

class Process(multiprocessing.Process):

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        self._pconn, self._cconn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        try:
            multiprocessing.Process.run(self)
            self._cconn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._cconn.send((e, tb))
            # raise e  # you can still raise this exception if you need to

    @property
    def exception(self):
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        return self._exception


# this function will be executed in a child process asynchronously
def failFunction():
    raise RuntimeError('trust fall, catch me!')

# execute failFunction() in a child process using the Process subclass
process = Process(
    target=failFunction,
)
process.start()

# <this is where async stuff would happen>
time.sleep(1)

# catch the child process' exception
try:
    process.join()
    if process.exception:
        raise process.exception
except Exception as e:
    print( "Exception caught!" )

Example execution:

user@host:~$ python3 example.py 
Exception caught!
user@host:~$ 

Solution taken from this answer.

Michael Altfield

This solution does not require the target function to catch its own exceptions.

It may seem like overkill, but you can use the ProcessPoolExecutor class in the concurrent.futures module to create a process pool of size 1, which is all that is required for your needs. When you submit a "job" to the executor, a Future instance is created representing the state of execution of the process. When you call result() on the Future instance, you block until the process terminates and returns a result (i.e., until the target function returns). If the target function throws an exception, you can catch it when you call result():

import concurrent.futures

def failFunction():
    raise RuntimeError('trust fall, catch me!')

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(failFunction)
        try:
            result = future.result()
        except Exception as e:
            print('exception = ', e)
        else:
            print('result = ', result)

if __name__ == '__main__':
    main()

Prints:

exception =  trust fall, catch me!

A bonus of using a process pool is that you have a ready-made process already created if you have additional functions you need to invoke in a subprocess.

Booboo