Skip to content

Instantly share code, notes, and snippets.

@sn-0w
Created October 6, 2021 18:31
Show Gist options
  • Select an option

  • Save sn-0w/2f83f50eddf5946582f3326e07644f76 to your computer and use it in GitHub Desktop.

Select an option

Save sn-0w/2f83f50eddf5946582f3326e07644f76 to your computer and use it in GitHub Desktop.
import sys
import datetime
import dateutil
import dateutil.parser as dateutil_parser
import logging
import time
import sqlalchemy as sa
import argparse
import csv
import urllib2
import json
import hashlib
from urlparse import urlparse
from boto.s3.connection import S3Connection
from StringIO import StringIO
from gzip import GzipFile
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Float, Integer, BigInteger, Text, DateTime, Boolean
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import *
def parse_args(argv=None):
    """Parse command-line options for the score importer.

    Args:
        argv: optional list of argument strings to parse; defaults to
            sys.argv[1:] when None, so existing callers are unchanged.
            (Accepting an explicit list also makes this testable.)

    Returns:
        argparse.Namespace with input_s3, database_url, aws_access_key
        and aws_secret_key attributes.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input-s3', '-s',
                        help='Read data from the newest file in the given bucket with the given prefix')
    parser.add_argument('--database-url', '-d', default='sqlite:///vba_score.db',
                        help='Database in which to store the data')
    parser.add_argument('--aws-access-key', default=None,
                        help='Access key for Amazon Web Services')
    parser.add_argument('--aws-secret-key', default=None,
                        help='Secret key for Amazon Web Services')
    return parser.parse_args(argv)
# Declarative base shared by the ORM model (Version) and the Core table
# metadata (vba_score) below; create_all() in get_db_session uses it.
Base = declarative_base()
class Version(Base):
    """One imported S3 score file; `hash` is set once its rows are cached."""
    __tablename__ = 'versions'
    # Sequential version number assigned by update_versions(), not an
    # autoincrement from the database.
    id = Column(Integer, primary_key=True)
    # Last-modified timestamp of the source S3 key (stored naive; callers
    # re-attach UTC when reading it back — see get_latest_version_info).
    time = Column(DateTime)
    # "sha1:<hexdigest>" of the file contents once cached, NULL until then.
    hash = Column(Text)
# Raw score rows imported from the CSV files, tagged with the version number
# they came from.  Populated with Core inserts (vba_score.insert()) rather
# than the ORM.
vba_score = Table('vba_score', Base.metadata,
    Column('version', Integer),
    Column('count', BigInteger),
    Column('category', Text),
    Column('dscore', BigInteger),
    Column('flock_score', BigInteger),
    Column('dscore_str', Text),
    Column('client_asn_id', BigInteger),
    Column('time', Float))
# At most one row per (client ASN, version); unique index also serves lookups.
Index(vba_score.name + '_index',
    vba_score.c.client_asn_id,
    vba_score.c.version,
    unique=True)
def get_db_session(database_url):
    """Connect to the database, create any missing tables, and hand back a session.

    Args:
        database_url: SQLAlchemy connection URL (e.g. sqlite:///vba_score.db).

    Returns:
        A fresh SQLAlchemy Session bound to the engine.
    """
    engine = sa.create_engine(database_url)
    # Idempotent: creates only the tables that do not exist yet.
    Base.metadata.create_all(engine)
    return sessionmaker(bind=engine)()
def get_latest_version_info(db):
    """Look up the newest Version row and derive the next version number.

    Args:
        db: SQLAlchemy session.

    Returns:
        (new_version, latest_timestamp): the next unused version number
        (0 for an empty table) and the UTC timestamp of the newest known
        version (None for an empty table).
    """
    v = db.query(Version).order_by(sa.desc(Version.id.name)).limit(1).one_or_none()
    if v is None:  # was `== None`; identity check is the correct idiom
        # Empty table: start numbering at 0, no timestamp to compare against.
        return 0, None
    # DB stores naive datetimes; re-attach UTC so callers can compare against
    # the timezone-aware dates parsed from S3 last-modified headers.
    # NOTE(review): relies on dateutil.tz being importable via the bare
    # `import dateutil` at the top of the file — confirm.
    latest_timestamp = v.time.replace(tzinfo=dateutil.tz.gettz('UTC'))
    new_version = v.id + 1 if v.id is not None else 0
    return new_version, latest_timestamp
def get_s3_bucket(bucket_name, aws_access_key, aws_secret_key):
    """Return a boto handle for the named S3 bucket.

    validate=False skips the bucket-existence round trip, which would require
    more S3 permissions than strictly necessary for us here.
    """
    connection = S3Connection(aws_access_key, aws_secret_key)
    return connection.get_bucket(bucket_name, validate=False)
def get_available_versions(bucket, path_prefix):
    """List the S3 keys under path_prefix, oldest first.

    Args:
        bucket: boto bucket handle.
        path_prefix: key prefix to enumerate.

    Returns:
        A list of (last_modified_datetime, key) pairs sorted by date ascending.
    """
    versions = bucket.list(prefix=path_prefix)
    dates_and_versions = [
        (dateutil_parser.parse(version.last_modified), version)
        for version in versions]
    # The original key was `lambda (date, _version): date`, Python-2-only
    # tuple-parameter syntax (a SyntaxError on Python 3).  Indexing the pair
    # is equivalent and version-agnostic.
    dates_and_versions.sort(key=lambda pair: pair[0])
    return dates_and_versions
def parse_s3_url(url_str):
    """Split an s3://bucket/key-prefix URL into its parts.

    Args:
        url_str: URL string with an s3 scheme.

    Returns:
        (bucket_name, key_prefix) — the prefix has its single leading
        slash stripped.

    Raises:
        ValueError: if the URL scheme is not 's3'.
    """
    parsed = urlparse(url_str)
    if parsed.scheme != 's3':
        raise ValueError('bad S3 URL scheme: {}'.format(parsed.scheme))
    prefix = parsed.path
    # urlparse keeps the leading '/' on the path; drop exactly one.
    if prefix.startswith('/'):
        prefix = prefix[1:]
    return parsed.hostname, prefix
def update_versions(db, dates_and_versions):
    """Record a Version row for every S3 file newer than the latest known one.

    Args:
        db: SQLAlchemy session (caller commits).
        dates_and_versions: (date, key) pairs sorted by date ascending.
    """
    next_number, latest_timestamp = get_latest_version_info(db)
    for date, key in dates_and_versions:
        # Already recorded in a previous run — skip.  (Since the input is
        # sorted, this could be a binary search.)
        if latest_timestamp is not None and date <= latest_timestamp:
            continue
        db.add(Version(id=next_number, time=date, hash=None))
        next_number += 1
def ensure_version_cached(db, version_number, version):
    """Download one version's CSV from S3 and load its rows into vba_score.

    Idempotent: a Version row with a non-NULL hash means the rows are already
    loaded, so the function returns without doing anything.

    Args:
        db: SQLAlchemy session (caller commits).
        version_number: id of the Version row this file belongs to.
        version: boto S3 key for the file (plain CSV, or gzip if named *.gz).
    """
    version_record = db.query(Version).filter(Version.id == version_number).first()
    if version_record is None:
        # Can't happen. We don't get called if the version wasn't seen on
        # S3, and we should just have loaded all versions from S3 that weren't
        # in the DB already added the version in update_versions()
        logging.error("version %d is to be cached, but we don't know about it", version_number)
        return
    if version_record.hash is not None:
        # already taken care of!
        return
    logging.info("fetching scores for active version %d", version_number)
    f = StringIO()
    version.get_file(fp=f)
    f.seek(0)
    # Transparently decompress gzipped uploads; f is rebound to the wrapper,
    # so everything below (CSV parse AND the hash) reads decompressed bytes.
    if version.name[-3:] == '.gz':
        f = GzipFile(fileobj=f, mode="r")
    for row in csv.DictReader(f):
        # Tag each row with its version and drop the column the table
        # doesn't carry (only client_asn_id is stored).
        row["version"] = version_number
        del row["client_asn"]
        # insert each row individually for now, on gvc staging i got:
        # CompileError: The 'sqlite' dialect with current database version settings does not support in-place multirow inserts.
        # ... and even locally it chokes eventually when there's too many values in one insert.
        db.execute(vba_score.insert().values(row))
    f.seek(0)
    # Rewind and hash the (decompressed, for .gz) content; a non-NULL hash
    # marks this version as fully cached for future runs.
    version_record.hash = "sha1:" + hashlib.sha1(f.read()).hexdigest()
    logging.info("inserted scores from %s / %s", version.name, version_record.hash)
def get_active_version_numbers():
    """Ask usher which iprep versions are currently active.

    Returns:
        A list of the configured version numbers, with unset (None) and
        negative entries filtered out.
    """
    response = urllib2.urlopen("http://usher.justin.tv/dboption/all/gvc.iprep.*.json")
    try:
        opts = json.loads(response.read())
    finally:
        response.close()
    # NOTE(review): the payload appears to be a one-element list wrapping the
    # options dict — confirm against the usher endpoint.
    if opts[0] is not None:
        opts = opts[0]
    candidates = [opts["gvc.iprep.v1"], opts["gvc.iprep.v2"]]
    return [v for v in candidates if v is not None and v >= 0]
def run_update():
    """Top-level driver: sync Version rows with S3 and cache active versions.

    Reads CLI arguments, lists the S3 files, records any new versions, then
    downloads and caches the score files for the currently-active versions.
    """
    args = parse_args()
    db = get_db_session(args.database_url)
    bucket_name, path_prefix = parse_s3_url(args.input_s3)
    bucket = get_s3_bucket(bucket_name, args.aws_access_key,
                           args.aws_secret_key)
    dates_and_versions = get_available_versions(bucket, path_prefix)
    logging.info("found %d versions", len(dates_and_versions))
    if len(dates_and_versions) == 0:
        # nothing to do here, can't even possibly cache anything
        return
    update_versions(db, dates_and_versions)
    db.commit()
    for version_number in get_active_version_numbers():
        # Bounds-check before indexing: plain list indexing raises IndexError
        # for an out-of-range active version (and a negative one would wrap
        # around), so the old `if date_and_version is None` guard could never
        # fire.  Log and skip instead of crashing.
        if not 0 <= version_number < len(dates_and_versions):
            logging.error("active version %d not found", version_number)
            continue
        _date, version = dates_and_versions[version_number]
        ensure_version_cached(db, version_number, version)
        db.commit()
if __name__ == '__main__':
    # Log at INFO with explicit timestamps; converter=gmtime makes them UTC.
    logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
    logging.Formatter.converter = time.gmtime
    # logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
    logging.info("starting import")
    try:
        run_update()
        logging.info("success")
    except Exception:
        # Top-level boundary: log the full traceback and exit nonzero so a
        # scheduler/cron wrapper can detect the failure.
        logging.error("unhandled exception", exc_info=True)
        sys.exit(-1)
    finally:
        logging.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment