
I have several classes that share the same abstract base and the same schema, each mapped to a similar table in the database. My queries are very straightforward: no joins, just simple filtering conditions. I'm using polymorphic identity with the class hierarchy so I can perform unions seamlessly.

The problem is that sometimes I need to repeat the same query for several tables and perform a union. I couldn't find a way to do this in SQLAlchemy, so I'm trying to implement a method on my custom BaseQuery class that does it automatically by cloning the original query and changing the class/mapper used for the FROM clause.

For instance, today I have to do something like this:

query1 = MyModel1.query.filter_by(foo=bar)
query2 = MyModel2.query.filter_by(foo=bar)
query3 = MyModel3.query.filter_by(foo=bar)

query = query1.union(query2).union(query3)

And I would like to be able to do something like this:

query = MyModel1.query.filter_by(foo=bar).with_unions(MyModel2, MyModel3)

And with_unions would be something like this, where replace_from_clause is the method I'm after:

def with_unions(self, *others):
    query = self._clone()

    for other in others:
        # Rebuild from the original query (self), not the accumulated union
        query = query.union(replace_from_clause(self, other))

    return query

Is there something like the replace_from_clause method available somewhere in SQLAlchemy, or some way to implement it?

Needless to say, if there's a better approach to this, I'm all ears.

Pedro Werneck

1 Answer


As far as I'm aware, in my experience, and as per this Stack Overflow answer (https://stackoverflow.com/a/10612690/3329834), you cannot union like this with the ORM.

I managed to achieve the syntax you were looking for (more or less) and load everything back into the ORM on return. The normal caveats regarding unions (same number of columns, etc.) all apply, plus one more here: the columns you filter on must have the same names in every model. Further, I don't think I would ever use this in practice.
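
The full listing that follows hinges on one idea: record the method calls made on a placeholder object, then replay them against each model's query. As a rough sketch of that idea in isolation, with no SQLAlchemy involved (CallRecorder and replay are hypothetical names used only for illustration):

```python
class CallRecorder:
    """Records chained method calls so they can be replayed later."""

    def __init__(self):
        self.calls = []  # list of (method_name, args, kwargs)

    def __getattr__(self, name):
        # Only reached when normal lookup fails, i.e. for unknown names.
        if name.startswith("_"):
            raise AttributeError(name)

        def record(*args, **kwargs):
            self.calls.append((name, args, kwargs))
            return self  # returning self is what allows chaining

        return record

    def replay(self, target):
        # Apply each recorded call in order, threading the result through.
        for name, args, kwargs in self.calls:
            target = getattr(target, name)(*args, **kwargs)
        return target


# Record one chain, then replay it against two unrelated strings.
chain = CallRecorder().strip().replace("a", "o").upper()
results = [chain.replay(s) for s in ("  banana ", " papaya")]
print(results)
```

In the listing, the replay targets are per-model Query objects rather than strings, and the recorded calls are things like filter_by.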

from functools import partial

# Explicit imports instead of a wildcard; the engine is created in the
# __main__ block below, so no module-level engine or connection is needed.
from sqlalchemy import Column, Integer, String, create_engine, orm, sql
from sqlalchemy.ext.declarative import declarative_base


Base = declarative_base()


class Student(Base):
    __tablename__ = "students"
    id = Column(Integer, primary_key=True)
    name = Column(String(767), unique=True)
    caretaker = Column(String(50))

    def __repr__(self):
        return 'Student(name={s.name}, caretaker={s.caretaker}'.format(s=self)


class Patient(Base):
    __tablename__ = "patients"
    id = Column(Integer, primary_key=True)
    name = Column(String(767), unique=True)
    caretaker = Column(String(50))

    def __repr__(self):
        return 'Patient(name={s.name}, caretaker={s.caretaker}'.format(s=self)

class StagedOperation(object):

    def __init__(self, attr):
        self.attr = attr

    def __call__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs


class StagedQuery(object):

    def __init__(self, model, session=None):
        self.session = session
        self.models = [model]
        self.columns = [e.name for e in model.__table__.columns]
        self.ops = []

    def __getattr__(self, attr):
        # __getattr__ fires only when an attribute is requested & not found.
        # We will attempt to pass any attribute call on to the resulting
        # Query objects; do note this will only work, technically and
        # logically, with method calls, not attribute access.
        if hasattr(orm.query.Query, attr):
            obj = StagedOperation(attr)
            self.ops.append(obj)

            # really getting hacky to enable "chaining"
            # Could also build this into the StagedOperation.__call__
            def _allow_chaining(desired_return, op, *args, **kwargs):
                op(*args, **kwargs)
                return desired_return

            return partial(_allow_chaining, self, obj)

        # Unknown names should fail loudly instead of silently returning None
        raise AttributeError(attr)

    def with_unions(self, *models):
        self.models.extend(models)
        return self

    def with_session(self, session):
        self.session = session
        return self

    def query(self):
        q = None
        for model in self.models:
            id_col = sql.literal(model.__tablename__).label('tablename')
            columns = self.columns + [id_col]
            mq = orm.query.Query(columns).select_from(model)
            for op in self.ops:
                mq = getattr(mq, op.attr)(*op.args, **op.kwargs)
            q = q.union(mq) if q is not None else mq
        return q

    def _deserialize_row(self, row):
        ref = {e.__tablename__: e for e in self.models}
        return ref[row.tablename](**{k: getattr(row, k) for k in self.columns})

    def one(self):
        return self._deserialize_row(
            self.query().with_session(self.session).one())

    def first(self):
        r = self.query().with_session(self.session).first()
        if r:
            return self._deserialize_row(r)

    def all(self):
        return [
            self._deserialize_row(e) for e in
            self.query().with_session(self.session).all()]


if __name__ == '__main__':
    engine = create_engine('sqlite://')
    Session = orm.sessionmaker()
    Session.configure(bind=engine)
    Base.metadata.create_all(engine)

    session = Session()

    #
    # Insert some objects
    #

    stu = Student(id=1, name='John', caretaker='Mother')
    stu2 = Student(id=2, name='Sally', caretaker='Mother')
    stu3 = Student(id=3, name='Scott', caretaker='Father')

    pat = Patient(id=1, name='Susan', caretaker='Mother')
    pat2 = Patient(id=2, name='Sally', caretaker='Father')
    pat3 = Patient(id=3, name='Turnip', caretaker='Father')

    session.add_all([stu, stu2, stu3, pat, pat2, pat3])
    session.flush()

    # Some usage options
    print(
        StagedQuery(Student)
        .filter_by(caretaker='Mother')
        .with_unions(Patient)
        .with_session(session)
        .all())

    print(
        StagedQuery(Student, session=session)
        .filter_by(caretaker='Mother')
        .filter_by(name='Sally')
        .with_unions(Patient)
        .all())

This prints:

[Student(name=John, caretaker=Mother, Patient(name=Susan, caretaker=Mother, Student(name=Sally, caretaker=Mother]
[Student(name=Sally, caretaker=Mother]
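
If the per-model query can be written as a function of the model class, a much smaller helper may be enough. The sketch below is an assumption, not something SQLAlchemy ships: union_over and build_query are hypothetical names, and FakeQuery is only a stand-in so the example runs without a database. The one library fact it relies on is that Query.union accepts several queries in a single call.

```python
def union_over(models, build_query):
    """Build one query per model with build_query, then union them all.

    build_query is a hypothetical callable: it takes a model class and
    returns a query object exposing union(*others), as SQLAlchemy's
    Query does.
    """
    queries = [build_query(model) for model in models]
    if not queries:
        raise ValueError("at least one model is required")
    first, *rest = queries
    # A single union call keeps the result flat instead of nesting unions.
    return first.union(*rest) if rest else first


# Stand-in for a query object so the sketch runs without a database.
class FakeQuery:
    def __init__(self, sql):
        self.sql = sql

    def union(self, *others):
        return FakeQuery(" UNION ".join([self.sql] + [o.sql for o in others]))


tables = ["students", "patients"]
q = union_over(
    tables,
    lambda t: FakeQuery(f"SELECT * FROM {t} WHERE caretaker = 'Mother'"),
)
print(q.sql)
```

With the models from the question, the call would look something like union_over([MyModel1, MyModel2, MyModel3], lambda m: m.query.filter_by(foo=bar)). Because the filter is applied inside build_query, it ends up in each subquery rather than on the outer union.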
Jason
  • Not terrible, but still far from usable. It gave me some ideas, thanks. By the way, the unions work when you have polymorphic identity. In fact, SQLAlchemy does the union automatically if I query with the base class, but it places the WHERE clause on the final unioned query instead of in each subquery, and MySQL is too dumb to optimize that. – Pedro Werneck Oct 19 '14 at 00:06
  • Yes, unions do work when you have a polymorphic identity; I would add that to your question, as it would have changed my answer. – Jason Oct 20 '14 at 12:39
  • Good idea. I just did it. – Pedro Werneck Oct 20 '14 at 13:32
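
To make the WHERE-placement point from the first comment concrete, here is a small sketch using the standard-library sqlite3 module rather than MySQL or SQLAlchemy. The table layout and rows are borrowed from the answer; the two SQL strings are hand-written illustrations, not SQL captured from SQLAlchemy. It only shows that both shapes return the same rows; whether a given database optimizes the outer form is a separate question that this sketch does not measure.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE students (id INTEGER PRIMARY KEY, name TEXT, caretaker TEXT);
    CREATE TABLE patients (id INTEGER PRIMARY KEY, name TEXT, caretaker TEXT);
    INSERT INTO students VALUES
        (1, 'John', 'Mother'), (2, 'Sally', 'Mother'), (3, 'Scott', 'Father');
    INSERT INTO patients VALUES
        (1, 'Susan', 'Mother'), (2, 'Sally', 'Father'), (3, 'Turnip', 'Father');
""")

# Filter on the outer query: all rows are unioned first, then filtered.
outer_sql = """
    SELECT name FROM (
        SELECT name, caretaker FROM students
        UNION ALL
        SELECT name, caretaker FROM patients
    ) WHERE caretaker = ?
    ORDER BY name
"""

# Filter inside each branch: each table is filtered before the union.
inner_sql = """
    SELECT name FROM students WHERE caretaker = ?
    UNION ALL
    SELECT name FROM patients WHERE caretaker = ?
    ORDER BY name
"""

outer_rows = conn.execute(outer_sql, ("Mother",)).fetchall()
inner_rows = conn.execute(inner_sql, ("Mother", "Mother")).fetchall()
print(outer_rows == inner_rows, inner_rows)
```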