Created
April 9, 2019 21:22
-
-
Save AlJohri/ac9996a19edb3fcc0c6b7524ef5a99f2 to your computer and use it in GitHub Desktop.
Sample usage of SQLAlchemy reflection to load tables, plus event hooks that print queries as they happen.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
http://docs.sqlalchemy.org/en/latest/core/tutorial.html | |
""" | |
import time | |
import logging | |
import logging.config | |
import warnings | |
import sqlparse | |
from functools import partial | |
from sqlalchemy import create_engine, event | |
from sqlalchemy import Table, MetaData | |
from sqlalchemy import exc as sa_exc | |
from sqlalchemy.sql import select, and_, or_, func | |
from sqlalchemy.orm.query import Query | |
import time | |
from contextlib import contextmanager | |
@contextmanager
def timewith(logger, name):
    """Context manager that logs (at DEBUG) how long the managed block took.

    :param logger: logger to emit the timing message on
    :param name: human-readable label included in the message
    """
    began = time.time()
    yield
    took_ms = (time.time() - began) * 1000
    logger.debug(f'[{name}] finished in {took_ms:.2f} ms')
# Module-level logger; note __main__ below rebinds this name after
# configuring handlers.
logger = logging.getLogger(__name__)
# logger.setFormatter(MultilineMessagesFormatter)
# Pre-bind the module logger so call sites only supply the block name:
# `with timewith('label'): ...`
timewith = partial(timewith, logger)
# Placeholder DSN — the 'postgres://' scheme implies psycopg2 (cursor.mogrify
# is used below); fill in real credentials before running. TODO confirm driver.
POSTGRES_URL = 'postgres://....'
def create_hook():
    """Attach a query-logging listener to the module-level ``engine``.

    Registers a ``before_cursor_execute`` hook that logs every SQL statement,
    with bound parameters interpolated and the text pretty-printed, at DEBUG
    level on the module logger.
    """
    @event.listens_for(engine, 'before_cursor_execute', named=True)
    def receive_before_cursor_execute(**kw):
        "listen for the 'before_cursor_execute' event"
        # NOTE: the original named this handler (and its docstring)
        # 'after_cursor_execute' although it listens for the *before* event;
        # renamed to match reality. The unused `conn` kwarg is dropped.
        cursor = kw['cursor']
        statement = kw['statement']
        parameters = kw['parameters']
        # cursor.mogrify (psycopg2-specific) interpolates the bound parameters
        # so the logged SQL is exactly what the server receives;
        # sqlparse.format re-indents it for readability.
        final = sqlparse.format(
            cursor.mogrify(statement, parameters).decode('utf-8'),
            reindent=True)
        # Use the module logger with lazy %-formatting (the original called
        # logging.debug, i.e. the root logger, bypassing this module's config).
        logger.debug('EXECUTING SQL:\n%s', final)
def load_tables():
    """Reflect the database schema into ``meta`` and return the table map.

    Side effects: populates the module-level ``meta`` via reflection against
    the module-level ``engine``, then installs the query-logging hook.
    Returns ``meta.tables`` (a mapping of table name -> Table).
    """
    with warnings.catch_warnings():
        # sqlalchemy can't understand partial indexes, this is a harmless warning
        # ignore Predicate of partial index ___ ignored during reflection
        warnings.simplefilter("ignore", category=sa_exc.SAWarning)
        with timewith('reflect database tables'):
            meta.reflect(bind=engine)
    # establish hook after meta.reflect — presumably so the many reflection
    # queries themselves are not logged; TODO confirm intent
    create_hook()
    return meta.tables
def get_articles_count(conn, s):
    """Return the number of rows the select *s* would yield.

    Rewrites *s* to select only ``count()`` before executing on *conn*.
    """
    counting = s.with_only_columns([func.count()])
    first_row = conn.execute(counting).fetchone()
    return first_row[0]
def get_articles(conn, s):
    """Execute select *s* on *conn* and lazily yield each row as a dict.

    The query is issued immediately; rows are converted on iteration.
    """
    return map(dict, conn.execute(s))
# http://docs.sqlalchemy.org/en/latest/core/tutorial.html
def get_articles_query(
        fields=None,
        published=None,
        organization=None,
        last_processed_at_start_date=None,
        last_processed_at_end_date=None,
        published_at_start_date=None,
        published_at_end_date=None,
        order_by=None,
        direction=None,
        limit=None):
    """Build a SELECT over the reflected ``articles`` table.

    :param fields: column names to select (defaults to a standard subset)
    :param published: only True (published-or-NULL rows) and None (no filter)
        are supported; anything else raises NotImplementedError
    :param organization: exact-match filter on the organization column
    :param last_processed_at_start_date / last_processed_at_end_date:
        inclusive BETWEEN bounds; applied only when BOTH are given
    :param published_at_start_date / published_at_end_date: likewise
    :param order_by: column name to order by
    :param direction: 'asc' for ascending; anything else means descending
    :param limit: row limit (None means no limit)
    :returns: a sqlalchemy Select, not yet executed
    :raises NotImplementedError: for unsupported ``published`` values
    """
    fields = fields or ['id', 'organization', 'headline', 'canonical_url',
                        'published_at', 'last_processed_at']
    columns = [articles.c[x] for x in fields]
    s = select(columns)
    if last_processed_at_start_date is not None and last_processed_at_end_date is not None:
        s = s.where(articles.c.last_processed_at.between(
            last_processed_at_start_date,
            last_processed_at_end_date))
    if published_at_start_date is not None and published_at_end_date is not None:
        s = s.where(articles.c.published_at.between(
            published_at_start_date,
            published_at_end_date))
    if published is True:
        # NULL is treated as published (presumably legacy rows predate the
        # flag — confirm against the schema).
        s = s.where(or_(articles.c.published == True,
                        articles.c.published == None))
    elif published is not None:
        # Restructured: the original attached this elif to the organization
        # check, so the "unsupported value" guard could never fire coherently.
        raise NotImplementedError()
    if organization:
        s = s.where(articles.c.organization == organization)
    if order_by:
        # BUG FIX: Select.order_by() is generative — it returns a NEW
        # statement. The original discarded the return value, so ordering
        # silently never applied. Reassign to `s`.
        if direction == 'asc':
            s = s.order_by(articles.c[order_by].asc())
        else:
            s = s.order_by(articles.c[order_by].desc())
    s = s.limit(limit)
    return s
# Module-import side effects: connect to Postgres and reflect the schema.
engine = create_engine(POSTGRES_URL, echo=False)
meta = MetaData()
tables = load_tables()
# The reflected 'articles' table used by the query helpers above; importing
# this module fails if the database has no such table.
articles = tables['articles']
if __name__ == "__main__":
    # Demo: configure logging, then dump recent Washington Post articles.
    import json
    import arrow
    import logging
    import datetime
    from pprint import pprint as pp
    from sqlalchemy.sql import func
    from blessings import Terminal
    t = Terminal()
    from multiline_formatter.formatter import MultilineMessagesFormatter
    # Formatter that handles multi-line messages (the pretty-printed SQL
    # emitted by the cursor hook spans many lines).
    mlf = MultilineMessagesFormatter(fmt='[%(levelname)s] %(asctime)s %(message)s')
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(mlf)
    # NOTE(review): level 'INFO' suppresses the DEBUG-level SQL/timing
    # messages produced by timewith and the cursor hook — confirm intended.
    logging.basicConfig(level='INFO',handlers=[stream_handler])
    logger = logging.getLogger(__name__)
    conn = engine.connect()
    days = 14
    # Articles both published AND processed within the last `days` days.
    # func.current_date() - timedelta renders the date arithmetic server-side.
    query = get_articles_query(
        fields=['id', 'organization', 'headline', 'canonical_url', 'published_at', 'last_processed_at', 'html', 'original'],
        published=True,
        organization='washpost',
        last_processed_at_start_date=func.current_date() - datetime.timedelta(days=days),
        last_processed_at_end_date=func.current_date(),
        published_at_start_date=func.current_date() - datetime.timedelta(days=days),
        published_at_end_date=func.current_date(),
        order_by='last_processed_at',
        direction='desc')
    print(f"Querying for articles published AND processed within the last {days} days")
    # Runs the query twice: once as COUNT(*), once for the rows themselves.
    count = get_articles_count(conn, query)
    print(f"Found {count} articles")
    for i, article in enumerate(get_articles(conn, query)):
        # Print each article's raw 'original' payload.
        original = article.pop('original')
        print(original)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment