Skip to content

Instantly share code, notes, and snippets.

@AlJohri
Created April 9, 2019 21:22
Show Gist options
  • Save AlJohri/ac9996a19edb3fcc0c6b7524ef5a99f2 to your computer and use it in GitHub Desktop.
Sample usage of SQLAlchemy reflection to load tables, with hooks that print queries as they happen.
"""
http://docs.sqlalchemy.org/en/latest/core/tutorial.html
"""
import time
import logging
import logging.config
import warnings
import sqlparse
from functools import partial
from sqlalchemy import create_engine, event
from sqlalchemy import Table, MetaData
from sqlalchemy import exc as sa_exc
from sqlalchemy.sql import select, and_, or_, func
from sqlalchemy.orm.query import Query
import time
from contextlib import contextmanager
@contextmanager
def timewith(logger, name):
    """Context manager that logs how long its body took, in milliseconds.

    Args:
        logger: any object with a ``debug(msg)`` method.
        name: label included in the logged message.

    The elapsed time is logged at DEBUG level as
    ``[<name>] finished in <ms> ms``.
    """
    # perf_counter() is monotonic and high-resolution; time.time() can jump
    # backwards/forwards if the system clock is adjusted mid-measurement.
    start = time.perf_counter()
    try:
        yield
    finally:
        # Log even when the timed body raises, so failed operations
        # still report their duration.
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.debug(f'[{name}] finished in {elapsed_ms:.2f} ms')
# Module-level logger; timewith is pre-bound to it so call sites only pass
# the timer label.  (The dead `logger.setFormatter` line was removed:
# formatters attach to handlers, not loggers.)
logger = logging.getLogger(__name__)
timewith = partial(timewith, logger)

# Placeholder DSN — fill in before use.  'postgresql://' is the canonical
# scheme; the bare 'postgres://' alias is deprecated and rejected by
# SQLAlchemy 1.4+.
POSTGRES_URL = 'postgresql://....'
def create_hook():
    """Attach a query-logging listener to the global ``engine``.

    The listener fires before each cursor execution and logs the statement
    with its bound parameters interpolated, reindented for readability.
    """
    @event.listens_for(engine, 'before_cursor_execute', named=True)
    def receive_before_cursor_execute(**kw):
        "listen for the 'before_cursor_execute' event"
        cursor = kw['cursor']
        statement = kw['statement']
        parameters = kw['parameters']
        # mogrify() renders the statement with parameters bound
        # (psycopg2-specific); sqlparse reindents the SQL for readability.
        final = sqlparse.format(
            cursor.mogrify(statement, parameters).decode('utf-8'),
            reindent=True)
        # Use the module logger rather than the root logger so the message
        # respects this module's logging configuration.
        logger.debug(f'EXECUTING SQL:\n{final}')
def load_tables():
    """Reflect every table from the database into the global ``meta``.

    Returns the reflected ``meta.tables`` mapping.  The SQL-logging hook is
    installed only after reflection finishes, so the (many) reflection
    queries themselves are not echoed.
    """
    with warnings.catch_warnings():
        # SQLAlchemy cannot model partial indexes; the resulting
        # "Predicate of partial index ... ignored during reflection"
        # SAWarning is harmless noise, so suppress it.
        warnings.simplefilter("ignore", category=sa_exc.SAWarning)
        with timewith('reflect database tables'):
            meta.reflect(bind=engine)
    # Establish the hook strictly after meta.reflect has completed.
    create_hook()
    return meta.tables
def get_articles_count(conn, s):
    """Return the number of rows the select *s* would produce.

    The select is reduced to a bare COUNT(*) before execution, so only the
    count travels over the wire.
    """
    count_stmt = s.with_only_columns([func.count()])
    first_row = conn.execute(count_stmt).fetchone()
    return first_row[0]
def get_articles(conn, s):
    """Execute the select *s* and lazily yield each result row as a dict."""
    for row in conn.execute(s):
        yield dict(row)
# http://docs.sqlalchemy.org/en/latest/core/tutorial.html
def get_articles_query(
        fields=None,
        published=None,
        organization=None,
        last_processed_at_start_date=None,
        last_processed_at_end_date=None,
        published_at_start_date=None,
        published_at_end_date=None,
        order_by=None,
        direction=None,
        limit=None):
    """Build a SELECT against the reflected ``articles`` table.

    Args:
        fields: column names to select; defaults to a standard article set.
        published: True (published or NULL rows), or None (no filter).
            Any other value raises NotImplementedError.
        organization: filter on the organization column (only applied when
            ``published is True`` — see note below).
        last_processed_at_start_date / last_processed_at_end_date: inclusive
            BETWEEN bounds on last_processed_at; both must be given.
        published_at_start_date / published_at_end_date: inclusive BETWEEN
            bounds on published_at; both must be given.
        order_by: column name to order by; ``direction`` 'asc' or anything
            else for descending.
        limit: row limit passed through to the select (None = no limit).

    Returns:
        a SQLAlchemy Select, ready to execute.
    """
    fields = fields or ['id', 'organization', 'headline', 'canonical_url',
                        'published_at', 'last_processed_at']
    columns = [articles.c[x] for x in fields]
    s = select(columns)
    if last_processed_at_start_date is not None and last_processed_at_end_date is not None:
        s = s.where(articles.c.last_processed_at.between(
            last_processed_at_start_date,
            last_processed_at_end_date))
    if published_at_start_date is not None and published_at_end_date is not None:
        s = s.where(articles.c.published_at.between(
            published_at_start_date,
            published_at_end_date))
    if published is True:
        # == True / == None are intentional here: SQLAlchemy compiles them
        # to SQL (IS TRUE / IS NULL); NULL counts as published.
        s = s.where(or_(articles.c.published == True, articles.c.published == None))  # noqa: E711,E712
        # NOTE(review): in the original, the organization filter is only
        # applied when published is True; preserved as-is — confirm intent.
        if organization:
            s = s.where(articles.c.organization == organization)
    elif published is not None:
        # Only published=True or published=None are supported.
        raise NotImplementedError()
    if order_by:
        # BUG FIX: Select.order_by() returns a NEW select; the original
        # discarded the result, so ordering was silently never applied.
        if direction == 'asc':
            s = s.order_by(articles.c[order_by].asc())
        else:
            s = s.order_by(articles.c[order_by].desc())
    s = s.limit(limit)
    return s
# Module-level side effects, in a deliberate order: the engine must exist
# before load_tables() reflects against it and installs the SQL-logging hook.
engine = create_engine(POSTGRES_URL, echo=False)
meta = MetaData()
# Reflects the whole schema (connects to the database at import time) and
# installs the query-logging hook afterwards.
tables = load_tables()
# Reflected 'articles' Table used by the query-builder functions above.
articles = tables['articles']
if __name__ == "__main__":
    # Demo: fetch recent washpost articles and print their 'original' payload.
    # Unused imports from the original (json, arrow, pprint, blessings) and
    # the redundant re-imports of logging/func were removed.
    import datetime
    from multiline_formatter.formatter import MultilineMessagesFormatter

    # One stream handler whose formatter keeps multi-line messages
    # (the pretty-printed SQL) readable.
    mlf = MultilineMessagesFormatter(fmt='[%(levelname)s] %(asctime)s %(message)s')
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(mlf)
    logging.basicConfig(level='INFO', handlers=[stream_handler])

    conn = engine.connect()
    days = 14
    query = get_articles_query(
        fields=['id', 'organization', 'headline', 'canonical_url',
                'published_at', 'last_processed_at', 'html', 'original'],
        published=True,
        organization='washpost',
        # Date arithmetic is evaluated server-side via current_date.
        last_processed_at_start_date=func.current_date() - datetime.timedelta(days=days),
        last_processed_at_end_date=func.current_date(),
        published_at_start_date=func.current_date() - datetime.timedelta(days=days),
        published_at_end_date=func.current_date(),
        order_by='last_processed_at',
        direction='desc')
    print(f"Querying for articles published AND processed within the last {days} days")
    count = get_articles_count(conn, query)
    print(f"Found {count} articles")
    for article in get_articles(conn, query):
        original = article.pop('original')
        print(original)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment