Last active
September 8, 2023 19:41
-
-
Save CheeseCake87/20398198f2b388e27b396cdbb92cfe56 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# With help from : https://github.com/jonbiemond
# | |
# requires: | |
# pip install flask flask_sqlalchemy | |
# | |
# run: flask --app sqlalchemy_to_josnable_dict.py init-db | |
# run: flask --app sqlalchemy_to_josnable_dict.py test-data | |
# run: flask --app sqlalchemy_to_josnable_dict.py get-user1 | |
# run: flask --app sqlalchemy_to_josnable_dict.py get-user2 | |
# | |
import json | |
from flask import Flask | |
from flask_sqlalchemy import SQLAlchemy | |
# Needed for class JSONMixin \/
import typing as t | |
from datetime import datetime | |
from sqlalchemy import Result | |
from sqlalchemy.orm.collections import InstrumentedList | |
from sqlalchemy.engine import Row | |
# Needed for class JSONMixin /\
from sqlalchemy import select, insert | |
from sqlalchemy import types as st | |
# Flask application plus the Flask-SQLAlchemy handle used by the models and
# CLI commands in this file; data lives in the SQLite database "test.db".
app = Flask(__name__)
app.config.update(SQLALCHEMY_DATABASE_URI="sqlite:///test.db")
db = SQLAlchemy(app)
class JSONMixin:
    """Mixin that turns the result of an executed ORM query into JSON-ready data.

    Mix it into a model class and pass ``db.session.execute(query)`` to
    ``as_jsonable_dict`` (plain Python structures) or ``as_json`` (a string).
    """

    @classmethod
    def as_jsonable_dict(
        cls,
        execute: Result,
        include_joins: t.Optional[list] = None,
        all_columns_but: t.Optional[list] = None,
        only_columns: t.Optional[list] = None,
    ) -> t.Dict[str, t.List[t.Dict[str, t.Any] | None]]:
        """Convert an executed query into ``{ClassName: [row_dict, ...]}``.

        Expects: execute = db.session.execute(query)
        The rows are fetched here with ``execute.scalars().all()``.

        :param execute: result of ``db.session.execute(query)``.
        :param include_joins: names of attributes on each row whose related
            rows should be embedded as a list under that same name. Names a
            row does not have are skipped silently.
        :param all_columns_but: column names to leave out.
        :param only_columns: when non-empty, only these column names are kept.
            Both column filters are also applied to the embedded related rows.
        :return: a dict with a single key, ``cls.__name__``, holding one dict
            per row.
        """
        include_joins = include_joins or []
        all_columns_but = all_columns_but or []
        only_columns = only_columns or []

        def include_column(column) -> bool:
            # An empty ``only_columns`` means "no whitelist"; the blacklist is
            # always applied on top of it.
            if only_columns and column not in only_columns:
                return False
            return column not in all_columns_but

        def as_dict(row) -> dict:
            # Instance attributes minus private ones (drops SQLAlchemy's
            # bookkeeping entries such as ``_sa_instance_state``).
            return {
                key: value
                for key, value in vars(row).items()
                if not key.startswith("_")
            }

        def parse_value(value):
            # None has to stay None so that json.dumps emits ``null``; it used
            # to fall through to the f-string below and become the text "None".
            if value is None:
                return None
            if isinstance(value, datetime):
                return value.strftime("%Y-%m-%d %H:%M:%S")
            # float is JSON-native as well; previously it was stringified.
            if isinstance(value, (bool, int, float, str)):
                return value
            if isinstance(value, dict):
                return {key: parse_value(item) for key, item in value.items()}
            if isinstance(value, t.Iterable):
                return [parse_value(item) for item in value]
            # Anything else falls back to its string form.
            return f"{value}"

        def parse_columns(row) -> dict:
            # Relationship collections already loaded on the instance are
            # skipped here; they are only emitted via ``include_joins``.
            return {
                column: parse_value(value)
                for column, value in as_dict(row).items()
                if include_column(column)
                and not isinstance(value, InstrumentedList)
            }

        def parse(row) -> dict:
            parsed = parse_columns(row)
            for join in include_joins:
                if hasattr(row, join):
                    # NOTE(review): reading the attribute presumably triggers
                    # a lazy load when the relationship is not loaded yet.
                    # Related rows are serialised one level deep only.
                    parsed[join] = [
                        parse_columns(related) for related in getattr(row, join)
                    ]
            return parsed

        return {cls.__name__: [parse(row) for row in execute.scalars().all()]}

    @classmethod
    def as_json(
        cls,
        execute: Result,
        include_joins: t.Optional[list] = None,
        all_columns_but: t.Optional[list] = None,
        only_columns: t.Optional[list] = None,
    ) -> str:
        """Return ``as_jsonable_dict(...)`` for the same arguments as a JSON string."""
        return json.dumps(
            cls.as_jsonable_dict(
                execute,
                include_joins=include_joins,
                all_columns_but=all_columns_but,
                only_columns=only_columns,
            )
        )
class Cars(db.Model):
    """A car belonging to a user (``fk_user_id`` references ``users.user_id``)."""

    # PriKey
    car_id = db.Column(db.Integer, primary_key=True)

    # ForKey
    fk_user_id = db.Column(db.Integer, db.ForeignKey("users.user_id"), nullable=False)

    # Data
    make = db.Column(db.String(256), nullable=False)

    # tracking
    created = db.Column(db.DateTime, nullable=True)
    # Pass the callable itself: ``datetime.now()`` would be evaluated once when
    # the module is imported, stamping every later update with that one moment.
    updated = db.Column(db.DateTime, nullable=True, onupdate=datetime.now)

    @classmethod
    def create(
        cls,
        user_id: int,
        make: str,
    ):
        """Insert a car for ``user_id`` and commit.

        :param user_id: value stored in ``fk_user_id``.
        :param make: the car's make.
        :return: the ``car_id`` returned by the INSERT, or ``None`` if the
            statement returned no row.
        """
        result = db.session.execute(
            insert(cls).values(
                fk_user_id=user_id,
                make=make,
                created=datetime.now(),
            ).returning(cls.car_id)
        ).scalar_one_or_none()
        db.session.commit()
        return result
class Boats(db.Model):
    """A boat belonging to a user (``fk_user_id`` references ``users.user_id``)."""

    # PriKey
    boat_id = db.Column(db.Integer, primary_key=True)

    # ForKey
    fk_user_id = db.Column(db.Integer, db.ForeignKey("users.user_id"), nullable=False)

    # Data
    make = db.Column(db.String(256), nullable=False)
    stats = db.Column(st.JSON, nullable=True)

    # tracking
    created = db.Column(db.DateTime, nullable=True)
    # Pass the callable itself: ``datetime.now()`` would be evaluated once when
    # the module is imported, stamping every later update with that one moment.
    updated = db.Column(db.DateTime, nullable=True, onupdate=datetime.now)

    @classmethod
    def create(
        cls,
        user_id: int,
        make: str,
        stats: dict,
    ):
        """Insert a boat for ``user_id`` and commit.

        :param user_id: value stored in ``fk_user_id``.
        :param make: the boat's make.
        :param stats: free-form data stored in the JSON ``stats`` column.
        :return: the ``boat_id`` returned by the INSERT, or ``None`` if the
            statement returned no row.
        """
        result = db.session.execute(
            insert(cls).values(
                fk_user_id=user_id,
                make=make,
                stats=stats,
                created=datetime.now(),
            ).returning(cls.boat_id)
        ).scalar_one_or_none()
        db.session.commit()
        return result
class Users(db.Model, JSONMixin):
    """A user, with relationships to the cars and boats that reference it."""

    user_id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(256), nullable=False)

    # tracking
    created = db.Column(db.DateTime, nullable=True)
    # Pass the callable itself: ``datetime.now()`` would be evaluated once when
    # the module is imported, stamping every later update with that one moment.
    updated = db.Column(db.DateTime, nullable=True, onupdate=datetime.now)

    # relationships
    rel_cars = db.relationship(
        "Cars",
        primaryjoin="Users.user_id == Cars.fk_user_id"
    )
    rel_boats = db.relationship(
        "Boats",
        primaryjoin="Users.user_id == Boats.fk_user_id"
    )

    @classmethod
    def get_by_id(cls, user_id):
        """Return the user with ``user_id`` as a JSON-ready dict.

        The result has the shape ``{"Users": [...]}`` and each user entry
        embeds its ``rel_cars`` and ``rel_boats`` rows.
        """
        query = select(cls).filter_by(user_id=user_id)
        return cls.as_jsonable_dict(
            db.session.execute(query),
            include_joins=["rel_cars", "rel_boats"],
        )

    @classmethod
    def create(
        cls,
        username: str,
    ):
        """Insert a user and commit.

        :param username: the new user's name.
        :return: the ``user_id`` returned by the INSERT, or ``None`` if the
            statement returned no row.
        """
        result = db.session.execute(
            insert(cls).values(
                username=username,
                created=datetime.now(),
            ).returning(cls.user_id)
        ).scalar_one_or_none()
        db.session.commit()
        return result
@app.cli.command("init-db")
def init_db():
    """CLI ``init-db``: drop all tables, then create them again."""
    # Order matters: tables are dropped before they are recreated.
    for schema_step in (db.drop_all, db.create_all):
        schema_step()
@app.cli.command("test-data")
def test_data():
    """CLI ``test-data``: insert two sample users with their cars and boats."""
    # (username, car makes, (boat make, boat stats) pairs) — inserted in order.
    fixtures = (
        ("user1", ("Ford", "Mazda"), (("Big", {"length": 100}),)),
        ("user2", ("Ford",), (("Small", {"length": 10}), ("Medium", {"length": 50}))),
    )
    for username, car_makes, boats in fixtures:
        owner_id = Users.create(username=username)
        for car_make in car_makes:
            Cars.create(user_id=owner_id, make=car_make)
        for boat_make, boat_stats in boats:
            Boats.create(user_id=owner_id, make=boat_make, stats=boat_stats)
@app.cli.command("get-user1")
def user1_data():
    """CLI ``get-user1``: print the user with id 1 as indented JSON."""
    payload = Users.get_by_id(1)
    print(json.dumps(payload, indent=4))
@app.cli.command("get-user2")
def user2_data():
    """CLI ``get-user2``: print the user with id 2 as indented JSON."""
    payload = Users.get_by_id(2)
    print(json.dumps(payload, indent=4))
""" | |
flask --app sqlalchemy_to_josnable_dict.py get-user1 | |
{ | |
"Users": [ | |
{ | |
"created": "2023-08-24 09:53:05", | |
"user_id": 1, | |
"updated": "None", | |
"username": "user1", | |
"rel_cars": [ | |
{ | |
"updated": "None", | |
"make": "Ford", | |
"created": "2023-08-24 09:53:05", | |
"car_id": 1, | |
"fk_user_id": 1 | |
}, | |
{ | |
"updated": "None", | |
"make": "Mazda", | |
"created": "2023-08-24 09:53:05", | |
"car_id": 2, | |
"fk_user_id": 1 | |
} | |
], | |
"rel_boats": [ | |
{ | |
"fk_user_id": 1, | |
"created": "2023-08-24 09:53:05", | |
"make": "Big", | |
"boat_id": 1, | |
"stats": { | |
"length": 100 | |
}, | |
"updated": "None" | |
} | |
] | |
} | |
] | |
} | |
""" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment