Created
August 16, 2022 16:05
-
-
Save kapcod/128699f4e80df44e26eaec57fb0366a9 to your computer and use it in GitHub Desktop.
Exports Athena query history as hourly json.gz files to S3 for storage and analysis. The exported files can then be analyzed using Athena itself.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import gzip | |
import json | |
import time | |
import re | |
def export(workgroup, region, hours=1):
    """Export recent Athena query history of one workgroup, one batch per hour.

    Pages through the workgroup's query executions with
    ``list_query_executions`` / ``batch_get_query_execution``, flattens every
    completed execution into a single dict with snake_case keys, serializes
    it to JSON and passes each clock hour's records to ``save``.

    Executions completed in the current (still running) hour are skipped, and
    scanning stops at the first execution completed before the export window
    or when the workgroup has no more history pages.

    Args:
        workgroup: Name of the Athena workgroup to scan.
        region: AWS region name for the Athena client; also passed to ``save``.
        hours: Number of hours before the top of the current hour to export.
    """
    athena = boto3.client('athena', region_name=region)
    camel_case_rx = re.compile(r'(?<!^)(?=[A-Z])')
    nested_fields = ('Status', 'Statistics', 'ResultConfiguration', 'QueryExecutionContext')

    # top_ts is the start of the current hour; cur_hour_ts tracks the start of
    # the hour bucket currently being collected.
    top_ts = cur_hour_ts = int(time.time() / 3600) * 3600
    bottom_ts = cur_hour_ts - hours * 3600
    current_hour_hist = []
    next_token = None
    ts = None
    scanned = 0

    # NOTE(review): records are only sorted by completion time within a page;
    # this assumes pages themselves arrive newest-first - confirm.
    while not ts or ts >= bottom_ts:
        params = {"WorkGroup": workgroup}
        if next_token:
            params['NextToken'] = next_token
        res = athena.list_query_executions(**params)
        # NextToken is absent on the last page, so it must not be indexed.
        next_token = res.get('NextToken')

        ids = res.get('QueryExecutionIds') or []
        # Skip the batch call for an empty page instead of sending no ids.
        data = athena.batch_get_query_execution(QueryExecutionIds=ids)['QueryExecutions'] if ids else []
        data = [d for d in data if d['Status'].get('CompletionDateTime')]
        data.sort(key=lambda x: x['Status']['CompletionDateTime'].timestamp(), reverse=True)

        for d in data:
            scanned += 1
            # Hoist the nested sections to the top level; tolerate a missing one.
            for f in nested_fields:
                d.update(d.pop(f, None) or {})
            ts = d['CompletionDateTime'].timestamp()
            if ts < cur_hour_ts:
                save(workgroup, cur_hour_ts, current_hour_hist, region)
                current_hour_hist = []
                # Jump to the hour that actually contains this record, so a
                # gap of several idle hours does not mislabel the next bucket.
                cur_hour_ts = int(ts // 3600) * 3600
            if ts < bottom_ts:
                break
            if ts >= top_ts:
                continue
            for f in ('SubmissionDateTime', 'CompletionDateTime'):
                d[f] = d[f].strftime('%Y-%m-%d %H:%M:%S')
            record = {camel_case_rx.sub('_', key).lower(): value for key, value in d.items()}
            current_hour_hist.append(json.dumps(record))

        print('\rScanned: {}, time: {}'.format(scanned, time.strftime('%Y-%m-%d %H:%M', time.gmtime(ts))))

        if not next_token:
            # History exhausted before reaching the bottom of the window.
            break

    # Flush whatever was collected for the last bucket (no-op when empty).
    save(workgroup, cur_hour_ts, current_hour_hist, region)
def save(workgroup, cur_hour_ts, hist, region, bucket='seekingalpha-data-dev'):
    """Upload one hour of exported records to S3 as a gzipped JSON-lines object.

    The object key is
    ``athena_history/date=YYYY-MM-DD/<workgroup>-HH.json.gz``, with date and
    hour taken from ``cur_hour_ts`` interpreted as UTC.

    Args:
        workgroup: Workgroup name, used in the object key.
        cur_hour_ts: Unix timestamp identifying the hour of the records.
        hist: List of JSON strings, one per record. Nothing is uploaded when
            it is empty.
        region: AWS region name for the S3 client.
        bucket: Destination S3 bucket name.
    """
    if not hist:
        return
    s3 = boto3.client('s3', region_name=region)
    hour = time.gmtime(cur_hour_ts)
    # Keep the workgroup name out of the strftime format so a '%' in it can
    # never be interpreted as a format directive.
    path = (
        time.strftime("athena_history/date=%Y-%m-%d/", hour)
        + workgroup
        + time.strftime("-%H.json.gz", hour)
    )
    data = gzip.compress('\n'.join(hist).encode('utf8'))
    s3.put_object(Bucket=bucket, Key=path, Body=data)
    print('Saved: {}: {}'.format(path, len(hist)))
def export_all(region, hours=1):
    """Run ``export`` for every Athena workgroup listed in ``region``.

    Args:
        region: AWS region name for the Athena client; also passed to ``export``.
        hours: Passed through unchanged to ``export`` for each workgroup.
    """
    athena = boto3.client('athena', region_name=region)
    params = {}
    while True:
        res = athena.list_work_groups(**params)
        for workgroup in res['WorkGroups']:
            # Each entry is a summary dict; export() expects the name string.
            export(workgroup['Name'], region, hours)
        next_token = res.get('NextToken')
        if not next_token:
            break
        # Follow pagination so workgroups beyond the first page are exported.
        params['NextToken'] = next_token
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment