Created
April 20, 2015 14:39
-
-
Save shauns/1f368e0c2e2d553a5a31 to your computer and use it in GitHub Desktop.
Examples using SQLAlchemy for reference
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sqlalchemy import Column, Integer, Text, String, ForeignKey, \ | |
UniqueConstraint, Table, inspect | |
from sqlalchemy.ext.associationproxy import association_proxy | |
from sqlalchemy.ext.orderinglist import ordering_list | |
from sqlalchemy.orm import relationship, backref, was_deleted | |
from conftest import Base, Session | |
def _unique(session, cls, hashfunc, queryfunc, constructor, arg, kw): | |
cache = getattr(session, '_unique_cache', None) | |
if cache is None: | |
session._unique_cache = cache = {} | |
key = (cls, hashfunc(*arg, **kw)) | |
if key in cache: | |
return cache[key] | |
else: | |
with session.no_autoflush: | |
q = session.query(cls) | |
q = queryfunc(q, *arg, **kw) | |
obj = q.first() | |
if not obj: | |
obj = constructor(*arg, **kw) | |
session.add(obj) | |
cache[key] = obj | |
return obj | |
class UniqueMixin(object): | |
@classmethod | |
def unique_hash(cls, *arg, **kw): | |
raise NotImplementedError() | |
@classmethod | |
def unique_filter(cls, query, *arg, **kw): | |
raise NotImplementedError() | |
@classmethod | |
def as_unique(cls, session, *arg, **kw): | |
return _unique( | |
session, | |
cls, | |
cls.unique_hash, | |
cls.unique_filter, | |
cls, | |
arg, kw | |
) | |
def chapter_from_slug(chapter_slug): | |
return Chapter(slug=chapter_slug) | |
class Book(Base): | |
__tablename__ = 'books' | |
id = Column(Integer, primary_key=True, autoincrement=True) | |
title = Column(Text) | |
_isbn_objects = relationship( | |
'IsbnCode', | |
cascade='all, delete-orphan', | |
passive_deletes=True, | |
) | |
isbns = association_proxy( | |
'_isbn_objects', | |
'isbn', | |
creator=lambda isbn: IsbnCode.as_unique(Session(), isbn=isbn) | |
) | |
tags = association_proxy( | |
'tag_objects', | |
'tag', | |
creator=lambda tag: Tag.as_unique(Session(), tag) | |
) | |
association_table = Table( | |
'book_tags', | |
Base.metadata, | |
Column('book_id', Integer, ForeignKey('books.id', ondelete='cascade')), | |
Column('tag_id', Integer, ForeignKey('tags.id')) | |
) | |
class Tag(Base, UniqueMixin): | |
__tablename__ = 'tags' | |
id = Column(Integer, primary_key=True, autoincrement=True) | |
tag = Column(String(30), unique=True) | |
books = relationship( | |
'Book', | |
secondary=association_table, | |
backref=backref( | |
'tag_objects', | |
), | |
# For this to work, tag_id FK on association table must be marked | |
# with ondelete=cascade | |
# passive_deletes=True, | |
) | |
def __init__(self, tag): | |
self.tag = tag | |
@classmethod | |
def unique_hash(cls, tag): | |
return tag | |
@classmethod | |
def unique_filter(cls, query, tag): | |
return query.filter(cls.tag == tag) | |
class Chapter(Base): | |
__tablename__ = 'chapters' | |
__table_args__ = ( | |
UniqueConstraint('slug', 'book_id'), | |
# Not supported for ordering list | |
# UniqueConstraint('position', 'book_id'), | |
) | |
id = Column(Integer, primary_key=True, autoincrement=True) | |
position = Column(Integer, nullable=False) | |
slug = Column(String(100)) | |
book_id = Column( | |
Integer, | |
ForeignKey('books.id', ondelete='cascade'), | |
nullable=False, | |
) | |
book = relationship( | |
'Book', | |
backref=backref( | |
'chapters', | |
order_by='Chapter.position', | |
collection_class=ordering_list('position'), | |
cascade='all, delete-orphan', # Chapters are owned by their Book | |
passive_deletes=True, | |
) | |
) | |
class IsbnCode(Base, UniqueMixin): | |
__tablename__ = 'isbns' | |
id = Column(Integer, primary_key=True, autoincrement=True) | |
isbn = Column(String(100), unique=True) | |
book_id = Column( | |
Integer, | |
ForeignKey('books.id', ondelete='cascade'), | |
nullable=False, | |
) | |
@classmethod | |
def unique_hash(cls, isbn): | |
return isbn | |
@classmethod | |
def unique_filter(cls, query, isbn): | |
return query.filter(cls.isbn == isbn) | |
class ManuallyKeyed(Base): | |
__tablename__ = 'manual_keys' | |
key = Column(Integer, primary_key=True) | |
class ParentThing(Base): | |
__tablename__ = 'parent_thing' | |
id = Column(Integer, primary_key=True, autoincrement=True) | |
class CompositeChild(Base): | |
__tablename__ = 'composite_child' | |
parent_id = Column( | |
Integer, | |
ForeignKey('parent_thing.id', ondelete='cascade'), | |
nullable=False, | |
primary_key=True | |
) | |
parent = relationship | |
supplemental = Column( | |
Integer, | |
nullable=False, | |
primary_key=True, | |
) | |
def test_books(session): | |
other_sess = Session() | |
assert session is other_sess | |
book_1 = Book(title='Book One') | |
book_2 = Book(title='Book Two') | |
session.add_all([book_1, book_2]) | |
session.flush() | |
book_1_chapter_2 = Chapter(slug='shared') | |
book_1_chapter_1 = Chapter(slug='one') | |
book_2_chapter_1 = Chapter(slug='shared') | |
book_1.chapters = [ | |
book_1_chapter_1, | |
book_1_chapter_2, | |
] | |
book_2.chapters = [ | |
book_2_chapter_1, | |
] | |
# chapters are added through virtue of `chapters` relationship | |
# session.add_all([ | |
# book_1_chapter_1, book_1_chapter_2, book_2_chapter_1 | |
# ]) | |
session.flush() | |
assert book_1_chapter_1.book.title == 'Book One' | |
assert book_1.chapters == [book_1_chapter_1, book_1_chapter_2] | |
book_2_chapter_2 = Chapter(slug='two') | |
book_2.chapters = [ | |
book_2_chapter_1, | |
book_2_chapter_2, | |
] | |
session.flush() | |
assert book_2_chapter_2.book.title == 'Book Two' | |
book_2_chapter_3 = Chapter(slug='three') | |
book_2.chapters = [ | |
book_2_chapter_2, | |
book_2_chapter_3, | |
] | |
# If position was part of a unique constraint, this would fail because we | |
# try to set position value *before* deleting the old chapters | |
session.flush() | |
assert book_2_chapter_3.book.title == 'Book Two' | |
assert book_2_chapter_1.book is None | |
assert was_deleted(book_2_chapter_1) | |
book_2_chapter_3.book = book_1 | |
session.flush() | |
assert book_2_chapter_3.book.title == 'Book One' | |
assert [] == book_1.isbns | |
book_1.isbns = ['foo', 'bar'] | |
session.flush() | |
book_1.isbns = ['foo', 'bar'] | |
session.flush() | |
tag_1 = Tag('one') | |
tag_2 = Tag('two') | |
book_1.tag_objects = [tag_1, tag_2] | |
session.flush() | |
assert tag_1.books[0].title == tag_2.books[0].title == 'Book One' | |
book_1.tag_objects = [tag_1] | |
session.flush() | |
assert tag_2.books == [] | |
assert ['one'] == book_1.tags | |
book_1.tags = ['two'] | |
session.flush() | |
# persistent vs. new when flushing | |
mk_1a = ManuallyKeyed(key=1) | |
session.add(mk_1a) | |
session.commit() | |
assert session.query(ManuallyKeyed).count() == 1 | |
mk_1b = ManuallyKeyed(key=1) | |
state = inspect(mk_1b) | |
assert state.transient | |
mk_1b = session.merge(mk_1b) | |
post_merge_state = inspect(mk_1b) | |
assert post_merge_state.persistent | |
session.add(mk_1b) | |
# Doesn't raise an error because `mk_1b` has been matched to existing | |
# instance | |
session.commit() | |
mk_2 = ManuallyKeyed(key=2) | |
just_created_state = inspect(mk_2) | |
assert just_created_state.transient | |
assert not just_created_state.persistent | |
mk_2 = session.merge(mk_2) | |
merging_but_no_persistent_state = inspect(mk_2) | |
assert merging_but_no_persistent_state.pending | |
assert not merging_but_no_persistent_state.persistent | |
session.add(mk_2) | |
session.delete(book_1) | |
session.delete(book_2) | |
session.flush() | |
assert was_deleted(book_1) | |
assert was_deleted(book_2) | |
# ISBNs gone | |
assert session.query(IsbnCode).count() == 0 | |
# Chapters gone | |
assert session.query(Chapter).count() == 0 | |
# But Tags live on | |
assert session.query(Tag).count() == 2 | |
book_3 = Book(title='Book three') | |
book_3.tags = ['one'] | |
session.add(book_3) | |
session.flush() | |
# testing what happens to M2M when tag dropped | |
session.delete(tag_1) | |
session.flush() | |
assert was_deleted(tag_1) | |
assert not was_deleted(book_3) | |
tag_state = inspect(tag_1) | |
assert tag_state.persistent and tag_state.deleted | |
# book_3's collection not changed yet | |
assert book_3.tag_objects[0] is tag_1 | |
assert book_3.tags == ['one'] | |
session.commit() | |
tag_state = inspect(tag_1) | |
assert not tag_state.persistent and tag_state.deleted | |
# now collection changed | |
assert book_3.tags == [] | |
assert book_3.tag_objects == [] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment