Skip to content

Instantly share code, notes, and snippets.

@sonnyksimon
Last active January 7, 2020 19:15
Show Gist options
  • Save sonnyksimon/976de3b7fa91e458a6e986fbf348b557 to your computer and use it in GitHub Desktop.
Save sonnyksimon/976de3b7fa91e458a6e986fbf348b557 to your computer and use it in GitHub Desktop.
sql: Another way of using SQLAlchemy
import sql
from settings import engine

# Table.create() and Table.drop() read the engine from this class attribute,
# so it has to be assigned before either of them is called.
sql.Table.engine = engine


class Blog(sql.Table):
    """Example table definition made of three string columns."""

    id = sql.String(255)
    title = sql.String(255)
    body = sql.String(4096)
python3 -c "import schema; schema.sql.recreate();"
import sqlalchemy as sa

_make_url = sa.engine.url.URL

# NOTE(review): the connection credentials are hard-coded in this module.
url = _make_url(
    drivername='mysql',
    username='root',
    password='password',
    host='localhost',
    port=3306,
    database='smartersql',
)
engine = sa.create_engine(url)
# -*- coding: utf-8 -*-
"""
sql
~~~
Another way of using SQLAlchemy.
:copyright: Pancubs.org
:license: MIT
"""
import sqlalchemy as sa
from utils import commify
# MetaData instance handed to every table class's _sql_table() by the
# metaclass below.
_metadata = sa.MetaData()
## table generation
# Table classes in the order they were created; appended to by metatracker
# and walked by the module-level create()/drop() helpers.
_all_tables = []
class metatracker(type):
    """Metaclass that registers table classes at class-creation time.

    A class whose first base is ``object`` is left untouched.  Every other
    class is appended to ``_all_tables`` and receives three class
    attributes: ``columns``, ``sql_table`` and ``sql_name``.
    """

    def __init__(cls, name, bases, *args, **kwargs):
        super().__init__(name, bases, *args, **kwargs)
        if bases[0] == object:
            return
        _all_tables.append(cls)
        # Order matters: _sql_table() reads cls.columns.
        cls.columns = cls._analyze()
        cls.sql_table = cls._sql_table(_metadata)
        cls.sql_name = cls._sql_name_()
class Table(object, metaclass=metatracker):
    """Base class for table definitions.

    Subclasses declare ``Column`` instances as class attributes; the
    metaclass collects them and builds the matching ``sa.Table``.  The
    ``engine`` class attribute is not defined here: it must be assigned
    (e.g. ``Table.engine = engine``) before ``create`` or ``drop`` is used.
    """

    @classmethod
    def _sql_name_(cls):
        """Return the SQL table name, i.e. the lower-cased class name."""
        return cls.__name__.lower()

    @classmethod
    def _analyze(cls):
        """Collect the ``Column`` attributes of the class into a dict.

        Each column found gets its ``sql_name`` set, and a ``label``
        (attribute name with underscores turned into spaces) when it has
        none yet.  Keys are attribute names, visited in ``dir()`` order.
        """
        found = {}
        for attr in dir(cls):
            column = getattr(cls, attr)
            if not isinstance(column, Column):
                continue
            column.sql_name = column._sql_name_(attr)
            if not hasattr(column, 'label'):
                column.label = attr.replace('_', ' ')
            found[attr] = column
        return found

    @staticmethod
    def _primary(columns):
        """Return the subset of ``columns`` whose ``primary`` flag is set."""
        return {key: col for key, col in columns.items() if col.primary}

    @classmethod
    def _sql_table(cls, metadata):
        """Build the ``sa.Table`` for this class inside ``metadata``."""
        table_name = cls._sql_name_()
        sql_columns = [
            sa.Column(
                col.sql_name,
                col.sql_type,
                primary_key=col.primary,
                unique=col.unique,
                nullable=not col.notnull,
                default=col.default,
            )
            for col in cls.columns.values()
        ]
        return sa.Table(table_name, metadata, *sql_columns)

    @classmethod
    def create(cls):
        """Create this table using the class-level ``engine``."""
        cls.sql_table.create(cls.engine)

    @classmethod
    def drop(cls):
        """Drop this table using the class-level ``engine``."""
        cls.sql_table.drop(cls.engine)

    def __init__(self):
        # Swap the instance onto a fresh copy of its own class (same name,
        # bases and namespace).
        # NOTE(review): building the copy through type() may run the
        # metaclass registration again for the copy -- confirm intended.
        original = self.__class__
        self.__class__ = type(
            original.__name__, original.__bases__, dict(original.__dict__)
        )
class Column(object):
    """Base description of a table column.

    Keyword arguments passed to the constructor are stored as instance
    attributes, so they override the class-level defaults declared here.
    """

    default = None
    primary = False
    unique = False
    notnull = False

    def __init__(self, **kw):
        for key, value in kw.items():
            setattr(self, key, value)

    def _sql_name_(self, k):
        """Return the SQL column name for attribute name ``k`` (unchanged)."""
        return k

    def display(self, x):
        """Return the value ``x`` converted with ``str``."""
        return str(x)
class String(Column):
    """String column with an optional maximum ``length``."""

    sql_type = sa.String()

    def __init__(self, length=None, **kw):
        super().__init__(**kw)
        if not length:
            # A falsy length (None or 0) keeps the class-level sa.String().
            return
        self.sql_type = sa.String(length)
class Boolean(Column):
    """Boolean column displayed as ``Yes`` / ``No`` / ``Unknown``."""

    sql_type = sa.Boolean()

    # Display text for each of the three states a nullable boolean can take.
    _labels = {True: 'Yes', False: 'No', None: 'Unknown'}

    def display(self, x):
        """Return the label for ``x``.

        The previous implementation returned the whole lookup dict instead
        of the entry for ``x``.  Values outside the table fall back to
        ``str(x)``, the same rendering ``Column.display`` uses.
        """
        return self._labels.get(x, str(x))
class Integer(Column):
    """Column stored as ``sa.Integer``."""

    sql_type = sa.Integer()


class Float(Column):
    """Column stored as ``sa.Float``."""

    sql_type = sa.Float()


class BigInteger(Integer):
    """Column stored as ``sa.BigInteger``."""

    sql_type = sa.BigInteger()


class Date(Column):
    """Column stored as ``sa.Date``."""

    sql_type = sa.Date()


class Year(Integer):
    """Marker subclass of ``Integer``; adds no behavior of its own."""
class Number(Integer):
    """Integer column displayed through ``commify``."""

    def display(self, x):
        """Return ``commify(x)``."""
        return commify(x)
class Dollars(Integer):
    """Integer column displayed as a dollar amount, e.g. ``$1,234``."""

    def display(self, x):
        """Return ``'$'`` followed by ``commify(x)``.

        ``commify`` returns ``None`` for a ``None`` value; that is passed
        through unchanged instead of raising ``TypeError`` on ``'$' + None``.
        """
        amount = commify(x)
        if amount is None:
            return None
        return '$' + amount
class Percentage(Float):
    """Fraction column displayed as a percentage, e.g. 0.1234 -> '12.3%'."""

    # Number of digits to show, integer and fractional part together.
    precision = 3

    def display(self, x):
        """Return ``x * 100`` cut to ``precision`` digits, followed by ``%``.

        Fractional digits are truncated (not rounded) to whatever room the
        integer part leaves.  The integer part itself is never shortened,
        no dangling decimal point is emitted, and ``None`` is returned
        unchanged; the earlier fixed-width slice produced results such as
        ``'100.%'`` and dropped integer digits of large values.
        """
        if x is None:
            return None
        whole, dot, fraction = str(x * 100).partition('.')
        # The sign does not count towards the digit budget.
        room = self.precision - len(whole.lstrip('-'))
        if not dot or room <= 0:
            return whole + '%'
        return whole + '.' + fraction[:room] + '%'
class URL(String):
    """Marker subclass of ``String``; adds no behavior of its own."""
## module functions
def create():
    """Call ``create()`` on every registered table, in registration order."""
    for table_cls in _all_tables:
        table_cls.create()
def drop():
    """Call ``drop()`` on every registered table, last registered first."""
    # Slice to iterate over a reversed copy, leaving the registry untouched.
    for table_cls in _all_tables[::-1]:
        table_cls.drop()
def recreate():
    """Drop every registered table, then create them all again."""
    drop()
    # The original text was cut off after a bare ``c`` (a NameError at
    # runtime); recreating requires the create() step.
    create()
# -*- coding: utf-8 -*-
"""
sql.utils
~~~~~~~~~
Utility functions.
:copyright: Pancubs.org
:license: MIT
"""
def commify(n):
    """
    Format a number `n` with commas as thousands separators.

    `n` may be a number or its string form.  Surrounding whitespace is
    stripped, a leading sign is kept, and only the integer part is
    grouped.  Text whose integer part is not made of digits (exponent
    notation, text that already contains commas, ...) is returned
    stripped but otherwise unchanged.  `None` is returned as `None`.

    >>> commify(1)
    '1'
    >>> commify(123)
    '123'
    >>> commify(-123)
    '-123'
    >>> commify(1234)
    '1,234'
    >>> commify(1234567890)
    '1,234,567,890'
    >>> commify(123.0)
    '123.0'
    >>> commify(1234.5)
    '1,234.5'
    >>> commify(1234.56789)
    '1,234.56789'
    >>> commify(' %.2f ' % -1234.5)
    '-1,234.50'
    >>> commify(1e16)
    '1e+16'
    >>> commify(None)
    """
    if n is None:
        return None
    text = str(n).strip()
    sign = ""
    if text.startswith(("-", "+")):
        sign = text[0]
        text = text[1:].strip()
    # partition() instead of split(): a second "." must not raise.
    whole, _, fraction = text.partition(".")
    if not whole.isdigit():
        # Grouping anything but plain digits would corrupt the text.
        return sign + text
    # The first group takes the 1-3 leftover digits, the rest are triples.
    head = len(whole) % 3 or 3
    groups = [whole[:head]]
    groups.extend(whole[i:i + 3] for i in range(head, len(whole), 3))
    out = ",".join(groups)
    if fraction:
        out += "." + fraction
    return sign + out
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment