Skip to content

Instantly share code, notes, and snippets.

@agronholm
Created January 14, 2017 10:44
Show Gist options
  • Save agronholm/3c2db11c11d1546552d6e5ab89561861 to your computer and use it in GitHub Desktop.
Save agronholm/3c2db11c11d1546552d6e5ab89561861 to your computer and use it in GitHub Desktop.
Broadcasting SQLAlchemy model events
from sqlalchemy.orm.util import object_mapper
def get_primary_key(obj):
    """
    Return the primary key of the given persistent object.

    :param obj: an instance of a mapped class
    :return: the lone key value when the primary key consists of exactly one
        element, otherwise the whole sequence produced by the mapper's
        ``primary_key_from_instance``
    """
    key_values = object_mapper(obj).primary_key_from_instance(obj)
    if len(key_values) == 1:
        # Single-column key: hand back the bare value rather than a sequence.
        return key_values[0]
    return key_values
def get_old_values(obj):
    """
    Collect the previous values of the column attributes changed on ``obj``.

    Only properties that are ``ColumnProperty`` instances are inspected.

    :param obj: an instance of a mapped class
    :return: dict of {attribute name: old value}; the old value is the first
        entry of the attribute history's ``deleted`` collection, or ``None``
        when that collection is empty
    """
    previous = {}
    # Lazy generator, so each property is checked right before its history
    # is read.
    column_keys = (
        prop.key
        for prop in object_mapper(obj).iterate_properties
        if isinstance(prop, ColumnProperty)
    )
    for attr_name in column_keys:
        attr_history = get_history(obj, attr_name)
        if not attr_history.has_changes():
            continue
        removed = attr_history.deleted
        previous[attr_name] = removed[0] if removed else None
    return previous
@listens_for(mapper, 'after_insert')
def after_insert(mapper, connection, target):
    """
    Queue an ``entity-created`` event for ``target`` on its session.

    The event dict is appended to the ``'broadcast_events'`` list kept in the
    session's ``info`` mapping; the list is created on first use.

    :param mapper: unused here
    :param connection: unused here
    :param target: the mapped instance the event is about
    """
    # Fetch (or create) the queue first, before the payload is assembled.
    queue = object_session(target).info.setdefault('broadcast_events', [])
    event = {
        'topic': 'entity-created',
        'class_name': target.__class__.__name__,
        'primary_key': get_primary_key(target),
        'instance': target
    }
    queue.append(event)
@listens_for(mapper, 'after_update')
def after_update(mapper, connection, target):
    """
    Queue an ``entity-modified`` event for ``target`` if any column changed.

    The ``'broadcast_events'`` list in the session's ``info`` mapping is
    created even when nothing is queued. An event is appended only when
    ``get_old_values`` reports at least one changed attribute.

    :param mapper: unused here
    :param connection: unused here
    :param target: the mapped instance the event is about
    """
    queue = object_session(target).info.setdefault('broadcast_events', [])
    changed = get_old_values(target)
    if not changed:
        # No column attribute has recorded changes: nothing to announce.
        return

    queue.append({
        'topic': 'entity-modified',
        'class_name': target.__class__.__name__,
        'primary_key': get_primary_key(target),
        'instance': target,
        'old_values': changed
    })
@listens_for(mapper, 'after_delete')
def after_delete(mapper, connection, target):
    """
    Queue an ``entity-deleted`` event for ``target`` on its session.

    The event dict is appended to the ``'broadcast_events'`` list kept in the
    session's ``info`` mapping; the list is created on first use.

    :param mapper: unused here
    :param connection: unused here
    :param target: the mapped instance the event is about
    """
    # Fetch (or create) the queue first, before the payload is assembled.
    queue = object_session(target).info.setdefault('broadcast_events', [])
    event = {
        'topic': 'entity-deleted',
        'class_name': target.__class__.__name__,
        'primary_key': get_primary_key(target),
        'instance': target
    }
    queue.append(event)
@listens_for(Session, 'after_commit')
def after_commit(session):
    """
    Hand the session's queued events to the IOLoop for broadcasting.

    Removes the ``'broadcast_events'`` list from ``session.info`` and, if it
    was present, schedules a callback on ``IOLoop.instance()`` that passes
    each event to ``WebsocketHandler.broadcast_event``. Does nothing when no
    list was stored.

    :param session: the session whose ``info`` mapping holds the queue
    """
    def send_all(pending):
        # Invoked later by the IOLoop with the list popped below.
        for item in pending:
            WebsocketHandler.broadcast_event(item)

    try:
        queued = session.info.pop('broadcast_events')
    except KeyError:
        # Nothing was queued on this session.
        return

    IOLoop.instance().add_callback(send_all, queued)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment