8

I'm using a modified version of the versioning code example that comes with SQLAlchemy to record a user id and date on changes. However, I also want to modify it so that deletes are done by setting an is_deleted-style flag instead of running an actual SQL DELETE. My problem is that I'm not sure how to capture the delete and replace it with an update.

Here's what I have so far:

''' http://docs.sqlalchemy.org/en/rel_0_8/orm/examples.html?highlight=versioning#versioned-objects '''
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import mapper, class_mapper, attributes, object_mapper, scoping
from sqlalchemy.orm.session import Session
from sqlalchemy.orm.exc import UnmappedClassError, UnmappedColumnError
from sqlalchemy import Table, Column, ForeignKeyConstraint, DateTime, String, Boolean
from sqlalchemy import event
from sqlalchemy.orm.properties import RelationshipProperty
from datetime import datetime
from sqlalchemy.schema import ForeignKey
from sqlalchemy.sql.expression import false

def col_references_table(col, table):
    """Return True if any foreign key on ``col`` references ``table``.

    Returns False when ``col`` has no foreign keys or none of them
    reference ``table``.
    """
    return any(fk.references(table) for fk in col.foreign_keys)

def _history_mapper(local_mapper):
    """Create the history table/class for ``local_mapper`` and map it.

    Steps performed, in order:

    1. Turn on ``active_history`` for every mapped property of the class.
    2. If the mapper has its own table (no parent mapper, or a table
       different from the parent's), build a ``<table name>_history`` table
       from copies of its columns plus the ``version_datetime``,
       ``version_userid`` and ``version_deleted`` columns.  Otherwise (single
       table inheritance) append any missing columns to the parent's history
       table.
    3. Create a ``<ClassName>History`` class, map it to that table and store
       the resulting mapper on the class as ``__history_mapper__``.
    4. When there is no parent history mapper, add the three ``version_*``
       columns to the live table and map them as properties.

    :param local_mapper: mapper of the class being versioned.
    :return: None; the result is stored on ``cls.__history_mapper__``.
    """
    cls = local_mapper.class_

    # set the "active_history" flag
    # on column-mapped attributes so that the old version
    # of the info is always loaded (currently sets it on all attributes)
    for prop in local_mapper.iterate_properties:
        getattr(local_mapper.class_, prop.key).impl.active_history = True

    super_mapper = local_mapper.inherits
    # Read *before* this function assigns cls.__history_mapper__ below, so any
    # value found here was set earlier (presumably inherited from a versioned
    # parent class -- confirm).
    super_history_mapper = getattr(cls, '__history_mapper__', None)

    polymorphic_on = None
    # (local column key, referenced history column) pairs for the composite
    # foreign key pointing at the parent history table.
    super_fks = []
    if not super_mapper or local_mapper.local_table is not super_mapper.local_table:
        # This mapper has its own table: build a dedicated history table.
        cols = []
        for column in local_mapper.local_table.c:
            # version_* columns are re-created explicitly further down.
            if column.name.startswith('version_'):
                continue

            col = column.copy()
            # Uniqueness is dropped on the history copy of each column.
            col.unique = False

            if super_mapper and col_references_table(column, super_mapper.local_table):
                # Pair this column with the first primary key column of the
                # parent history table.
                super_fks.append((col.key, list(super_history_mapper.local_table.primary_key)[0]))

            cols.append(col)

            if column is local_mapper.polymorphic_on:
                polymorphic_on = col

        # Both branches append the same three version_* columns; the
        # super_mapper branch additionally registers foreign key pairs to the
        # base history table's version_* columns.
        if super_mapper:
            super_fks.append(('version_datetime', super_history_mapper.base_mapper.local_table.c.version_datetime))
            super_fks.append(('version_userid', super_history_mapper.base_mapper.local_table.c.version_userid))
            super_fks.append(('version_deleted', super_history_mapper.base_mapper.local_table.c.version_deleted))
            cols.append(Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=True, info={'colanderalchemy': {'exclude': True}}))
            cols.append(Column('version_userid', String(60), ForeignKey("user.login"), nullable=True, info={'colanderalchemy': {'exclude': True}}))
            cols.append(Column('version_deleted', Boolean, server_default=false(), nullable=False, info={'colanderalchemy': {'exclude': True}}))
        else:
            cols.append(Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=True, info={'colanderalchemy': {'exclude': True}}))
            cols.append(Column('version_userid', String(60), ForeignKey("user.login"), nullable=True, info={'colanderalchemy': {'exclude': True}}))
            cols.append(Column('version_deleted', Boolean, server_default=false(), nullable=False, info={'colanderalchemy': {'exclude': True}}))

        if super_fks:
            # zip(*pairs) transposes the pairs into (local keys, remote columns).
            cols.append(ForeignKeyConstraint(*zip(*super_fks)))

        table = Table(local_mapper.local_table.name + '_history', local_mapper.local_table.metadata,
           *cols
        )
    else:
        # single table inheritance.  take any additional columns that may have
        # been added and add them to the history table.
        for column in local_mapper.local_table.c:
            if column.key not in super_history_mapper.local_table.c:
                col = column.copy()
                col.unique = False
                super_history_mapper.local_table.append_column(col)
        table = None

    # The history class extends the parent's history class when there is one;
    # otherwise it reuses the bases of the base mapper's class.
    if super_history_mapper:
        bases = (super_history_mapper.class_,)
    else:
        bases = local_mapper.base_mapper.class_.__bases__
    versioned_cls = type.__new__(type, "%sHistory" % cls.__name__, bases, {})

    m = mapper(
            versioned_cls,
            table,
            inherits=super_history_mapper,
            polymorphic_on=polymorphic_on,
            polymorphic_identity=local_mapper.polymorphic_identity
            )
    cls.__history_mapper__ = m

    if not super_history_mapper:
        # No parent history mapper: add the version_* bookkeeping columns to
        # the live table and expose each one as a mapped property.  Unlike the
        # history table, version_datetime is not part of the primary key here.
        local_mapper.local_table.append_column(
            Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=False, info={'colanderalchemy': {'exclude': True}})
        )
        local_mapper.add_property("version_datetime", local_mapper.local_table.c.version_datetime)
        local_mapper.local_table.append_column(
            Column('version_userid', String(60), ForeignKey("user.login"), nullable=True, info={'colanderalchemy': {'exclude': True}})
        )
        local_mapper.add_property("version_userid", local_mapper.local_table.c.version_userid)
        local_mapper.local_table.append_column(
            Column('version_deleted', Boolean, server_default=false(), nullable=False, info={'colanderalchemy': {'exclude': True}})
        )
        local_mapper.add_property("version_deleted", local_mapper.local_table.c.version_deleted)


class Versioned(object):
    """Mixin whose mapper factory also sets up history tracking.

    ``__mapper_cls__`` returns a callable that maps the class with the
    regular ``mapper`` and then passes the new mapper to
    ``_history_mapper``.
    """

    @declared_attr
    def __mapper_cls__(cls):
        def build_versioned_mapper(cls, *arg, **kw):
            # Map normally first, then attach the history mapper to the result.
            versioned = mapper(cls, *arg, **kw)
            _history_mapper(versioned)
            return versioned
        return build_versioned_mapper


def versioned_objects(iter):
    """Lazily yield the items of ``iter`` that have a ``__history_mapper__``.

    Items without that attribute are skipped; order is preserved.
    """
    for candidate in iter:
        if not hasattr(candidate, '__history_mapper__'):
            continue
        yield candidate

def create_version(obj, session, deleted=False):
    """Record the pre-change state of ``obj`` as a new history row.

    The object's mapper hierarchy and its history mapper hierarchy are
    walked in lockstep.  For every non-``version_`` history column the
    previous value (or the current one, if unchanged) of the matching mapped
    attribute is collected.  If nothing changed and ``deleted`` is false the
    function returns without side effects.  Otherwise a history instance is
    populated and added to ``session``, and the live object's ``version_*``
    attributes are re-stamped.

    :param obj: mapped instance whose class has a ``__history_mapper__``.
    :param session: session the history row is added to; its optional
        ``userid`` attribute is copied to ``obj.version_userid``.
    :param deleted: value stored in ``obj.version_deleted``; when true a
        history row is written even if no attribute changed.
    :return: None.
    """
    obj_mapper = object_mapper(obj)
    history_mapper = obj.__history_mapper__
    history_cls = history_mapper.class_

    obj_state = attributes.instance_state(obj)

    # history column key -> value to store in the history row
    attr = {}

    obj_changed = False

    for om, hm in zip(obj_mapper.iterate_to_root(), history_mapper.iterate_to_root()):
        # Single-table history mappers are skipped at this level.
        if hm.single:
            continue

        for hist_col in hm.local_table.c:
            # version_* bookkeeping columns are filled in explicitly below.
            if hist_col.key.startswith('version_'):
                continue

            obj_col = om.local_table.c[hist_col.key]

            # get the value of the
            # attribute based on the MapperProperty related to the
            # mapped column.  this will allow usage of MapperProperties
            # that have a different keyname than that of the mapped column.
            try:
                prop = obj_mapper.get_property_by_column(obj_col)
            except UnmappedColumnError:
                # in the case of single table inheritance, there may be
                # columns on the mapped table intended for the subclass only.
                # the "unmapped" status of the subclass column on the
                # base class is a feature of the declarative module as of sqla 0.5.2.
                continue

            # expired object attributes and also deferred cols might not be in the
            # dict.  force it to load no matter what by using getattr().
            if prop.key not in obj_state.dict:
                getattr(obj, prop.key)

            # History is an (added, unchanged, deleted) triple.
            a, u, d = attributes.get_history(obj, prop.key)

            if d:
                # A previous value was replaced: keep the old value.
                attr[hist_col.key] = d[0]
                obj_changed = True
            elif u:
                attr[hist_col.key] = u[0]
            elif a:
                # if the attribute had no value.
                attr[hist_col.key] = a[0]
                obj_changed = True
            # When added, unchanged and deleted are all empty there is
            # nothing to record for this column; indexing a[0] here would
            # raise IndexError.

    if not obj_changed:
        # not changed, but we have relationships.  OK
        # check those too
        for prop in obj_mapper.iterate_properties:
            if isinstance(prop, RelationshipProperty) and \
                attributes.get_history(obj, prop.key).has_changes():
                obj_changed = True
                break

    if not obj_changed and not deleted:
        return

    # The history row carries the version stamp the live row had *before*
    # this change.
    attr['version_datetime'] = obj.version_datetime
    attr['version_userid'] = obj.version_userid
    attr['version_deleted'] = obj.version_deleted
    hist = history_cls()
    for key, value in attr.items():
        setattr(hist, key, value)
    session.add(hist)
    # Re-stamp the live object for the change being flushed now.
    obj.version_datetime = datetime.now()
    obj.version_userid = getattr(session, 'userid', None)
    obj.version_deleted = deleted

def versioned_session(session):
    """Register a ``before_flush`` listener on ``session`` that writes history.

    On each flush the listener calls ``create_version`` for every versioned
    object in ``session.deleted`` (with ``deleted=True``) and then for every
    versioned object in ``session.dirty``.
    """
    @event.listens_for(session, 'before_flush')
    def before_flush(session, flush_context, instances):
        # Deleted objects are handled before dirty ones; each collection is
        # read from the session only when its turn comes.
        for collection_name, is_delete in (('deleted', True), ('dirty', False)):
            for tracked in versioned_objects(getattr(session, collection_name)):
                create_version(tracked, session, deleted=is_delete)

def add_userid_to_session(userid, session):
    """Store ``userid`` as the ``userid`` attribute of a session.

    For a ``scoped_session`` the attribute is set on the object returned by
    ``session.registry()``; for a plain ``Session`` it is set directly.

    :raises TypeError: if ``session`` is neither of those types.
    """
    if isinstance(session, scoping.scoped_session):
        session.registry().userid = userid
        return

    if isinstance(session, Session):
        session.userid = userid
        return

    raise TypeError("Not sure how to add the userid into session of type {}".format(type(session)))

And here's how I'm using it (all non-essential parts have been cut out):

# Declarative base class that the example models below inherit from.
Base = declarative_base()

class User(Versioned, Base):
    """Versioned user row keyed by ``login``."""
    __tablename__ = 'user'
    login = Column(String(60), primary_key=True, nullable=False)

    # Proxies the 'group' attribute of the objects in 'user_to_groups'; a
    # Group appended here is wrapped in a UserToGroup built from its name.
    groups = association_proxy(
        'user_to_groups',
        'group',
        creator=lambda group: UserToGroup(group_name=group.name)
    )

    def __init__(self, login, groups=None):
        """Set the login and append each of ``groups`` (if any) to ``self.groups``."""
        self.login = login
        # A missing or empty ``groups`` argument means there is nothing to append.
        for member_group in groups or ():
            self.groups.append(member_group)

class Group(Versioned, Base):
    """Versioned group row keyed by ``name``.

    Two groups compare equal when their names are equal, which lets a
    transient ``Group(name=...)`` be matched against loaded groups.
    """
    __tablename__ = 'group'
    name = Column(String(100), primary_key=True, nullable=False)
    description = Column(String(100), nullable=True)

    # Proxies the 'user' attribute of the objects in 'group_to_user'; a User
    # appended here is wrapped in a UserToGroup built from its login.
    users = association_proxy('group_to_user', 'user', creator=lambda user: UserToGroup(user_login=user.login))

    def __eq__(self, other):
        """Compare by name; defer to the other operand for non-Group objects."""
        if not isinstance(other, Group):
            # Avoids AttributeError on objects without ``name`` and lets
            # Python fall back to its default comparison handling.
            return NotImplemented
        return self.name == other.name

    def __hash__(self):
        """Hash by name so equal groups hash equally.

        Defining ``__eq__`` alone makes instances unhashable on Python 3.
        """
        return hash(self.name)

class UserToGroup(Versioned, Base):
    """Versioned association row linking one user login to one group name."""
    __tablename__ = 'user_to_group'
    # Composite primary key.  ``primary_key`` takes the Python boolean
    # ``True``; lowercase ``true`` is not a Python builtin.
    user_login = Column(String(60), ForeignKey(User.login), primary_key=True)
    group_name = Column(String(100), ForeignKey(Group.name), primary_key=True)

    # The backrefs create ``User.user_to_groups`` and ``Group.group_to_user``;
    # both cascade 'all, delete-orphan' to the association rows.
    user = relationship(User, backref=backref('user_to_groups', cascade='all, delete-orphan'))
    group = relationship(Group, backref=backref('group_to_user', cascade='all, delete-orphan'))


# NOTE(review): `session` and `engine` are defined in the parts that were cut
# out; `session` is presumably a scoped_session (it has .configure/.registry).
session.configure(bind=engine)
# Store the acting user's id on the session object returned by the registry.
add_userid_to_session("test", session.registry())
# Register the before_flush hook that writes history rows.
versioned_session(session)
user = session.query(User).filter(User.login=='test').one()
# A transient Group is built only to identify which entry to remove from the
# proxied collection (Group defines __eq__ on name).
user.groups.remove(Group(name ="g:admin"))

Before running that code the database currently has one user called 'test' and two groups that the user is attached to called 'g:admin' and 'g:superadmin'.

What it currently does is: Copy the existing user_to_group entry for the 'test' => 'g:admin' mapping and copy it to the history table. Then delete the entry from user_to_group.

What I'd like it to do is copy the value to the history table and then update the entry in user_to_group to have version_deleted set to true.

I'm thinking the way to do that is to snatch the entry out of session.deleted (that's why I changed the order from the original code), modify it, and put it into session.dirty. I'm just not sure what the "safest" way of doing this is.

Another issue (which will likely require another question) is how to detect relationships which are covered in another table as currently the system makes a copy of the 'user' row into the history table and then updates the version information despite no real changes being made to the row.

EDIT: I've decided to do things a bit differently, but I still have a problem... Instead of having a "deleted" flag in the live tables, I actually delete the content and record another history item indicating when the deletion occurred. If I delete an object directly, this works correctly. If I delete an object via a relationship, I'm not able to do it properly. A DELETE gets issued to the relationship table to remove the link, but I can't seem to figure out how to detect that deletion in the "create_version" method.

For example, if I do:

# Load the 'g:admin' group, then remove its first user through the `users`
# association proxy.
group = session.query(Group).filter(Group.name=='g:admin').one()
group.users.remove(group.users[0])

No objects are placed in session.deleted. I can detect some sort of deletion via attributes.get_history(obj, prop.key), but it seems to indicate a deletion of a UserToGroup object from Group (which I want to detect and record a history item on), but then also indicates a deletion of a Group from the UserToGroup object (which I don't want to do anything about because the actual Group is not being deleted).

Tim Tisdall
  • 9,914
  • 3
  • 52
  • 82
  • 1
    You're my only hope Mike Bayer! – Tim Tisdall Apr 19 '13 at 15:50
  • its not easy but you'd have to forego using deletes, and instead use [attribute events](http://docs.sqlalchemy.org/en/rel_0_8/orm/events.html#attribute-events) to maintain that "deleted" flag when you remove the association from a collection. – zzzeek Apr 20 '13 at 07:04
  • overall, the main strategy of all the versioning recipes is to use a [before_flush](http://docs.sqlalchemy.org/en/rel_0_8/orm/events.html#sqlalchemy.orm.events.SessionEvents.before_flush) handler, run through everything that's changed, then create all the various history objects/flags/etc. – zzzeek Apr 20 '13 at 07:05
  • okay, what about my "EDIT" comments? I'm fine allowing deletes, but I want to capture them in the before_flush handler so I can record it into the history tables. In a bi-directional relationship I can't seem to figure out how to tell what actual DELETE is going to be run. – Tim Tisdall Apr 20 '13 at 15:35
  • yeah there's not a spectacular way to do this...places you could catch include, catching the attribute remove event, doing the get_history() call that you're doing and processing only for the UserToGroup records, maybe intercepting after_delete(), maybe even after_flush(). Recent versions of the Session allow for a "second flush" to happen if more state is changed after the first flush. Maybe that makes it easier. versioning like this is the kind of thing I usually have to sit down and spend several hours working out for someone. – zzzeek Apr 20 '13 at 22:08
  • I thought about using the `after_delete` event, but it says in the docs to specifically avoid doing a `Session.add()` which is what I'd need to do in this case. Is it okay since the add is being placed into a logging table I wouldn't be accessing again in that session? Is this a case covered by your statement about Session making a second flush? – Tim Tisdall Apr 22 '13 at 13:39
  • yeah doing a Session.add() is not advised there, but you might be able to get away with it in some cases, particularly since we've implemented the "keep flushing" feature. It's worth a try. – zzzeek Apr 22 '13 at 17:16
  • not sure if this will help me, but why is the UserToGroup object in this case not in the session.deleted but a DELETE is called to the underlying database? – Tim Tisdall Apr 24 '13 at 16:25
  • What about using the property's `direction` attribute to determine which half of the relationship is going to involve a DELETE? – Tim Tisdall Apr 24 '13 at 18:32
  • session.deleted doesn't necessarily have every item that's to be deleted in it due to a "delete" cascade - those are in some cases determined only after the flush has started. i don't see what "direction" is going to get you. the only way to detect absolutely every delete is to hit before_delete(),after_delete(), or after_flush() and look in the uow context to see everything that was deleted. – zzzeek Apr 24 '13 at 20:25
  • I'm very new to SQLAlchemy so I'm kind of guessing. I was thinking if I had only one relationship, there was a delete on that relationship, and I'm on the "many" side of the relationship that I could assume there would be a delete. That'd probably be a pretty brittle solution, though. How do I see what all was deleted from the UOW object? I can't see any documentation on that. – Tim Tisdall Apr 25 '13 at 13:56
  • yeah i think we're out in the weeds on this one. Digging into the uow context is probably not a good idea since the format of that is not stable. I think the safest place to discover deletes due to delete-orphan cascade is the before_delete() or after_delete() event. Besides that, you can use an attribute event to detect when an object is removed from a collection, which with delete-orphan cascade would cause it to be deleted, but only if it weren't re-added elsewhere. – zzzeek Apr 25 '13 at 22:47
  • just to make sure I understand correctly... I'd need to modify my code to iterate over all the attributes, find the relationship ones, and then add a xxx_delete event to each one, right? – Tim Tisdall Apr 26 '13 at 14:20
  • after_delete() is a mapper event, so you'd set that up per-mapped-class. the attribute event would require that you identify all the "delete-orphan" relationships, yes. – zzzeek Apr 26 '13 at 19:10

0 Answers0