import sys
import datetime
import dateutil
import dateutil.tz  # needed for dateutil.tz.gettz() below
import dateutil.parser as dateutil_parser
import logging
import time
import sqlalchemy as sa
import argparse
import csv
import urllib2
import json
import hashlib
from urlparse import urlparse
from boto.s3.connection import S3Connection
from StringIO import StringIO
from gzip import GzipFile
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Float, Integer, BigInteger, Text, DateTime, Boolean
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import *
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--input-s3', '-s',
                        help='Read data from the newest file in the given bucket with the given prefix')
    parser.add_argument('--database-url', '-d', default='sqlite:///vba_score.db',
                        help='Database in which to store the data')
    parser.add_argument('--aws-access-key', default=None,
                        help='Access key for Amazon Web Services')
    parser.add_argument('--aws-secret-key', default=None,
                        help='Secret key for Amazon Web Services')
    return parser.parse_args()
Base = declarative_base()


class Version(Base):
    __tablename__ = 'versions'
    id = Column(Integer, primary_key=True)
    time = Column(DateTime)
    hash = Column(Text)


vba_score = Table('vba_score', Base.metadata,
                  Column('version', Integer),
                  Column('count', BigInteger),
                  Column('category', Text),
                  Column('dscore', BigInteger),
                  Column('flock_score', BigInteger),
                  Column('dscore_str', Text),
                  Column('client_asn_id', BigInteger),
                  Column('time', Float))

Index(vba_score.name + '_index',
      vba_score.c.client_asn_id,
      vba_score.c.version,
      unique=True)
def get_db_session(database_url):
    con = sa.create_engine(database_url)
    Base.metadata.create_all(con)
    Session = sessionmaker(bind=con)
    return Session()
def get_latest_version_info(db):
    v = db.query(Version).order_by(sa.desc(Version.id)).limit(1).one_or_none()
    if v is None:
        new_version = 0
        latest_timestamp = None
    else:
        latest_version, latest_timestamp = v.id, v.time.replace(tzinfo=dateutil.tz.gettz('UTC'))
        new_version = latest_version + 1 if latest_version is not None else 0
    return new_version, latest_timestamp
def get_s3_bucket(bucket_name, aws_access_key, aws_secret_key):
    conn = S3Connection(aws_access_key, aws_secret_key)
    # Pass validate=False to avoid checking the bucket's existence, which requires
    # more S3 permissions than strictly necessary for us here.
    return conn.get_bucket(bucket_name, validate=False)


def get_available_versions(bucket, path_prefix):
    versions = bucket.list(prefix=path_prefix)
    dates_and_versions = [
        (dateutil_parser.parse(version.last_modified), version)
        for version in versions]
    dates_and_versions.sort(key=lambda (date, _version): date)
    return dates_and_versions
def parse_s3_url(url_str):
    url = urlparse(url_str)
    if url.scheme != 's3':
        raise ValueError('bad S3 URL scheme: {}'.format(url.scheme))
    path_prefix = url.path
    if len(path_prefix) > 0 and path_prefix[0] == '/':
        path_prefix = path_prefix[1:]
    return url.hostname, path_prefix
def update_versions(db, dates_and_versions):
    new_version_number, latest_timestamp = get_latest_version_info(db)
    for date, version in dates_and_versions:
        if latest_timestamp is not None and date <= latest_timestamp:  # this could be a binary search
            continue
        db.add(Version(id=new_version_number, time=date, hash=None))
        new_version_number += 1
def ensure_version_cached(db, version_number, version):
    version_record = db.query(Version).filter(Version.id == version_number).first()
    if version_record is None:
        # Shouldn't happen: we are only called for versions that were seen on S3,
        # and update_versions() should already have added every S3 version that
        # wasn't in the DB.
        logging.error("version %d is to be cached, but we don't know about it", version_number)
        return
    if version_record.hash is not None:
        # already taken care of!
        return
    logging.info("fetching scores for active version %d", version_number)
    f = StringIO()
    version.get_file(fp=f)
    f.seek(0)
    if version.name[-3:] == '.gz':
        f = GzipFile(fileobj=f, mode="r")
    for row in csv.DictReader(f):
        row["version"] = version_number
        del row["client_asn"]
        # Insert each row individually for now; on gvc staging I got:
        #   CompileError: The 'sqlite' dialect with current database version settings does not support in-place multirow inserts.
        # ... and even locally it chokes eventually when there are too many values in one insert.
        db.execute(vba_score.insert().values(row))
    f.seek(0)
    version_record.hash = "sha1:" + hashlib.sha1(f.read()).hexdigest()
    logging.info("inserted scores from %s / %s", version.name, version_record.hash)
def get_active_version_numbers():
    r = urllib2.urlopen("http://usher.justin.tv/dboption/all/gvc.iprep.*.json")
    try:
        opts = json.loads(r.read())
    finally:
        r.close()
    if opts[0] is not None:
        opts = opts[0]
    version_numbers = [opts["gvc.iprep.v1"], opts["gvc.iprep.v2"]]
    return filter(lambda v: v is not None and v >= 0, version_numbers)
def run_update():
    args = parse_args()
    db = get_db_session(args.database_url)
    bucket_name, path_prefix = parse_s3_url(args.input_s3)
    bucket = get_s3_bucket(bucket_name, args.aws_access_key,
                           args.aws_secret_key)
    dates_and_versions = get_available_versions(bucket, path_prefix)
    logging.info("found %d versions", len(dates_and_versions))
    if len(dates_and_versions) == 0:
        # nothing to do here, can't even possibly cache anything
        return
    update_versions(db, dates_and_versions)
    db.commit()
    for version_number in get_active_version_numbers():
        if version_number >= len(dates_and_versions):
            # Indexing past the end of the list would raise IndexError rather
            # than letting us report the missing version.
            logging.error("active version %d not found", version_number)
            continue
        _date, version = dates_and_versions[version_number]
        ensure_version_cached(db, version_number, version)
        db.commit()
if __name__ == '__main__':
    logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s: %(message)s",
                        datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
    logging.Formatter.converter = time.gmtime
    # logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
    logging.info("starting import")
    try:
        run_update()
        logging.info("success")
    except Exception:
        logging.error("unhandled exception", exc_info=True)
        sys.exit(-1)
    finally:
        logging.shutdown()
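
A typical invocation, sketched from the argparse options above. The script filename, bucket, prefix, and key values are placeholders, not values from the original file:

python vba_score_import.py \
    --input-s3 s3://example-bucket/vba-scores/ \
    --database-url sqlite:///vba_score.db \
    --aws-access-key YOUR_ACCESS_KEY \
    --aws-secret-key YOUR_SECRET_KEY

parse_s3_url() splits the --input-s3 URL into the bucket name and key prefix, and --database-url accepts any SQLAlchemy connection URL, so a server-backed database can stand in for the default SQLite file.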