
Consider the following Python script, which uses SQLAlchemy and the Python multiprocessing module. It was run with Python 2.6.6-8+b1 (default) and SQLAlchemy 0.6.3-3 (default) on Debian squeeze. This is a simplified version of some actual code.

import multiprocessing
from sqlalchemy import *
from sqlalchemy.orm import *
dbuser = ...
password = ...
dbname = ...
dbstring = "postgresql://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
db = create_engine(dbstring)
m = MetaData(db)

def make_foo(i):
    t1 = Table('foo%s'%i, m, Column('a', Integer, primary_key=True))

conn = db.connect()
for i in range(10):
    conn.execute("DROP TABLE IF EXISTS foo%s"%i)
conn.close()
db.dispose()

for i in range(10):
    make_foo(i)

m.create_all()

def do(kwargs):
    i, dbstring = kwargs['i'], kwargs['dbstring']

    db = create_engine(dbstring)
    Session = scoped_session(sessionmaker())
    Session.configure(bind=db)
    Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
    Session.commit()
    db.dispose()

pool = multiprocessing.Pool(processes=5)               # start 5 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i, 'dbstring':dbstring})
r = pool.map_async(do, arglist, callback=results.append) # map do() over arglist asynchronously
r.get()
r.wait()
pool.close()
pool.join()

This script hangs with the following error message.

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 259, in _handle_results
    task = get()
TypeError: ('__init__() takes at least 4 arguments (2 given)', <class 'sqlalchemy.exc.ProgrammingError'>, ('(ProgrammingError) syntax error at or near "%"\nLINE 1: COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;\n        ^\n',))

Of course, the syntax error here is TRUNCATE foo%s;. My question is, why is the process hanging, and can I persuade it to exit with an error instead, without doing major surgery to my code? This behavior is very similar to that of my actual code.

Note that the hang does not occur if the statement is replaced by something like print foobarbaz. Also, the hang still happens if we replace

Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
Session.commit()
db.dispose()

by just Session.execute("TRUNCATE foo%s;")

I'm using the former version because it is closer to what my actual code is doing.

Also, removing multiprocessing from the picture and looping over the tables serially makes the hang go away, and it just exits with an error.

I'm also kind of puzzled by the form of the error, particularly the TypeError: ('__init__() takes at least 4 arguments (2 given)' bit. Where is this error coming from? It seems likely it is from somewhere in the multiprocessing code.

The PostgreSQL logs aren't helpful. I see lots of lines like

2012-01-09 14:16:34.174 IST [7810] 4f0aa96a.1e82/1 12/583 0 ERROR:  syntax error at or near "%" at character 28
2012-01-09 14:16:34.175 IST [7810] 4f0aa96a.1e82/2 12/583 0 STATEMENT:  COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;

but nothing else that seems relevant.

UPDATE 1: Thanks to lbolla and his insightful analysis, I was able to file a Python bug report about this. See sbt's analysis in that report, and also here. See also the Python bug report Fix exception pickling. So, following sbt's explanation, we can reproduce the original error with

import sqlalchemy.exc
e = sqlalchemy.exc.ProgrammingError("", {}, None)
type(e)(*e.args)

which gives

Traceback (most recent call last):
  File "<stdin>", line 9, in <module>
TypeError: __init__() takes at least 4 arguments (2 given)

UPDATE 2: This has been fixed, at least for SQLAlchemy, by Mike Bayer, see the bug report StatementError Exceptions un-pickable.. Per Mike's suggestion, I also reported a similar bug to psycopg2, though I didn't (and don't) have an actual example of breakage. Regardless, they have apparently fixed it, though they gave no details of the fix. See psycopg exceptions cannot be pickled. For good measure, I also reported a Python bug ConfigParser exceptions are not pickleable corresponding to the SO question lbolla mentioned. It seems they want a test for this.

Anyway, this looks like it will continue to be a problem in the foreseeable future, since, by and large, Python developers don't seem to be aware of this issue and so don't guard against it. Surprisingly, it seems that there are not enough people using multiprocessing for this to be a well known issue, or maybe they just put up with it. I hope the Python developers get around to fixing it at least for Python 3, because it is annoying.

I accepted lbolla's answer, as without his explanation of how the problem was related to exception handling, I would likely have gone nowhere in understanding this. I also want to thank sbt, who explained that Python not being able to pickle exceptions was the problem. I'm very grateful to both of them, and please vote their answers up. Thanks.

UPDATE 3: I posted a followup question: Catching unpickleable exceptions and re-raising.

Faheem Mitha

4 Answers


I believe the TypeError comes from multiprocessing's get.

I've stripped out all the DB code from your script. Take a look at this:

import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 5 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # map do() over arglist asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results

Using r.wait returns the expected result, but using r.get raises a TypeError. As described in Python's docs, use r.wait after a map_async.
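
To see the difference between wait and get in isolation, here is a minimal sketch. It is written in Python 3 syntax (unlike the Python 2 scripts in this thread) and assumes `multiprocessing.pool.ThreadPool`, which offers the same `map_async` interface but runs the tasks in threads, so nothing gets pickled. The names `fail` and `wait_then_get` are made up for the illustration.

```python
from multiprocessing.pool import ThreadPool


def fail(i):
    # Every task fails, so the async result ends up holding an exception.
    raise ValueError("task %d failed" % i)


def wait_then_get():
    """Return (successful, error) for a map_async call whose tasks all raise."""
    pool = ThreadPool(2)
    try:
        r = pool.map_async(fail, range(3))
        r.wait()                  # blocks until all tasks are done; does not raise
        ok = r.successful()       # False, because at least one task raised
        try:
            r.get()               # re-raises an exception stored by a failed task
            error = None
        except ValueError as e:
            error = e
        return ok, error
    finally:
        pool.close()
        pool.join()
```

In other words, `wait` only tells you that the work has finished, whereas `get` is what hands the worker's exception back to the caller. With a process pool that exception has to be pickled on the way, which is where the trouble in the question seems to start.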

Edit: I have to amend my previous answer. I now believe the TypeError comes from SQLAlchemy. I've amended my script to reproduce the error.

Edit 2: It looks like the problem is that multiprocessing.pool does not play well if any worker raises an Exception whose constructor requires a parameter (see also here).

I've amended my script to highlight this.

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

In your case, given that your code raises an SQLAlchemy exception, the only solution I can think of is to catch all the exceptions in the do function and re-raise a normal Exception instead. Something like this:

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Edit 3: So, it seems to be a bug in Python, but proper exceptions in SQLAlchemy would work around it; hence, I've raised the issue with SQLAlchemy, too.

As a workaround for the problem, I think the solution at the end of Edit 2 would do (wrapping the worker function in try-except and re-raising a plain Exception).
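
The same idea can be packaged as a decorator, so that the worker body itself stays untouched. This is only a sketch in Python 3 syntax: `WorkerError`, `NeedsTwoArgs`, `picklable_errors` and `caught_after_roundtrip` are hypothetical names, and `NeedsTwoArgs` is a stand-in for a library exception whose constructor signature does not match its `args`.

```python
import functools
import pickle
import traceback


class WorkerError(Exception):
    """Hypothetical wrapper; holds a single string, so it pickles cleanly."""


class NeedsTwoArgs(Exception):
    """Stand-in for a library exception that does not survive unpickling."""
    def __init__(self, message, statement):
        Exception.__init__(self, message)  # args becomes (message,) only
        self.statement = statement


def picklable_errors(func):
    """Re-raise anything func raises as a WorkerError carrying the traceback text."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # Keep the original type, message and stack as plain text.
            raise WorkerError(traceback.format_exc())
    return wrapper


@picklable_errors
def do(kwargs):
    raise NeedsTwoArgs("syntax error", "TRUNCATE foo%s")


def caught_after_roundtrip():
    """Call do() and return the wrapped exception after a pickle round trip."""
    try:
        do({'i': 0})
    except WorkerError as e:
        return pickle.loads(pickle.dumps(e))
```

The wrapped exception loses its original type, but the formatted traceback in `args[0]` still shows where the error came from, which plain re-raising with `repr(e)` does not.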

lbolla
  • Hi Lorenzo, I don't get the same thing. First, note that the reason I'm using get is that without it, the exception is not propagated. The example you have given behaves as expected. Without `get` and with `wait`, the exception is not shown, but is clearly raised, because the second `print i` statement doesn't happen. With just `get` or with both `get` and `wait`, the exception is shown. In neither case do I see a type error. What Python version and platform are you using? I'm using 2.6.6 (the default) on Debian squeeze. Thanks. – Faheem Mitha Jan 09 '12 at 14:55
  • The traceback with just `get` is `File "", line 17, in File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get raise self._value Exception: foo 1 7 8 9 0 2 3 4 5 6` – Faheem Mitha Jan 09 '12 at 15:00
  • Can you post the full stack trace you get? I've amended my script to raise an SQLAlchemy error and now I get the same TypeError you got (and the script hangs, too). – lbolla Jan 09 '12 at 15:21
  • Hi Lorenzo, what I posted above *is* the full stack trace, for the example you posted. I just changed the `Exception` to `Exception("foo")`.The `get` returns the exception value, but does not return the stack trace from the site of the error. I don't know why. – Faheem Mitha Jan 09 '12 at 15:23
  • See also this [related question](http://stackoverflow.com/questions/6728236/exception-thrown-in-multiprocessing-pool-not-detected). – Faheem Mitha Jan 09 '12 at 15:25
  • I meant, the full stack trace of your original script: it's obviously incomplete. – lbolla Jan 09 '12 at 15:27
  • No, it is the complete stack trace. Why do you think it is incomplete? – Faheem Mitha Jan 09 '12 at 15:30
  • With your revised script, I do get a very similar error, namely `0 1 2 3 4 5 6 7 8 9 Exception in thread Thread-2: Traceback (most recent call last): File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner self.run() File "/usr/lib/python2.6/threading.py", line 484, in run self.__target(*self.__args, **self.__kwargs) File "/usr/lib/python2.6/multiprocessing/pool.py", line 259, in _handle_results task = get() TypeError: ('__init__() takes at least 4 arguments (2 given)', , ('(NoneType) None',))`. – Faheem Mitha Jan 09 '12 at 15:31
  • So, I agree that it looks like the error is coming from SQLAlchemy. – Faheem Mitha Jan 09 '12 at 15:32
  • I think it's incomplete because it's like: `TypeError: ('__init__() takes at least 4 arguments (2 given)', , ('(ProgrammingError) syntax error at or near "%"\nLINE 1: COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;\n` -- there are missing closing parenthesis. – lbolla Jan 09 '12 at 15:36
  • Ah, right. There were a few missing characters at the end. I think I forgot to paste them, or deleted them or something. Added now. Sorry about that. – Faheem Mitha Jan 09 '12 at 15:41
  • Hi Lorenzo. Thanks very much for the update. I think you've put your finger on the problem. I'm still puzzled on what problem `multiprocessing` has with Exceptions which require a parameter. In any case, do you think this is a bug? If so, I'll report it. I get the same behavior with python 2.7. I'm currently trying to reproduce with 3.1. – Faheem Mitha Jan 09 '12 at 18:18
  • Same problem with 3.1, though a slightly different traceback. With your GoodExc/BadExc I get – Faheem Mitha Jan 09 '12 at 19:28
  • `0 1 2 3 5 4 6 7 8 9 Exception in thread Thread-2: Traceback (most recent call last): File "/usr/lib/python3.1/threading.py", line 516, in _bootstrap_inner self.run() File "/usr/lib/python3.1/threading.py", line 469, in run self._target(*self._args, **self._kwargs) File "/usr/lib/python3.1/multiprocessing/pool.py", line 265, in _handle_results task = get() File "/usr/lib/python3.1/pickle.py", line 1363, in loads encoding=encoding, errors=errors).load() TypeError: __init__() takes exactly 2 positional arguments (1 given)` – Faheem Mitha Jan 09 '12 at 19:28
  • I've edited my answer to reflect that and also raised the issue with SQLAlchemy. If you are satisfied by the answer, please approve it so we can close this thread. – lbolla Jan 10 '12 at 09:35
  • The SQLA bug has been fixed, see my update. Thanks again for your help with this. – Faheem Mitha Jan 18 '12 at 06:55

The TypeError: ('__init__() takes at least 4 arguments (2 given)' error isn't related to the SQL you're trying to execute; it has to do with how you're using SQLAlchemy's API.

The trouble is that you're trying to call execute on the session class rather than an instance of that session.

Try this:

session = Session()
session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
session.commit()

From the docs:

It is intended that the sessionmaker() function be called within the global scope of an application, and the returned class be made available to the rest of the application as the single class used to instantiate sessions.

So Session = sessionmaker() returns a new session class and session = Session() returns an instance of that class which you can then call execute on.

Stephen Emslie
  • Hi Stephen, The idiom I'm using is from e.g. [Creating a Thread-local Context](http://www.sqlalchemy.org/docs/06/orm/session.html#unitofwork-contextual) (0.6 version). This is used (I believe) when you are working with multiple threads. Since I'm using multiple processes, I guess I don't need to use it - it is left over from a previous version which was using threads, but I had some problems with keeping the threads separate. In any case, I'm confident this has nothing to do with the problem I'm seeing. – Faheem Mitha Jan 09 '12 at 16:39
  • Just to make sure, I replaced the `scoped_session` syntax with the conventional one you mention, and it didn't make any difference. – Faheem Mitha Jan 09 '12 at 16:39

I don't know about the cause of the original exception. However, multiprocessing's problems with "bad" exceptions are really down to how pickling works. I think the SQLAlchemy exception class is broken.

If an exception class has an __init__() method which does not call BaseException.__init__() (directly or indirectly) then self.args probably will not be set properly. BaseException.__reduce__() (which is used by the pickle protocol) assumes that a copy of an exception e can be recreated by just doing

type(e)(*e.args)

For example

>>> e = ValueError("bad value")
>>> e
ValueError('bad value',)
>>> type(e)(*e.args)
ValueError('bad value',)

If this invariant does not hold then pickling/unpickling will fail. So instances of

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

can be pickled, but the result cannot be unpickled:

>>> from cPickle import loads, dumps
>>> class BadExc(Exception):
...     def __init__(self, a):
...         '''Non-optional param in the constructor.'''
...         self.a = a
...
>>> loads(dumps(BadExc(1)))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: ('__init__() takes exactly 2 arguments (1 given)', <class '__main__.BadExc'>, ())

But instances of

class GoodExc1(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        Exception.__init__(self, a)
        self.a = a

or

class GoodExc2(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.args = (a,)
        self.a = a

can be successfully pickled/unpickled.
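
A quick way to check whether a given exception class is affected is to round-trip an instance through pickle. The sketch below uses Python 3 syntax and two hypothetical classes: `LosesArgs` forwards only part of its constructor arguments to the base class, and `KeepsArgs` forwards all of them. `survives_pickle` is likewise an illustrative helper, not part of any library.

```python
import pickle


class LosesArgs(Exception):
    """Hypothetical class: forwards only part of its arguments to the base class."""
    def __init__(self, message, statement):
        Exception.__init__(self, message)             # self.args == (message,)
        self.statement = statement


class KeepsArgs(Exception):
    """Hypothetical class: forwards every constructor argument."""
    def __init__(self, message, statement):
        Exception.__init__(self, message, statement)  # self.args matches the signature
        self.statement = statement


def survives_pickle(exc):
    """Return True if exc can be dumped and then loaded again."""
    try:
        pickle.loads(pickle.dumps(exc))
    except Exception:
        return False
    return True
```

For `LosesArgs` the dump succeeds and only the load fails, because unpickling calls the class with `args`, which is one argument short of what `__init__` requires.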

So you should ask the developers of SQLAlchemy to fix their exception classes. In the meantime you can probably use copy_reg.pickle() to override BaseException.__reduce__() for the troublesome classes.

sbt
  • Thanks very much for your explanation. Can you illustrate how to `use copy_reg.pickle() to override BaseException.__reduce__()` in this case? Thanks. – Faheem Mitha Jan 10 '12 at 05:46
  • [Here](http://hg.sqlalchemy.org/sqlalchemy/file/b7f6824e5a33/lib/sqlalchemy/exc.py) is SQLAlchemy's exception class. Is there some way to override all the classes at once? – Faheem Mitha Jan 10 '12 at 06:18

(This is in answer to Faheem Mitha's question in a comment about how to use copy_reg to work around the broken exception classes.)

The __init__() methods of SQLAlchemy's exception classes seem to call their base class's __init__() methods, but with different arguments. This mucks up pickling.

To customise the pickling of sqlalchemy's exception classes you can use copy_reg to register your own reduce functions for those classes.

A reduce function takes an argument obj and returns a pair (callable_obj, args) such that a copy of obj can be created by doing callable_obj(*args). For example

class StatementError(SQLAlchemyError):
    def __init__(self, message, statement, params, orig):
        SQLAlchemyError.__init__(self, message)
        self.statement = statement
        self.params = params
        self.orig = orig
    ...

can be "fixed" by doing

import copy_reg, sqlalchemy.exc

def reduce_StatementError(e):
    message = e.args[0]
    args = (message, e.statement, e.params, e.orig)
    return (type(e), args)

copy_reg.pickle(sqlalchemy.exc.StatementError, reduce_StatementError)

There are several other classes in sqlalchemy.exc which need to be fixed similarly. But hopefully you get the idea.
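
For anyone who wants to try this without SQLAlchemy installed, here is a self-contained sketch in Python 3 syntax, where the module is called `copyreg` rather than `copy_reg`. `StatementLikeError` is a hypothetical stand-in with the same shape as the class shown above; `reduce_statement_like`, `register_fix` and `roundtrip` are illustrative names.

```python
import copyreg   # named copy_reg in Python 2
import pickle


class StatementLikeError(Exception):
    """Hypothetical stand-in shaped like the StatementError shown above."""
    def __init__(self, message, statement, params, orig):
        Exception.__init__(self, message)   # only the message ends up in args
        self.statement = statement
        self.params = params
        self.orig = orig


def reduce_statement_like(e):
    # Rebuild with the full constructor signature instead of just e.args.
    return (type(e), (e.args[0], e.statement, e.params, e.orig))


def register_fix():
    """Register the reducer for exactly this class; subclasses are not covered."""
    copyreg.pickle(StatementLikeError, reduce_statement_like)


def roundtrip(e):
    return pickle.loads(pickle.dumps(e))
```

Before `register_fix()` is called, `roundtrip` on such an instance raises a TypeError when loading; afterwards the copy comes back with its attributes intact. Since the registration is keyed on the exact class, each subclass needs its own entry.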


On second thoughts, rather than fixing each class individually, you can probably just monkey patch the __reduce__() method of the base exception class:

import sqlalchemy.exc

def rebuild_exc(cls, args, dic):
    e = Exception.__new__(cls)
    e.args = args
    e.__dict__.update(dic)
    return e

def __reduce__(e):
    return (rebuild_exc, (type(e), e.args, e.__dict__))

sqlalchemy.exc.SQLAlchemyError.__reduce__ = __reduce__
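
The effect of the monkey patch can be checked without SQLAlchemy. This is a sketch in Python 3 syntax under stated assumptions: `BaseLibError` and `DetailedError` are hypothetical stand-ins for a library's exception hierarchy, and `reduce_via_rebuild`, `patch_base` and `roundtrip` are illustrative names.

```python
import pickle


class BaseLibError(Exception):
    """Hypothetical stand-in for a library's base exception class."""


class DetailedError(BaseLibError):
    def __init__(self, message, statement):
        BaseLibError.__init__(self, message)   # args == (message,)
        self.statement = statement


def rebuild_exc(cls, args, dic):
    e = Exception.__new__(cls)   # bypass __init__, so its signature no longer matters
    e.args = args
    e.__dict__.update(dic)
    return e


def reduce_via_rebuild(e):
    return (rebuild_exc, (type(e), e.args, e.__dict__))


def patch_base():
    # Patching the base class covers every subclass that does not define
    # its own __reduce__.
    BaseLibError.__reduce__ = reduce_via_rebuild


def roundtrip(e):
    return pickle.loads(pickle.dumps(e))
```

Note that `rebuild_exc` has to be importable in the process that unpickles, so in a multiprocessing setting it should live at module level rather than inside a function.
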
sbt
  • Hi sbt, thanks very much for your answer, it is very helpful. As I mentioned in the update, Mike Bayer has now fixed the problem with SQLAlchemy, though it will continue to be a problem with other libraries, probably. I wonder though, why, with 2 upvotes, you only have 11 reputation. It should be 21, surely? As regards handling the error, I'm inclined to try catching and reraising an exception that is thrown from inside the multiprocessing job, as lbolla suggests, since even if I fix the SQLA exceptions, I can't be sure an exception from outside SQLA won't get thrown. – Faheem Mitha Jan 18 '12 at 06:53