Skip to content

Instantly share code, notes, and snippets.

@ohsawa0515
Last active December 30, 2021 10:55
Show Gist options
  • Save ohsawa0515/06019739eb9cdc4d6d3813c43fbd1e29 to your computer and use it in GitHub Desktop.
Save ohsawa0515/06019739eb9cdc4d6d3813c43fbd1e29 to your computer and use it in GitHub Desktop.
Lambda function to upload Amazon Athena's history to S3.
# coding:utf-8
from __future__ import print_function
import os, boto3, datetime, csv
# AWS service handles, created once when the module is imported.
s3 = boto3.resource('s3')
athena_client = boto3.client('athena')

# Destination bucket, read from the S3_BUCKET environment variable
# (None when the variable is unset).
s3_bucket = os.environ.get('S3_BUCKET')

# MaxResults passed to list_query_executions on each call.
max_items = 50

# Date stamp (YYYYMMDD, local time of the running environment) used to
# select which query executions are exported and to name the output file.
today = datetime.datetime.today().strftime("%Y%m%d")
csv_file = "/tmp/athena_history_%s.csv" % today
s3_path = "athena_history_%s.csv" % today

# Column names written as the first CSV row; the row builder emits values
# in this same order.
header = [
    'QueryExecutionId',
    'Query',
    'OutputLocation',
    'Database',
    'State',
    'StateChangeReason',
    'SubmissionDateTime',
    'CompletionDateTime',
    'EngineExecutionTimeInMillis',
    'DataScannedInBytes',
]
def _query_execution_row(query_execution):
    """Build one CSV row (ordered like ``header``) from a QueryExecution dict.

    Returns None when the execution has no Status, has no
    SubmissionDateTime, or was not submitted on ``today``; the caller
    skips such executions.
    """
    status = query_execution.get('Status')
    if status is None:
        return None

    # Acquire only one day's history.
    submission_date_time = status.get('SubmissionDateTime')
    if submission_date_time is None:
        return None
    if today != submission_date_time.strftime("%Y%m%d"):
        return None

    # Optional sub-structures: a missing one yields empty cells.
    result_configuration = query_execution.get('ResultConfiguration') or {}
    query_execution_context = query_execution.get('QueryExecutionContext') or {}
    statistics = query_execution.get('Statistics') or {}

    # Collapse line breaks into spaces so each query stays on a single line
    # without gluing adjacent tokens together ("title\nfrom" must not turn
    # into "titlefrom").
    query = query_execution.get('Query', '')
    query = query.replace('\r\n', ' ').replace('\n', ' ')

    return [
        query_execution.get('QueryExecutionId', ''),
        query,
        result_configuration.get('OutputLocation', ''),
        query_execution_context.get('Database', ''),
        status.get('State', ''),
        status.get('StateChangeReason', ''),
        submission_date_time,
        status.get('CompletionDateTime', ''),
        statistics.get('EngineExecutionTimeInMillis', ''),
        statistics.get('DataScannedInBytes', ''),
    ]


def get_athena_history(max_items, next_token=''):
    """Fetch one page of Athena query executions submitted on ``today``.

    Args:
        max_items: Page size, passed to ``list_query_executions`` as
            MaxResults.
        next_token: Pagination token from a previous call. ``''`` (the
            default) fetches the first page. ``None`` means the previous
            page was the last one, so nothing is fetched.

    Returns:
        A ``(history, next_token)`` tuple.
        ``history`` is a list of CSV rows (ordered like ``header``) for the
        executions on this page that were submitted on ``today``.
        ``next_token`` is the token for the following page, or ``None``
        when Athena returned no NextToken (i.e. this was the last page).
    """
    if next_token is None:
        # The previous page was the last one; there is nothing left to list.
        return [], None

    kwargs = {'MaxResults': max_items}
    if next_token != '':
        kwargs['NextToken'] = next_token
    response = athena_client.list_query_executions(**kwargs)

    history = []
    for execution_id in response.get('QueryExecutionIds', []):
        detail = athena_client.get_query_execution(
            QueryExecutionId=execution_id
        )
        query_execution = detail.get('QueryExecution')
        if query_execution is None:
            continue
        row = _query_execution_row(query_execution)
        if row is not None:
            history.append(row)

    # NextToken is absent on the last page; indexing it directly would
    # raise KeyError.
    return history, response.get('NextToken')
def lambda_handler(event, context):
    """Export today's Athena query history to S3 as a CSV file.

    Collects rows page by page via get_athena_history() and writes them,
    preceded by ``header``, to ``csv_file``. It then uploads that file to
    the ``s3_bucket`` bucket under the key ``s3_path``.

    Args:
        event: Lambda event payload (not used).
        context: Lambda context object (not used).

    Returns:
        None
    """
    # Warm Lambda execution environments keep module-level state between
    # invocations, so a date computed at import time can belong to an
    # earlier day. Refresh the date-dependent globals on every invocation;
    # get_athena_history() reads ``today`` from module scope when filtering.
    global today, csv_file, s3_path
    today = datetime.datetime.today().strftime("%Y%m%d")
    csv_file = "/tmp/athena_history_%s.csv" % today
    s3_path = "athena_history_%s.csv" % today

    history = [header]
    result, next_token = get_athena_history(max_items)
    while True:
        history.extend(result)
        # A page yielding fewer than max_items rows from today means older
        # queries were reached. This assumes Athena lists executions newest
        # first (TODO confirm). Also stop when Athena reports no further
        # page (falsy token).
        if len(result) < max_items or not next_token:
            break
        result, next_token = get_athena_history(max_items, next_token)

    # Write to CSV file
    with open(csv_file, 'w') as f:
        writer = csv.writer(f, lineterminator='\n')
        writer.writerows(history)

    # Upload to S3
    s3.Bucket(s3_bucket).upload_file(csv_file, s3_path)
    return None
# Local debugging entry point: run the handler once with placeholder
# event/context values when this file is executed directly.
if __name__ == '__main__':
    event, context = '', ''
    lambda_handler(event=event, context=context)
@kapcod
Copy link

kapcod commented Dec 30, 2021

I suggest replacing line breaks in Query with a space rather than an empty string.

        exe.append(query_execution.get('Query', '').replace('\n', ' '))

Otherwise

select id, title
from articles

will become `select id, titlefrom articles`.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment