Last active
September 16, 2020 00:00
-
-
Save gugek/52e9f4fef53f49287db0 to your computer and use it in GitHub Desktop.
SQLAlchemy Model for Citation Relations (Association Proxy)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Schema for bibliographic and citation datbaase | |
Uses a two-way association proxy to handle relationship between article | |
and creators. | |
""" | |
import argparse | |
from sqlalchemy.ext.declarative import declarative_base | |
from sqlalchemy.orm import relationship, backref | |
from sqlalchemy.sql import func | |
from sqlalchemy import Column, UnicodeText, Integer, ForeignKey | |
from sqlalchemy import ForeignKeyConstraint | |
from sqlalchemy import Unicode, DateTime, Date, desc | |
from sqlalchemy import create_engine | |
from sqlalchemy.ext.associationproxy import association_proxy | |
from sqlalchemy.orm import sessionmaker, scoped_session | |
import ConfigParser | |
from nameparser import HumanName | |
from decimal import Decimal | |
import urllib | |
from datetime import datetime | |
from math import ceil | |
import os | |
# Declarative base shared by every model class in this module; the tables
# they declare are registered on Base.metadata.
Base = declarative_base()
class Article(Base):
    """ORM model for one row of the ``articles`` table.

    ``article_creators`` holds the ``ArticlesHaveCreators`` association rows
    of the article; ``creators`` is an association proxy over those rows
    that exposes each row's ``creator`` attribute.
    """

    __tablename__ = 'articles'
    __table_args__ = {'mysql_engine': 'InnoDB'}

    # --- columns ---------------------------------------------------------
    id = Column(u'id', Integer, primary_key=True, nullable=False)
    title = Column(u'title', UnicodeText)
    submission_date = Column(u'submission_date', Date)
    date = Column('date', Date)
    document_type = Column(u'document_type', UnicodeText)
    article_url = Column(u'article_url', UnicodeText)
    oai_identifier = Column(u'oai_identifier', Unicode(255), unique=True)
    last_event = Column(u'last_event', UnicodeText)
    last_event_date = Column(u'last_event_date', Date)
    status = Column(u'status', Unicode(255))
    pdf_url = Column(u'pdf_url', UnicodeText)
    volume = Column(u'volume', UnicodeText(32))
    issue = Column(u'issue', UnicodeText(32))
    fpage = Column(u'fpage', UnicodeText(32))
    lpage = Column(u'lpage', UnicodeText(32))

    # --- relations -------------------------------------------------------
    # article.article_creators -> association rows; the backref adds an
    # ``article`` attribute to each ArticlesHaveCreators row.
    article_creators = relationship(
        'ArticlesHaveCreators',
        cascade="all, delete-orphan",
        backref="article",
    )

    # article.creators -> convenience view of the rows' ``creator`` values.
    creators = association_proxy('article_creators', 'creator')

    def __repr__(self):
        """Return a debugging label built from ``id`` and ``oai_identifier``."""
        label_fields = (self.id, self.oai_identifier)
        return "<Article('%s, %s')>" % label_fields
class Creator(Base):
    """ORM model for one row of the ``creators`` table.

    ``creator_articles`` holds the ``ArticlesHaveCreators`` association rows
    of the creator; ``articles`` is an association proxy over those rows
    that exposes each row's ``article`` attribute.
    """

    __tablename__ = 'creators'
    __table_args__ = {'mysql_engine': 'InnoDB'}

    # column definitions
    id = Column(u'id', Integer, primary_key=True, nullable=False)
    last = Column(u'last', UnicodeText)
    first = Column(u'first', UnicodeText)
    middle = Column(u'middle', UnicodeText)
    suffix = Column(u'suffix', UnicodeText)
    institution = Column(u'institution', Unicode(255))
    # Unique but nullable: a creator may have no e-mail address recorded.
    email = Column(u'email', Unicode(255), unique=True,
                   nullable=True)

    # creator.creator_articles (access association rows); the backref adds a
    # ``creator`` attribute to each ArticlesHaveCreators row.
    creator_articles = relationship('ArticlesHaveCreators',
                                    cascade="all, delete-orphan",
                                    backref="creator")

    # creator.articles (convenience view of the rows' ``article`` values)
    articles = association_proxy('creator_articles', 'article')

    def __repr__(self):
        """Return a debugging label built from ``id`` and ``last``."""
        # The literal used to start with a stray "u" inside the quotes, so
        # every repr began with "u<Creator"; it now matches the other models.
        return "<Creator('%s, %s')>" % (self.id, self.last)
class ArticlesHaveCreators(Base):
    """Association row joining one article to one creator at a position.

    The primary key is the triple (article_id, creator_id, position).
    The ``article`` and ``creator`` attributes are not declared here; they
    are supplied by the ``backref`` arguments of the relationships that
    target this class.
    """

    __tablename__ = 'articles_have_creators'
    __table_args__ = {'mysql_engine': 'InnoDB'}

    article_id = Column(
        Integer,
        ForeignKey('articles.id', ondelete='CASCADE', onupdate='CASCADE'),
        primary_key=True,
    )
    creator_id = Column(
        Integer,
        ForeignKey('creators.id', ondelete='CASCADE', onupdate='CASCADE'),
        primary_key=True,
    )
    position = Column(u'position', Integer, primary_key=True,
                      nullable=False, autoincrement=False)

    def __init__(self, article, creator, position=None):
        """Build a row from the ids of ``article`` and ``creator``.

        Only the ``id`` values are copied; the objects themselves are not
        stored on the row.
        """
        # NOTE(review): the ids are read at construction time, so both
        # objects presumably need to have been flushed already -- confirm.
        # NOTE(review): two positional arguments are required, so rows are
        # presumably created explicitly rather than by appending to an
        # association proxy (whose default creator passes one) -- confirm.
        self.position = position
        self.article_id = article.id
        self.creator_id = creator.id

    def __repr__(self):
        """Return a debugging label built from the two foreign keys."""
        template = "<ArticlesHaveCreators('{0}, {1}')>"
        return template.format(self.article_id, self.creator_id)
class Downloads(Base):
    """ORM model for one row of the ``downloads`` table.

    ``id`` is both the primary key and a foreign key to ``articles.id``.
    """

    __tablename__ = 'downloads'
    __table_args__ = {'mysql_engine': 'InnoDB'}

    # NOTE(review): ``id`` is the only primary-key column, so the table can
    # hold a single row per article; confirm that is intended given the
    # per-month aggregation described below.
    id = Column(
        u'id',
        Integer,
        ForeignKey('articles.id', ondelete='CASCADE', onupdate='CASCADE'),
        primary_key=True,
        nullable=False,
    )

    # use yyyy-mm-01 for the month aggregations
    # Assume to support by day eventually
    download_date = Column(u'download_date', Date)
    download_count = Column(u'download_count', Integer)

    def __repr__(self):
        """Return a debugging label built from the date and the count."""
        template = "<Downloads('{0} {1}')>"
        return template.format(self.download_date, self.download_count)
class Subject(Base):
    """ORM model for one row of the ``subjects`` table.

    The primary key is the pair (id, position), where ``id`` is a foreign
    key to ``articles.id``.
    """

    __tablename__ = 'subjects'
    __table_args__ = {'mysql_engine': 'InnoDB'}

    # Both key columns are supplied by the caller (autoincrement disabled).
    id = Column(
        u'id',
        Integer,
        ForeignKey('articles.id', onupdate='CASCADE', ondelete='CASCADE'),
        primary_key=True,
        nullable=False,
        autoincrement=False,
    )
    position = Column(u'position', Integer, primary_key=True,
                      nullable=False, autoincrement=False)
    subject = Column(u'subject', UnicodeText)

    def __repr__(self):
        """Return a debugging label containing the subject text."""
        label_fields = (self.subject,)
        return "<Subject('%s')>" % label_fields
class Pagination(object):
    """Page arithmetic for a paginated result set.

    Attributes:
        page: number of the current page (pages are counted from 1).
        per_page: number of items shown on one page.
        total_count: total number of items across all pages.
    """

    def __init__(self, page, per_page, total_count):
        self.page = page
        self.per_page = per_page
        self.total_count = total_count

    @property
    def pages(self):
        """Return the number of pages; 0 when ``per_page`` is 0 or None."""
        # Guard against division by zero for an unset/zero page size.
        if not self.per_page:
            return 0
        return int(ceil(self.total_count / float(self.per_page)))

    @property
    def has_prev(self):
        """Return True when a page exists before the current one."""
        return self.page > 1

    @property
    def has_next(self):
        """Return True when a page exists after the current one."""
        return self.page < self.pages

    def iter_pages(self, left_edge=2, left_current=2,
                   right_current=5, right_edge=2):
        """Yield the page numbers to show in a pager, with ``None`` for gaps.

        A page number is yielded when it lies within ``left_edge`` of the
        start, within ``right_edge`` of the end, or in the window around the
        current page (``left_current`` before it, ``right_current - 1``
        after it). A single ``None`` is yielded wherever numbers are skipped.
        """
        total_pages = self.pages  # loop-invariant, computed once
        last = 0
        # ``range`` works on both Python 2 and 3 (``xrange`` is 2-only).
        for num in range(1, total_pages + 1):
            at_left_edge = num <= left_edge
            around_current = (self.page - left_current - 1 < num
                              < self.page + right_current)
            at_right_edge = num > total_pages - right_edge
            if at_left_edge or around_current or at_right_edge:
                if last + 1 != num:
                    # Numbers were skipped since the last yielded page.
                    yield None
                yield num
                last = num
def make_creator_string(creators):
    """Return the creators' names joined into one display string.

    Each item's ``creator`` attribute is passed through ``HumanName`` and
    converted to unicode. One name is returned as is, two are joined with
    " and ", three or more as "A, B and C"; no names gives an empty string.
    """
    names = [unicode(HumanName(entry.creator)) for entry in creators]
    if not names:
        return u""
    if len(names) == 1:
        return names[0]
    if len(names) == 2:
        return u" and ".join(names)
    # Three or more: comma-separate all but the final name.
    leading, final = names[:-1], names[-1]
    return u", ".join(leading) + u' and ' + final
def get_engine():
    """Return an SQLAlchemy engine configured from ``repository-metrics.cfg``.

    The config file is looked up in the directory containing this module;
    its ``[sqlalchemy]`` section supplies ``dsn`` and the boolean ``echo``.
    """
    module_dir = os.path.dirname(os.path.realpath(__file__))
    settings = ConfigParser.ConfigParser()
    settings.read(os.path.join(module_dir, 'repository-metrics.cfg'))
    dsn = settings.get("sqlalchemy", "dsn")
    echo = settings.getboolean("sqlalchemy", "echo")
    return create_engine(dsn, echo=echo, pool_recycle=3600)
def get_session():
    """Return a scoped session bound to a freshly created engine.

    The underlying session factory has autocommit and autoflush disabled.
    """
    engine = get_engine()
    factory = sessionmaker(bind=engine, autoflush=False, autocommit=False)
    return scoped_session(factory)
def test():
    """Small testing framework.

    Prints a banner and returns a scoped session obtained from
    ``get_session()``.
    """
    print("Testing...")
    # get_session() accepts no arguments and builds its own engine; passing
    # an engine here raised TypeError.
    return get_session()
def create_tables():
    """Create every table registered on ``Base.metadata``."""
    print("Creating tables...")
    Base.metadata.create_all(bind=get_engine())
def _run_statement(conn, statement):
    """Execute one SQL statement in its own transaction.

    Returns True when the statement was committed. On a database error the
    transaction is rolled back, the failure is printed, and False is
    returned so the caller can carry on with the next statement.
    """
    from sqlalchemy import text
    from sqlalchemy.exc import SQLAlchemyError

    transaction = conn.begin()
    try:
        conn.execute(text(statement))
        transaction.commit()
    except SQLAlchemyError as error:
        transaction.rollback()
        print("Statement failed: {0} ({1})".format(statement, error))
        return False
    return True


def drop_tables():
    """Drop the citation tables.

    Best effort: each statement runs in its own transaction, and a failure
    (for example a table that does not exist) is reported and skipped
    instead of aborting the remaining drops. The connection is always
    closed. Returns 1.
    """
    print("Dropping tables...")
    tablenames = ['articles', 'creators', 'subjects', 'downloads',
                  'articles_have_creators']
    conn = get_engine().connect()
    try:
        # Lets the tables be dropped in any order despite their foreign keys.
        # NOTE(review): this is MySQL syntax; on other backends it is
        # reported as a failed statement and the drops still run.
        _run_statement(conn, "SET foreign_key_checks = 0")
        for tablename in tablenames:
            # Table names come from the fixed list above, never from user
            # input, so formatting them into the statement is safe.
            _run_statement(conn, "drop table {0}".format(tablename))
    finally:
        conn.close()
    return 1
def main(args):
    """Run the actions selected in ``args``.

    ``args`` must expose the boolean attributes ``droptables``,
    ``createtables`` and ``test``. The actions always run in that order, so
    dropping and creating in one invocation rebuilds the tables.
    """
    # 1. optionally drop the existing tables
    if args.droptables:
        drop_tables()

    # 2. optionally (re)create them
    if args.createtables:
        create_tables()

    # 3. optionally open a test session
    if args.test:
        test()
def parse_arguments(argv=None):
    """Parse the command line options of the model utility.

    usage: model.py [-h] [-t] [-D] [-c]

    optional arguments:
      -h, --help          show this help message and exit
      -t, --test          Test session
      -D, --droptables    Drop tables
      -c, --createtables  Create tables

    Args:
        argv: optional list of argument strings. When None (the default)
            argparse reads the arguments from ``sys.argv``.

    Returns:
        argparse.Namespace with the boolean attributes ``test``,
        ``droptables`` and ``createtables``.
    """
    # No engine is created here: parsing flags needs no database access, so
    # ``--help`` works even when the configuration file is unavailable.
    parser = argparse.ArgumentParser(description="Command line utility " +
                                     "to access model")
    parser.add_argument("-t", "--test", help="Test session",
                        action="store_true")
    parser.add_argument("-D", "--droptables", help="Drop tables",
                        action="store_true")
    parser.add_argument("-c", "--createtables", help="Create tables",
                        action="store_true")
    # Parse once (the command line used to be parsed twice).
    return parser.parse_args(argv)
if __name__ == "__main__":
    # Script entry point: parse the command line, then run the chosen actions.
    main(parse_arguments())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment