Last active
January 7, 2020 19:15
-
-
Save sonnyksimon/976de3b7fa91e458a6e986fbf348b557 to your computer and use it in GitHub Desktop.
sql: Another way of using SQLAlchemy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sql | |
from settings import engine | |
# Bind the table base class to the engine imported from ``settings`` so that
# ``create()`` / ``drop()`` on any table have a connection target.
sql.Table.engine = engine


class Blog(sql.Table):
    """Table definition with ``id``, ``title`` and ``body`` string columns."""

    id = sql.String(length=255)
    title = sql.String(length=255)
    body = sql.String(length=4096)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
python3 -c "import schema; schema.sql.recreate();" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlalchemy as sa | |
# SQLAlchemy 1.4 deprecated calling ``URL(...)`` directly in favour of the
# ``URL.create(...)`` constructor; fall back to the class itself on older
# releases that do not provide ``create``.
_make_url = getattr(sa.engine.url.URL, 'create', sa.engine.url.URL)

# Positional order: drivername, username, password, host, port, database.
# NOTE(review): the credentials are hard-coded here; consider loading them
# from the environment before using this outside local development.
url = _make_url('mysql', 'root', 'password', 'localhost', 3306, 'smartersql')

# Module-level engine imported by the schema module.
engine = sa.create_engine(url)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
sql | |
~~~ | |
Another way of using SQLAlchemy. | |
:copyright: Pancubs.org | |
:license: MIT | |
""" | |
import sqlalchemy as sa | |
from utils import commify | |
# MetaData collection handed to every generated ``sa.Table``.
_metadata = sa.MetaData()

## table generation

# Table classes in definition order; ``metatracker`` appends to this list.
_all_tables = list()
class metatracker(type):
    """Metaclass that registers table classes as they are defined.

    Every class created through this metaclass, except a root class whose
    first base is ``object`` (or that has no explicit bases), is appended to
    ``_all_tables`` and receives three class attributes:

    * ``columns``   -- result of ``cls._analyze()``
    * ``sql_table`` -- result of ``cls._sql_table(_metadata)``
    * ``sql_name``  -- result of ``cls._sql_name_()``
    """

    def __init__(self, name, bases, *a, **kw):
        type.__init__(self, name, bases, *a, **kw)
        # A class statement without explicit bases passes an empty tuple
        # (the implicit base is ``object``), so treat it like the root class
        # instead of failing with IndexError on ``bases[0]``.
        if bases and bases[0] is not object:
            _all_tables.append(self)
            # ``columns`` must be set first: ``_sql_table`` reads it.
            self.columns = self._analyze()
            self.sql_table = self._sql_table(_metadata)
            self.sql_name = self._sql_name_()
class Table(object, metaclass=metatracker):
    """Base class for table definitions.

    Subclasses declare ``Column`` instances as class attributes. The
    ``create`` and ``drop`` class methods read an ``engine`` class attribute
    that is not defined in this class and has to be assigned by the caller.
    """

    @classmethod
    def _sql_name_(cls):
        """Return the SQL table name: the class name in lower case."""
        return cls.__name__.lower()

    @classmethod
    def _analyze(cls):
        """Collect the ``Column`` attributes of the class.

        Each column found gets its ``sql_name`` set from its attribute name
        and, when it has no ``label`` yet, a label derived from the attribute
        name with underscores turned into spaces.

        Returns a dict mapping attribute name to column object.
        """
        found = {}
        for attr in dir(cls):
            candidate = getattr(cls, attr)
            if not isinstance(candidate, Column):
                continue
            candidate.sql_name = candidate._sql_name_(attr)
            if not hasattr(candidate, 'label'):
                candidate.label = attr.replace('_', ' ')
            found[attr] = candidate
        return found

    @staticmethod
    def _primary(columns):
        """Return the subset of ``columns`` whose ``primary`` flag is truthy."""
        return {name: col for name, col in columns.items() if col.primary}

    @classmethod
    def _sql_table(cls, metadata):
        """Build the ``sa.Table`` for this class inside ``metadata``."""
        table_name = cls._sql_name_()
        sa_columns = [
            sa.Column(
                col.sql_name,
                col.sql_type,
                primary_key=col.primary,
                unique=col.unique,
                nullable=not col.notnull,
                default=col.default,
            )
            for col in cls.columns.values()
        ]
        return sa.Table(table_name, metadata, *sa_columns)

    @classmethod
    def create(cls):
        """Create this table using ``cls.engine``."""
        cls.sql_table.create(cls.engine)

    @classmethod
    def drop(cls):
        """Drop this table using ``cls.engine``."""
        cls.sql_table.drop(cls.engine)

    def __init__(self):
        # Rebind the instance to a newly built class with the same name,
        # bases and a copy of the class namespace.
        # NOTE(review): the bases derive from ``metatracker``, so building
        # the class presumably re-runs the metaclass initialiser on every
        # instantiation -- confirm this is intended.
        klass = self.__class__
        namespace = dict(klass.__dict__)
        self.__class__ = type(klass.__name__, klass.__bases__, namespace)
class Column(object):
    """Base column description.

    Keyword arguments passed to the constructor are stored as instance
    attributes, overriding the class-level defaults below.
    """

    # Class-level defaults; any of them can be overridden per instance.
    default = None
    primary = False
    unique = False
    notnull = False

    def __init__(self, **kw):
        for option, value in kw.items():
            setattr(self, option, value)

    def _sql_name_(self, k):
        """Return the SQL column name for attribute name ``k`` (unchanged)."""
        return k

    def display(self, x):
        """Return the display text for value ``x``: ``str(x)``."""
        return str(x)
class String(Column):
    """String column; an optional ``length`` selects a sized ``sa.String``."""

    # Unsized type used when no (or a falsy) length is given.
    sql_type = sa.String()

    def __init__(self, length=None, **kw):
        super().__init__(**kw)
        if not length:
            return
        self.sql_type = sa.String(length)
class Boolean(Column):
    """Boolean column displayed as ``Yes`` / ``No`` / ``Unknown``."""

    sql_type = sa.Boolean()

    def display(self, x):
        """Return the display text for ``x``.

        ``True`` maps to ``'Yes'``, ``False`` to ``'No'`` and ``None`` to
        ``'Unknown'``. Any other value raises ``KeyError``.

        The mapping is indexed by ``x``; previously the whole mapping dict
        was returned regardless of the value.
        """
        return {True: 'Yes', False: 'No', None: 'Unknown'}[x]
class Integer(Column):
    """Column stored as ``sa.Integer``."""

    sql_type = sa.Integer()
class Float(Column):
    """Column stored as ``sa.Float``."""

    sql_type = sa.Float()
class BigInteger(Integer):
    """Integer column stored as ``sa.BigInteger``."""

    sql_type = sa.BigInteger()
class Date(Column):
    """Column stored as ``sa.Date``."""

    sql_type = sa.Date()
class Year(Integer):
    """Integer column type; adds nothing beyond ``Integer``."""
class Number(Integer):
    """Integer column whose display text comes from ``commify``."""

    def display(self, x):
        """Return ``commify(x)``."""
        return commify(x)
class Dollars(Integer):
    """Integer column displayed as a dollar amount, e.g. ``$1,234``."""

    def display(self, x):
        """Return ``'$'`` followed by ``commify(x)``.

        ``commify`` returns ``None`` for a ``None`` input; that value is
        passed through instead of being concatenated to ``'$'``, which
        would raise ``TypeError``.
        """
        amount = commify(x)
        if amount is None:
            return None
        return '$' + amount
class Percentage(Float):
    """Float column displayed as a percentage of truncated width."""

    # Number of characters kept from the scaled value, excluding the
    # extra one added in ``display``.
    precision = 3

    def display(self, x):
        """Return ``x * 100`` as text cut to ``precision + 1`` chars plus ``%``."""
        # add one for the decimal point
        width = self.precision + 1
        scaled = str(x * 100)
        return scaled[:width] + '%'
class URL(String):
    """String column type; adds nothing beyond ``String``."""
## module functions | |
def create():
    """Call ``create()`` on every registered table, in registration order."""
    for registered in _all_tables:
        registered.create()
def drop():
    """Call ``drop()`` on every registered table, newest registration first."""
    # The slice makes a reversed copy, leaving ``_all_tables`` itself untouched.
    for registered in _all_tables[::-1]:
        registered.drop()
def recreate():
    """Drop every registered table, then create them all again.

    The second step was truncated to a bare ``c`` in the source, which
    would raise ``NameError``; it is restored to the ``create()`` call.
    """
    drop()
    create()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
sql.utils | |
~~~~~~~~~ | |
Utility functions. | |
:copyright: Pancubs.org | |
:license: MIT | |
""" | |
def commify(n):
    """
    Insert thousands separators into the integer part of `n`.

    `n` is converted with ``str()`` and stripped; a leading ``-`` and any
    fractional part after a ``.`` are preserved. ``None`` is returned
    unchanged.

        >>> commify(1)
        '1'
        >>> commify(123)
        '123'
        >>> commify(-123)
        '-123'
        >>> commify(1234)
        '1,234'
        >>> commify(1234567890)
        '1,234,567,890'
        >>> commify(123.0)
        '123.0'
        >>> commify(1234.5)
        '1,234.5'
        >>> commify(1234.56789)
        '1,234.56789'
        >>> commify(' %.2f ' % -1234.5)
        '-1,234.50'
        >>> commify(None)
    """
    if n is None:
        return None

    text = str(n).strip()

    sign = ""
    if text.startswith("-"):
        sign = "-"
        text = text[1:].strip()

    # Unpacking deliberately fails (ValueError) on more than one dot.
    if "." in text:
        whole, fraction = text.split(".")
    else:
        whole, fraction = text, None

    # Cut the integer part into a short leading group followed by
    # full groups of three characters.
    lead = len(whole) % 3
    groups = [whole[:lead]] if lead else []
    groups.extend(whole[pos:pos + 3] for pos in range(lead, len(whole), 3))
    result = ",".join(groups)

    # An empty fractional part (e.g. "1.") is dropped together with the dot.
    if fraction:
        result += "." + fraction
    return sign + result
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment