Created
December 20, 2016 19:05
-
-
Save Melraidin/202fc2097159c677fa21376414567cee to your computer and use it in GitHub Desktop.
Query Fastly data in Luigi tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
""" | |
ETL for Fastly views. | |
""" | |
import datetime
import logging
import re

import luigi

import athena
# S3 prefix template for the Fastly log data; the "%s" slot is filled with a
# date via the "%" operator (see UpdateFastlyPartitions.run).
s3_table_data_path = "s3://fastly-logs/date=%s/"
class UpdateFastlyPartitions(athena.AthenaQuery):
    """
    Repair Fastly partitions.

    Lists the S3 keys under the Fastly log prefix for ``self.date`` and for
    the day before it, and issues one ``alter table ... add if not exists
    partition`` statement for every distinct (date, hour) pair found.
    """

    table_name = "fastly"
    results_path_template = "repair-table-fastly/%s"

    def run(self):
        """
        Add a partition for each (date, hour) shard present in S3.

        This is best effort: a statement that raises is logged and skipped
        so the remaining shards are still processed.
        """
        # An alternative approach would be to simply use an
        # "msck repair table fastly" statement but this is very slow at Athena.
        client = luigi.s3.S3Client()
        log = logging.getLogger(__name__)
        # Compiled once; applied to every listed key below.
        shard_pattern = re.compile(r"date=([^/]+)/hour=([0-9]+)")

        completed = set()
        for day in [self.date, self.date - datetime.timedelta(days=1)]:
            prefix = s3_table_data_path % day
            for key in client.list(prefix):
                matches = shard_pattern.findall(prefix + key)
                # Skip keys that do not name exactly one date/hour shard.
                if len(matches) != 1:
                    continue

                shard_date, hour = matches[0]
                if (shard_date, hour) in completed:
                    continue

                shard_path = (s3_table_data_path % shard_date) + ("hour=%s" % hour)
                try:
                    self.query_store("""
                        alter table {table} add if not exists partition ( service='drscdn.500px.org', date='{date}', hour={hour} ) location '{path}';
                    """.format(table=self.table_name, date=shard_date, hour=hour, path=shard_path))
                except Exception:
                    # Deliberately non-fatal, but leave a trace instead of
                    # silently discarding the failure.
                    log.warning(
                        "Failed to add partition date=%s hour=%s at %s",
                        shard_date, hour, shard_path, exc_info=True)

                # Recorded even on failure so the same shard is attempted at
                # most once per run, regardless of how many keys it holds.
                completed.add((shard_date, hour))
class QueryViewsByLocation(athena.AthenaQuery):
    """
    Query Athena for photo views by location.

    Counts requests in the ``fastly`` table for the day before ``self.date``,
    grouped by photo id, date, hour and latitude/longitude rounded to four
    decimal places. Only responses with code 200 or 304 whose path carries an
    ``m``/``h``/``w`` value greater than 600 are counted.
    """

    results_path_template = "fastly-views-by-location/%s"

    def requires(self):
        """Depend on the partition repair task for the same date."""
        return [UpdateFastlyPartitions(date=self.date)]

    def run(self):
        """Run the aggregation query through ``query_store``."""
        # The statement is built with str.format(), not %-interpolation, so
        # percent signs are passed through verbatim (as in '%Y-%m-%d'). The
        # URL-encoded "=" is therefore written as a single "%3D"; a doubled
        # "%%3D" would reach the regex engine as two literal percent signs
        # and never match an encoded path.
        self.query_store("""
            select date_format(date, '%Y-%m-%d') || ' ' || cast(hour as varchar) || ':00' as date, cast(regexp_extract(path, 'photo/([0-9]+)', 1) as integer) as photo_id, cast(count(*) as integer) as views, round(latitude, 4) as latitude, round(longitude, 4) as longitude from fastly where cast(regexp_extract(path, 'photo/[0-9]+/[^/]*[mhw](?:%3D|=)([0-9]+)', 1) as bigint) > 600 and response_code in ( 200, 304 ) and date = date '{date}' group by regexp_extract(path, 'photo/([0-9]+)', 1), date, hour, round(latitude, 4), round(longitude, 4)
        """.format(date=self.date - datetime.timedelta(days=1)))
class LoadViewsByLocation(athena.AthenaLoad):
    """
    Load photo views by location.

    Consumes the output of QueryViewsByLocation for the same date and
    defines the destination table named by ``table``.
    """

    table = "fastly_views_by_location"

    def requires(self):
        """Depend on the views-by-location query for ``self.date``."""
        upstream = QueryViewsByLocation(date=self.date)
        return upstream

    def create_table(self, connection):
        """Create the destination table using a cursor from *connection*."""
        ddl = """
            create table {table} (
                date datetime encode lzo,
                photo_id int encode lzo,
                views int encode lzo,
                latitude decimal(6, 3) encode lzo,
                longitude decimal(6, 3) encode lzo
            );
        """.format(table=self.table)
        cursor = connection.cursor()
        cursor.execute(ddl)
# When executed as a script (not imported), call luigi.run().
if __name__ == '__main__':
    luigi.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment