84

I am currently playing around with multiprocessing and queues. I have written a piece of code to export data from MongoDB, map it into a relational (flat) structure, convert all values to strings and insert them into MySQL.

Each of these steps is submitted as a process and given input/output queues, save for the MongoDB export, which is handled in the parent.

As you will see below, I use queues, and child processes terminate themselves when they read "None" from the queue. The problem I currently have is that, if a child process runs into an unhandled exception, this is not recognized by the parent and the rest just keeps running. What I want to happen is that the whole shebang quits and at best re-raises the child error.

I have two questions:

  1. How do I detect the child error in the parent?
  2. How do I kill my child processes after detecting the error (best practice)? I realize that putting "None" on the queue to kill the child is pretty dirty.

I am using Python 2.7.

Here are the essential parts of my code:

# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()

[...]

    # create child processes
    # all processes generated here are subclasses of "multiprocessing.Process"

    # create mapper
    mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
               for i in range(10)]

    # create datatype converter, converts everything to str
    converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
                  for i in range(10)]

    # create mysql writer
    # I create a list of writers. Currently only one,
    # but I have the option to parallelize it further
    writers = [mysql_inserter.MySqlWriter(mysql_host, mysql_user, mysql_passwd, mysql_schema, converter_result_q
               , columns, 'w_'+mysql_table, 1000) for i in range(1)]

    # starting mapper
    for mapper in mappers:
        mapper.start()
    time.sleep(1)

    # starting converter
    for converter in converters:
        converter.start()

    # starting writer
    for writer in writers:
        writer.start()

[... initializing mongo db connection ...]

    # put each dataset read to queue for the mapper
    for row in mongo_collection.find({inc_column: {"$gte": start}}):
        mongo_input_result_q.put(row)
        count += 1
        if count % log_counter == 0:
            print 'Mongo Reader' + " " + str(count)
    print "MongoReader done"

    # Processes are terminated when they read "None" object from queue
    # now that reading is finished, put None for each mapper in the queue so they terminate themselves
    # the same for all followup processes
    for mapper in mappers:
        mongo_input_result_q.put(None)
    for mapper in mappers:
        mapper.join()
    for converter in converters:
        mapper_result_q.put(None)
    for converter in converters:
        converter.join()
    for writer in writers:
        converter_result_q.put(None)
    for writer in writers:
        writer.join()
drunken_monkey
  • 1,760
  • 1
  • 12
  • 14
  • It was suggested that I rewrite my 3 steps to be one single function and submit it to a process pool. But I want these steps split up; they should be interchangeable. In the end I will have several classes that all do one specific task, and I can run them as processes with queues between them (as shown above). There could also be a file output instead of the MySQL writer, or an additional transformation step where I split or merge columns. Think of them as steps in a Kettle transformation, if you know the tool. – drunken_monkey Nov 12 '13 at 12:56
  • 1
    I put an answer to your specific questions, but at a higher level: are your worker processes really going to be CPU-bound? The stuff you are talking about sounds like it would be I/O-bound. If so, I don't think multiprocessing is going to help you. Have you looked at [the many alternatives](http://stackoverflow.com/questions/1824418/a-clean-lightweight-alternative-to-pythons-twisted)? – KobeJohn Nov 12 '13 at 13:45

4 Answers

69

Why not let the Process take care of its own exceptions, like this:

from __future__ import print_function
import multiprocessing as mp
import traceback

class Process(mp.Process):
    def __init__(self, *args, **kwargs):
        mp.Process.__init__(self, *args, **kwargs)
        self._pconn, self._cconn = mp.Pipe()
        self._exception = None

    def run(self):
        try:
            mp.Process.run(self)
            self._cconn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._cconn.send((e, tb))
            # raise e  # You can still raise this exception if you need to

    @property
    def exception(self):
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        return self._exception

Now you have both the error and the traceback at hand:

def target():
    raise ValueError('Something went wrong...')

p = Process(target = target)
p.start()
p.join()

if p.exception:
    error, traceback = p.exception
    print(traceback)
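
A hedged addition, not part of the answer above: the question also asks for the parent to re-raise the child's error, and a comment further down notes that calling `join()` before draining the pipe can deadlock when the pickled exception/traceback is too large for the pipe buffer. One way to cover both is a small helper that polls the `exception` property while waiting and then re-raises; `join_and_reraise` and `poll_interval` are illustrative names, not part of the original code:

import time

def join_and_reraise(process, poll_interval=0.1):
    # Drain the exception pipe while the child is still alive, so a large
    # traceback cannot block the child's Pipe.send() before we join().
    while process.is_alive():
        if process.exception:
            break
        time.sleep(poll_interval)
    process.join()
    if process.exception:
        error, tb = process.exception
        print(tb)       # the child's formatted traceback
        raise error     # re-raise the child's exception in the parent

p = Process(target = target)
p.start()
join_and_reraise(p)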

Regards, Marek

mrkwjc
  • 1,080
  • 8
  • 17
  • I run into the following error on my Linux server: File "/home/ec2-user/anaconda3/lib/python3.6/multiprocessing/connection.py", line 252, in recv return _ForkingPickler.loads(buf.getbuffer()) TypeError: __init__() takes 1 positional argument but 2 were given. Does anyone know what to do? – Jens de Bruijn Nov 06 '17 at 15:25
  • This solution doesn't work with HTTPError for some reason. – Mykhailo Seniutovych Aug 08 '18 at 18:40
  • From python docs: "Though being an exception (a subclass of URLError), an HTTPError can also function as a non-exceptional file-like return value (the same thing that urlopen() returns). This is useful when handling exotic HTTP errors, such as requests for authentication." Maybe you face such exotic situation? – mrkwjc Aug 10 '18 at 07:31
  • 1
    AFAICT (Python 3.6) this doesn't work. Exceptions thrown in `target()` are not bubbled up to `Process.run(self)`. – Justin M. Keyes May 27 '19 at 16:22
  • 2
    Just tested and it works as expected (Python 3.7). Only the print statement has to be modified... – mrkwjc May 27 '19 at 16:53
  • There is one disadvantage of this solution. If we have several processes and just one has an error, we need to wait until all processes are finished in order to check `p.exception`. Fixed here: https://stackoverflow.com/a/58060759/4992248 – TitanFighter Sep 23 '19 at 10:50
  • Note that as @JensdeBruijn has noticed there appears to be an [open python bug](https://bugs.python.org/issue37287) which affects pickling exceptions. Curious if anybody has a work-around for this. – kontur Oct 22 '19 at 07:54
  • @JensdeBruijn A naive work-around seems to be to avoid sending the error object over the pipe, e.g. instead of `self._child_conn.send((e, tb))` use `self._child_conn.send(tb)`, so only the traceback is sent. – kontur Oct 22 '19 at 08:02
  • @mrkwjc - why would I want to raise the exception from `run()` (i.e. `raise e`)? – Shuzheng Oct 01 '20 at 13:58
  • @mrkwjc - and why do you send `None` over the client pipe: `self._cconn.send(None)`? – Shuzheng Oct 01 '20 at 14:05
  • The answer to the first question is: I don't know, I never used this. That's why this line is commented out. The second one: `self._cconn.send(None)` provides the information that everything was OK. Isn't it the case that `self._pconn.recv()` raises something when there's nothing on the other side? – mrkwjc Oct 01 '20 at 17:25
  • 2
    This code will deadlock if exception is too big (message and/or stack trace too long). The receiving end must call `Pipe.recv()` regularly otherwise `Pipe.send()` will block when the internal buffer becomes full. The `join()` will wait forever for the child to exit, while the child will wait forever for the parent to do `recv()` which only happens after `join()` finishes. – hamstergene Oct 03 '20 at 00:31
  • You're right. But the whole idea arises from the necessity of checking `p.exception` after `p.join()`. Otherwise the `mp.Process` class is just enough. – mrkwjc Oct 03 '20 at 09:51
  • Ah, sorry @hamstergene, I misunderstood your comment. You say that even a single exception can fill the buffer. Well, I didn't expect that an exception could really be such a big thing. There's usually 64kB of buffer space... – mrkwjc Oct 06 '20 at 16:36
  • For me it happened on `subprocess.CalledProcessError` with 5000 characters long message, so on my OS the buffer must be only 4096 bytes. – hamstergene Oct 14 '20 at 22:09
  • Why does it show error `AssertionError: group argument must be None for now` if I change `mp.Process.__init__(self, *args, **kwargs)` to `super().__init__(self, *args, **kwargs)`? – Lion Lai Apr 14 '22 at 06:15
  • Super() is confused with the class name, which is the same as the original one. Use another name, e.g. `ProcessWithException`. This seems like a bug in super() since new class is defined in another namespace... – mrkwjc Apr 15 '22 at 10:37
55

I don't know of a standard practice, but what I've found is that to have reliable multiprocessing I design the methods/classes/etc. specifically to work with multiprocessing. Otherwise you never really know what's going on on the other side (unless I've missed some mechanism for this).

Specifically what I do is:

  • Subclass multiprocessing.Process or make functions that specifically support multiprocessing (wrapping functions that you don't have control over if necessary)
  • always provide a shared error multiprocessing.Queue from the main process to each worker process
  • enclose the entire run code in a try: ... except Exception as e. Then, when something unexpected happens, send an error package with:
    • the process id that died
    • the exception with its original context (check here). The original context is really important if you want to log useful information in the main process.
  • of course handle expected issues as normal within the normal operation of the worker
  • (similar to what you said already) assuming a long-running process, wrap the running code (inside the try/catch-all) with a loop
    • define a stop token in the class or for functions.
    • When the main process wants the worker(s) to stop, just send the stop token. To stop everyone, send enough tokens for all the processes.
    • the wrapping loop checks the input queue for the token or whatever other input you want

The end result is worker processes that can survive for a long time and that can let you know what's happening when something goes wrong. They will die quietly, since you can handle whatever you need to do after the catch-all exception, and you will also know when you need to restart a worker (see the sketch below).

Again, I've just come to this pattern through trial and error so I don't know how standard it is. Does that help with what you are asking for?
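
Below is a minimal sketch of this pattern, added for illustration; it is not KobeJohn's code, and the names (`Worker`, `STOP_TOKEN`, `handle()`, the queue names) are made up. Each worker wraps its whole run loop in a catch-all, reports failures to a shared error queue together with its identity and a formatted traceback, and exits cleanly when it reads the stop token (Python 3):

import multiprocessing
import queue  # only needed for queue.Empty in the parent below
import traceback

STOP_TOKEN = "STOP"  # sentinel placed on the work queue to shut a worker down


class Worker(multiprocessing.Process):
    def __init__(self, work_q, error_q):
        multiprocessing.Process.__init__(self)
        self.work_q = work_q
        self.error_q = error_q

    def run(self):
        try:
            while True:
                item = self.work_q.get()
                if item == STOP_TOKEN:
                    break                    # stop token: exit quietly
                self.handle(item)            # expected problems handled in here
        except Exception:
            # something unexpected: report who died and the full traceback
            self.error_q.put((self.name, traceback.format_exc()))

    def handle(self, item):
        # placeholder for the real per-item work
        if item == "bad":
            raise ValueError("cannot process %r" % (item,))


if __name__ == "__main__":
    work_q, error_q = multiprocessing.Queue(), multiprocessing.Queue()
    workers = [Worker(work_q, error_q) for _ in range(2)]
    for w in workers:
        w.start()

    for item in ["a", "bad", "b"]:
        work_q.put(item)
    for _ in workers:
        work_q.put(STOP_TOKEN)               # one stop token per worker

    for w in workers:
        w.join()

    # the parent can now see which workers failed and why
    while True:
        try:
            name, tb = error_q.get(timeout=0.2)
        except queue.Empty:
            break
        print("worker %s failed:\n%s" % (name, tb))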

KobeJohn
  • 7,390
  • 6
  • 41
  • 62
  • 1
    yes, this does help. I have been thinking about creating an error queue to communicate between parent and child processes, but I was hoping there was a better (standard) solution provided by the multiprocessing module that I have not found yet. How would I tell the other child processes to terminate? – drunken_monkey Nov 12 '13 at 13:07
  • It's as you mentioned. I send a stop token to the input Q. I updated the answer to reflect this. – KobeJohn Nov 12 '13 at 13:42
  • I used your answer as a starting point for my solution, thanks! I have added my solution as a separate answer to my question. – drunken_monkey Nov 13 '13 at 11:36
  • 2
    You emphasize "sending an error package with the exception with its original context", which to me means the traceback object that is used as the 3rd argument to a raise statement. But a traceback object is not picklable, so it can't be sent through a `multiprocessing.Queue` object. How do you get the context back to the parent process? – Chris Feb 24 '16 at 00:02
  • 1
    @Chris I haven't touched MP for a long time now so I can't remember. I think I had something that worked with passing exceptions based on the linked discussion to Ned Batchelder's blog but I can't say 100% I was passing the traceback object. Maybe I was just sending a string? Sorry I don't have time to go back and reinvestigate. If you have a good pattern, this question could definitely use some best practice answers with code. – KobeJohn Feb 24 '16 at 01:39
  • @Chris Probably too late already. I send traceback as string. There is no other way. – Shiplu Mokaddim Mar 04 '18 at 22:55
  • @ShipluMokaddim There are *many* other ways. Pickling and unpickling `traceback` objects with third-party serialization libraries (e.g., dill, cloudpickle) is one obvious alternative. This is why [pathos](https://github.com/uqfoundation/pathos) is commonly recommended over the standard `multiprocessing` package. – Cecil Curry Mar 30 '18 at 05:50
13

@mrkwjc's solution is simple and easy to understand and implement, but it has one disadvantage: when we have several processes and want to stop all of them as soon as any single one has an error, we still need to wait until all processes are finished before checking `p.exception`. Below is code which fixes this problem (i.e. when one child has an error, we also terminate the other child):

import multiprocessing
import traceback

from time import sleep


class Process(multiprocessing.Process):
    """
    Class which returns child Exceptions to Parent.
    https://stackoverflow.com/a/33599967/4992248
    """

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        self._parent_conn, self._child_conn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        try:
            multiprocessing.Process.run(self)
            self._child_conn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._child_conn.send((e, tb))
            # raise e  # You can still raise this exception if you need to

    @property
    def exception(self):
        if self._parent_conn.poll():
            self._exception = self._parent_conn.recv()
        return self._exception


class Task_1:
    def do_something(self, queue):
        queue.put(dict(users=2))


class Task_2:
    def do_something(self, queue):
        queue.put(dict(users=5))


def main():
    try:
        task_1 = Task_1()
        task_2 = Task_2()

        # Example of multiprocessing which is used:
        # https://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/
        task_1_queue = multiprocessing.Queue()
        task_2_queue = multiprocessing.Queue()

        task_1_process = Process(
            target=task_1.do_something,
            kwargs=dict(queue=task_1_queue))

        task_2_process = Process(
            target=task_2.do_something,
            kwargs=dict(queue=task_2_queue))

        task_1_process.start()
        task_2_process.start()

        while task_1_process.is_alive() or task_2_process.is_alive():
            sleep(10)

            if task_1_process.exception:
                error, task_1_traceback = task_1_process.exception

                # Do not wait until task_2 is finished
                task_2_process.terminate()

                raise ChildProcessError(task_1_traceback)

            if task_2_process.exception:
                error, task_2_traceback = task_2_process.exception

                # Do not wait until task_1 is finished
                task_1_process.terminate()

                raise ChildProcessError(task_2_traceback)

        task_1_process.join()
        task_2_process.join()

        task_1_results = task_1_queue.get()
        task_2_results = task_2_queue.get()

        task_1_users = task_1_results['users']
        task_2_users = task_2_results['users']

    except Exception:
        # Here usually I send email notification with error.
        print('traceback:', traceback.format_exc())


if __name__ == "__main__":
    main()
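
A possible generalization to more than two processes, sketched here for illustration (it is not part of TitanFighter's answer and relies on the `Process` class and the `sleep` import from the snippet above; `wait_for_all` is a made-up name): poll every process, and as soon as one reports an exception, terminate the rest and re-raise; otherwise join them all at the end.

def wait_for_all(processes, poll_interval=1):
    """Wait for all processes; if any one fails, terminate the others
    and re-raise its traceback in the parent (sketch)."""
    while any(p.is_alive() for p in processes):
        for p in processes:
            if p.exception:
                error, tb = p.exception
                for other in processes:
                    if other is not p and other.is_alive():
                        other.terminate()
                raise ChildProcessError(tb)
        sleep(poll_interval)
    # all processes exited on their own; still check for late failures
    for p in processes:
        p.join()
        if p.exception:
            error, tb = p.exception
            raise ChildProcessError(tb)
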
TitanFighter
  • 4,582
  • 3
  • 45
  • 73
  • 1
    @TitanFighter - why do you send `None` over the pipe: `self._child_conn.send(None)`? – Shuzheng Oct 01 '20 at 14:00
  • @Shuzheng it is for the case in which the process has no exceptions. If the process you instantiated (say process `P`) started and exited without exceptions, you'd like `P.exception` to reflect that. Here, this is a return of `None`. To ensure that `P.exception` returns `None` when a process run encounters no exceptions, we send `None` over the pipe when the process encounters no exceptions. – Diana Vazquez Romo Oct 06 '22 at 15:15
8

Thanks to KobeJohn, I have found a solution which is nice and stable.

  1. I have created a subclass of multiprocessing.Process which implements some functions and overrides the run() method to wrap a new saferun() method in a try/except block. This class requires a feedback_queue for initialization, which is used to report info, debug and error messages back to the parent. The log methods in the class are wrappers for the globally defined log functions of the package (a sketch of what such functions might look like is included at the end of this answer):

    class EtlStepProcess(multiprocessing.Process):
    
        def __init__(self, feedback_queue):
            multiprocessing.Process.__init__(self)
            self.feedback_queue = feedback_queue
    
        def log_info(self, message):
            log_info(self.feedback_queue, message, self.name)
    
        def log_debug(self, message):
            log_debug(self.feedback_queue, message, self.name)
    
        def log_error(self, err):
            log_error(self.feedback_queue, err, self.name)
    
        def saferun(self):
            """Method to be run in sub-process; can be overridden in sub-class"""
            if self._target:
                self._target(*self._args, **self._kwargs)
    
        def run(self):
            try:
                self.saferun()
            except Exception as e:
                self.log_error(e)
                raise e
            return
    
  2. I have subclassed all my other process steps from EtlStepProcess. The code to be run is implemented in the saferun() method rather than in run(). This way I do not have to add a try/except block around it, since this is already done by the run() method. Example:

    class MySqlWriter(EtlStepProcess):
    
        def __init__(self, mysql_host, mysql_user, mysql_passwd, mysql_schema, mysql_table, columns, commit_count,
                     input_queue, feedback_queue):
            EtlStepProcess.__init__(self, feedback_queue)
            self.mysql_host = mysql_host
            self.mysql_user = mysql_user
            self.mysql_passwd = mysql_passwd
            self.mysql_schema = mysql_schema
            self.mysql_table = mysql_table
            self.columns = columns
            self.commit_count = commit_count
            self.input_queue = input_queue
    
        def saferun(self):
            self.log_info(self.name + " started")
            #create mysql connection
            engine = sqlalchemy.create_engine('mysql://' + self.mysql_user + ':' + self.mysql_passwd + '@' + self.mysql_host + '/' + self.mysql_schema)
            meta = sqlalchemy.MetaData()
            table = sqlalchemy.Table(self.mysql_table, meta, autoload=True, autoload_with=engine)
            connection = engine.connect()
            try:
                self.log_info("start MySQL insert")
                counter = 0
                row_list = []
                while True:
                    next_row = self.input_queue.get()
                    if isinstance(next_row, Terminator):
                        if counter % self.commit_count != 0:
                            connection.execute(table.insert(), row_list)
                        # Poison pill means we should exit
                        break
                    row_list.append(next_row)
                    counter += 1
                    if counter % self.commit_count == 0:
                        connection.execute(table.insert(), row_list)
                        del row_list[:]
                        self.log_debug(self.name + ' ' + str(counter))
    
            finally:
                connection.close()
            return
    
  3. In my main file, I submit a Process that does all the work and give it a feedback_queue. This process starts all the steps and then reads from MongoDB and puts values on the initial queue. My main process listens to the feedback queue and prints all log messages. If it receives an error log, it prints the error and terminates its child, which in turn also terminates all its children before dying.

    if __name__ == '__main__':
        feedback_q = multiprocessing.Queue()
        p = multiprocessing.Process(target=mongo_python_export, args=(feedback_q,))
        p.start()

        while p.is_alive():
            fb = feedback_q.get()
            if fb["type"] == "error":
                p.terminate()
                print "ERROR in " + fb["process"] + "\n"
                for child in multiprocessing.active_children():
                    child.terminate()
            else:
                print datetime.datetime.fromtimestamp(fb["timestamp"]).strftime('%Y-%m-%d %H:%M:%S') + " " + \
                    fb["process"] + ": " + fb["message"]

        p.join()
    

I am thinking about making a module out of it and putting it up on GitHub, but I have to do some cleaning up and commenting first.
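
The package-level log functions themselves are not shown in the answer. Below is a hypothetical sketch of what they might look like, inferred only from how the feedback queue is read in the main loop above (each message is a dict with "type", "process", "message" and "timestamp" keys); the bodies are an assumption, not the original code:

    import time

    def log_info(feedback_queue, message, process_name):
        feedback_queue.put({"type": "info", "process": process_name,
                            "message": message, "timestamp": time.time()})

    def log_debug(feedback_queue, message, process_name):
        feedback_queue.put({"type": "debug", "process": process_name,
                            "message": message, "timestamp": time.time()})

    def log_error(feedback_queue, err, process_name):
        # the main loop only uses "type" and "process" for errors, but keeping
        # a representation of the error makes it available if needed
        feedback_queue.put({"type": "error", "process": process_name,
                            "message": repr(err), "timestamp": time.time()})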

Shiva
  • 2,627
  • 21
  • 33
drunken_monkey
  • 1,760
  • 1
  • 12
  • 14
  • 1
    It's great to have actual code. Here is some feedback: 1) Why do the log_* methods seem to call themselves? Are those top-level functions you have defined elsewhere? 2) Be careful with Q.get(): it will block forever. You can use get(False) to spin really quickly or get(timeout=some_very_small_time) to spin without blasting your CPU. In either case you have to wrap it with try/except Queue.Empty. 3) You shouldn't need to terminate the processes when an unhandled error is received; the try/except handles that and lets them close out peacefully. terminate() is generally discouraged anyway, I believe. – KobeJohn Nov 13 '13 at 13:51
  • 1
    4) On the same topic, I recommend using the stop token rather than terminate(). I define the stop token within each class that subclasses Process. (or Thread... actually all of this stuff applies to threading.Thread as well.) 5) You'll really want to use the reraise technique to pass exception context back to the main process so that you retain debug information. The exception is pretty useless otherwise. – KobeJohn Nov 13 '13 at 13:52
  • 1
    Thanks for the tips! Regarding your points: 1) yes, they are top-level functions in my module that can be used outside of the module as well. 2) I will add this with a timeout and catch the timeout exception, thanks. 3) Since these processes are part of a whole loading structure from MongoDB to MySQL, I need to make sure that the whole thing shuts down when one process has an error, so I do not miss any data or insert wrong data into MySQL. It's an all-or-nothing thing and is meant to not be fault tolerant. 4)+5) I will check it out in the documentation. – drunken_monkey Nov 13 '13 at 16:04
  • 1
    Have you made a module out of this? If yes, is it on github? I would love to contribute! – Dschoni Aug 20 '15 at 14:58
  • 1
    Hi Dschoni. I have indeed made a module out of it. But it is super rough and has some stuff specific to our environment. If I find the time, I will update it so that it is more usable for the public and put it on GitHub for everyone to contribute. – drunken_monkey Aug 24 '15 at 08:47
  • Just me, asking again – Dschoni Feb 21 '17 at 14:48
  • Hi Dschoni, unfortunately I do not work at that company anymore and the code is not available to me anymore. Generally, I would recommend using Pentaho Data Integration instead of rebuilding it yourself in Python. Although it was fun, at the current state of Pentaho DI it is probably less pain to use that. It is open source, so you can contribute. – drunken_monkey Mar 13 '17 at 12:40