Last active
May 19, 2021 00:17
-
-
Save mdellavo/e8f64ed69e223a542b2b15adde9eda65 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import abc | |
from enum import unique | |
import json | |
import datetime | |
import dataclasses | |
from typing import Optional | |
from sqlalchemy import Column, Integer, String, DateTime, JSON, Index, select, create_engine, and_ | |
from sqlalchemy.dialects.postgresql import JSONB | |
from sqlalchemy.orm import registry, sessionmaker | |
from sqlalchemy.ext.declarative import declared_attr | |
from sqlalchemy.types import TypeDecorator | |
from pydantic import BaseModel | |
from pydantic.json import pydantic_encoder | |
from sqlalchemy_json import mutable_json_type | |
import shortuuid | |
# Key under which each dataclass field's ``metadata`` dict carries its
# SQLAlchemy ``Column``; ``BaseStorage.__sa_dataclass_metadata_key__`` is set
# to this same value.
SA = "sa"

# Registry whose ``mapped`` decorator maps the dataclass models and whose
# ``metadata`` is used to create/drop their tables.
mapper_registry = registry()

# Session factory. It is created unbound; ``main()`` binds it to an engine via
# ``Session.configure``. ``expire_on_commit=False`` is passed so that objects
# are not expired when a session commits.
Session = sessionmaker(expire_on_commit=False)
def generate_uid(length=8):
    """Return the first ``length`` characters of a freshly generated shortuuid.

    The value is a plain prefix of ``shortuuid.uuid()``; nothing here checks
    it against uids already in use.
    """
    full_uid = shortuuid.uuid()
    return full_uid[:length]
def serialize(o):
    """Encode ``o`` as a JSON string.

    ``pydantic_encoder`` is supplied as the ``default`` hook, so it is
    consulted for any value the ``json`` module cannot encode on its own.
    """
    encoded = json.dumps(o, default=pydantic_encoder)
    return encoded
def build_engine(dsn, **kwargs):
    """Create a SQLAlchemy engine for ``dsn`` that uses ``serialize`` for JSON.

    Any extra keyword arguments are forwarded to ``create_engine`` unchanged.
    """
    return create_engine(dsn, json_serializer=serialize, **kwargs)
class JSONModel(TypeDecorator):
    """JSON column type that parses loaded values into a pydantic model.

    Only the result (database -> Python) direction is customized here: the
    decoded JSON value is handed to ``model.parse_obj``. No bind-parameter
    processing is defined on this type.

    Args:
        model: the pydantic model class used to parse loaded values.
    """

    impl = JSON

    # The only constructor state is the model class, which is hashable, so
    # statements using this type are safe to include in SQLAlchemy's compiled
    # statement cache. Without this flag SQLAlchemy 1.4.5+ emits a warning and
    # skips caching for every statement that touches the column.
    cache_ok = True

    def __init__(self, model):
        super().__init__()
        self.model = model

    def process_result_value(self, value, dialect):
        """Convert a loaded JSON value into an instance of ``self.model``.

        Returns ``None`` unchanged for SQL NULL instead of passing it to
        ``parse_obj``, which would otherwise fail validation.
        """
        if value is None:
            return None
        return self.model.parse_obj(value)
class JSONBModel(JSONModel):
    """``JSONModel`` variant whose underlying column type is PostgreSQL ``JSONB``."""

    impl = JSONB
@dataclasses.dataclass(init=False)
class BaseStorage(abc.ABC):
    """Base class for stored records with a JSON ``props`` document.

    Declares the shared columns (``id``, ``uid``, ``time_created``,
    ``time_updated``) through dataclass field metadata, and exposes the fields
    of the nested ``Properties`` model as read-only attributes of the record
    via ``__getattr__``. This class does not declare ``props`` itself; it
    assigns and reads ``self.props``, so subclasses are expected to declare
    the column (as ``User`` does).
    """

    # Tells SQLAlchemy which key of each field's ``metadata`` holds the Column.
    __sa_dataclass_metadata_key__ = SA

    id: int = dataclasses.field(metadata={SA: Column(Integer, primary_key=True)})
    uid: str = dataclasses.field(metadata={SA: Column(String)})
    time_created: datetime.datetime = dataclasses.field(
        metadata={SA: Column(DateTime)},
    )
    time_updated: datetime.datetime = dataclasses.field(
        metadata={SA: Column(DateTime)},
    )

    class Properties(BaseModel):
        # Empty by default; subclasses override this with their own fields.
        pass

    def __init__(self, **kwargs):
        """Create a record with a fresh uid, timestamps and ``props``.

        Only keyword arguments that name a field of ``Properties`` are used;
        any other keyword is silently ignored.
        """
        known_fields = self.Properties.__fields__
        props = {}
        for name, value in kwargs.items():
            if name in known_fields:
                props[name] = value

        super().__init__()
        self.uid = generate_uid()
        # Naive UTC timestamp, shared by both columns at creation time.
        now = datetime.datetime.utcnow()
        self.time_created = now
        self.time_updated = now
        self.props = self.Properties(**props)

    def __getattr__(self, key):
        """Fall back to ``props`` for attributes not found on the record.

        Raises:
            AttributeError: if ``props`` is not set on the instance yet or
                ``key`` is not one of its model fields.
        """
        # Look in __dict__ directly so a missing ``props`` cannot re-enter
        # this method.
        state = self.__dict__
        if "props" not in state or key not in state["props"].__fields__:
            raise AttributeError("unknown attribute: {}".format(key))
        return getattr(self.props, key)

    @classmethod
    def find_by_id(cls, id):
        """Return the record whose primary key equals ``id`` (via ``.scalar()``)."""
        stmt = select(cls).filter_by(id=id)
        with Session() as session:
            result = session.execute(stmt)
            return result.scalar()

    @classmethod
    def find_by_uid(cls, uid):
        """Return the record whose ``uid`` equals ``uid`` (via ``.scalar()``)."""
        stmt = select(cls).filter_by(uid=uid)
        with Session() as session:
            result = session.execute(stmt)
            return result.scalar()

    @classmethod
    def find_by_properties(cls, **kwargs):
        """Return all rows whose ``props`` entries equal the given values.

        Each keyword adds one ``props[name] == value`` filter. Unlike the
        other finders, the result of ``.all()`` is returned as is, without
        unwrapping through ``.scalar()``.
        """
        stmt = select(cls)
        for name, expected in kwargs.items():
            stmt = stmt.filter(cls.props[name] == expected)
        with Session() as session:
            return session.execute(stmt).all()

    def update(self):
        """Set ``time_updated`` to the current UTC time.

        Only the in-memory attribute changes; saving it is left to the caller.
        """
        self.time_updated = datetime.datetime.utcnow()

    def delete(self):
        """Placeholder: currently does nothing."""
@mapper_registry.mapped
@dataclasses.dataclass(init=False)
class User(BaseStorage):
    """User record mapped to the ``users`` table.

    Adds the ``props`` JSONB column, parsed into ``User.Properties`` on load,
    plus a unique index on ``uid`` and a GIN index on ``props``.
    """

    __tablename__ = "users"

    class Properties(BaseModel):
        # Both properties are optional and default to None.
        email: Optional[str] = None
        last_login: Optional[datetime.datetime] = None

    props: BaseModel = dataclasses.field(
        metadata={SA: Column(JSONBModel(Properties))},
    )

    __table_args__ = (
        Index("idx_uid", "uid", unique=True),
        Index("idx_props", "props", postgresql_using="gin"),
    )
def main():
    """Run a demo round-trip against a local PostgreSQL database.

    Drops and recreates every mapped table, inserts one ``User``, looks it up
    by id and by uid, bumps its update timestamp, and finally queries by a
    property value, printing the result of each step.
    """
    # Alternative DSN left by the author: "sqlite+pysqlite:///:memory:"
    # (note that User declares a PostgreSQL JSONB column and GIN index).
    dsn = "postgresql://postgres:postgres@localhost/postgres"
    engine = build_engine(dsn, echo=True, future=True)
    Session.configure(bind=engine)

    # Start from empty tables: everything mapped so far is dropped first.
    metadata = mapper_registry.metadata
    metadata.drop_all(bind=engine)
    metadata.create_all(bind=engine)

    with Session() as session:
        created = User(
            email="[email protected]",
            last_login=datetime.datetime.utcnow(),
        )
        session.add(created)
        print(created)
        session.commit()

    by_id = User.find_by_id(created.id)
    print(by_id)

    by_uid = User.find_by_uid(by_id.uid)
    print(by_uid)

    with Session() as session:
        by_uid.update()
        # The object was loaded in a different, already closed session, so it
        # is added to this one before committing.
        session.add(by_uid)
        session.commit()
        print(by_uid)

    matches = User.find_by_properties(email="[email protected]")
    print(matches)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment