Skip to content

Instantly share code, notes, and snippets.

@funseiki
Created December 4, 2020 21:21
Show Gist options
  • Save funseiki/48a40573475a9be5caa2990acf523848 to your computer and use it in GitHub Desktop.
Save funseiki/48a40573475a9be5caa2990acf523848 to your computer and use it in GitHub Desktop.
import traceback
from contextlib import contextmanager
from sqlalchemy import create_engine, inspect, Column, Integer, String, DateTime, ForeignKey, Table, MetaData
from sqlalchemy.orm import sessionmaker, relationship, aliased, contains_eager, scoped_session, backref, mapper
engine = create_engine('sqlite:///:memory:', echo=True)
metadata = MetaData(bind=engine)
objecttypes = Table('objecttypes', metadata,
Column('id', Integer, primary_key=True),
Column('name', String)
)
class ObjectType(object):
def __repr__(self):
return "ObjectType({}, id={})".format(self.name, self.id)
mapper(ObjectType, objecttypes)
metadata.create_all(engine) # We'll call this again later
sessionFactory = sessionmaker(bind=engine, expire_on_commit=False)
scopedSessionFactory = scoped_session(sessionFactory)
def startScope():
return scopedSessionFactory()
def endScope():
scopedSessionFactory().close()
scopedSessionFactory.remove()
return
def addObjectTypes():
"""
Add in all the object types to the db without using a session
"""
values = ["('location')", "('locationinput')", "('gate')"]
q = """INSERT INTO objecttypes (name) VALUES {}""".format(",".join(values))
engine.execute(q)
return
def buildObjectTypes():
addObjectTypes()
session = startScope()
types = session.query(ObjectType).all()
endScope()
return dict([(objType.name, objType) for objType in types])
# Holds all the types
typeDict = buildObjectTypes()
things = Table('things', metadata,
Column('id', Integer, primary_key=True, autoincrement=True),
Column('name', String),
Column('object_type_id', Integer, ForeignKey('objecttypes.id')),
Column('version', Integer),
Column('timeCreated', DateTime)
)
class Thing(object):
def __init__(self, **kwds):
for key in kwds:
setattr(self, key, kwds[key])
return
def __repr__(self):
return "{}, id={}, type={}, version={}".format(self.name, self.id, self.objectType, self.version)
locations = Table('locations', metadata,
Column('id', Integer, ForeignKey('things.id'), primary_key=True))
class Location(Thing):
pass
mapper(Thing, things, version_id_col=things.c.version,
polymorphic_on="object_type_id",
with_polymorphic='*',
polymorphic_load='inline',
properties={
'objectType': relationship(ObjectType, cascade_backrefs=False)
})
mapper(Location, locations, polymorphic_identity=typeDict['location'].id)
metadata.create_all(engine)
@contextmanager
def sessionScopeLong():
try:
session = scopedSessionFactory()
yield session
session.commit()
except Exception as e:
session.rollback()
session.close()
scopedSessionFactory.remove()
raise
finally:
pass
return
def sessionScopeCallback(func, retries=0):
ret = None
sessionRetriever = sessionScopeLong
for attempt in range(0, retries + 1):
try:
with sessionRetriever() as session:
ret = func(session, attempt)
break
except Exception as e:
print("Found error : {}\n{}".format(e, traceback.format_exc(20)))
return ret
def write(obj):
def _onSession(session, attempt):
print("Merging object={}".format(obj))
return session.merge(obj)
return sessionScopeCallback(_onSession)
def reproduceError():
tokyo = Location(name="tokyo", objectType=typeDict['location'])
tokyo = write(tokyo)
print(tokyo)
# Error "has a NULL identity key" here
return
reproduceError()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment