Last active
December 30, 2021 10:55
-
-
Save ohsawa0515/06019739eb9cdc4d6d3813c43fbd1e29 to your computer and use it in GitHub Desktop.
Lambda function to upload Amazon Athena's history to S3.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding:utf-8
from __future__ import print_function
import os, boto3, datetime, csv
# AWS handles — credentials come from the Lambda execution role / local environment.
s3 = boto3.resource('s3')
athena_client = boto3.client('athena')
# Date stamp used both to filter query history and to name the output file.
today = datetime.datetime.today().strftime("%Y%m%d")
# Destination bucket; must be set in the environment (None if missing — upload will fail).
s3_bucket = os.environ.get('S3_BUCKET')
# Page size passed to list_query_executions per call.
max_items = 50
# CSV column order; must match the per-row append order in get_athena_history.
header = ['QueryExecutionId', 'Query', 'OutputLocation', 'Database', 'State', 'StateChangeReason', 'SubmissionDateTime', 'CompletionDateTime', 'EngineExecutionTimeInMillis', 'DataScannedInBytes']
# /tmp is the only writable path inside Lambda.
csv_file = "/tmp/athena_history_%s.csv" % today
s3_path = "athena_history_%s.csv" % today
def get_athena_history(max_items, next_token=''):
    """Fetch one page of Athena query executions and return today's entries.

    Parameters:
        max_items: maximum number of execution IDs to list per API call.
        next_token: pagination token from a previous call; '' means first page.

    Returns:
        (history, next_token) where history is a list of rows matching the
        module-level `header` columns, and next_token is the continuation
        token for the following page, or '' when there are no more pages.
    """
    kwargs = {'MaxResults': max_items}
    if next_token != '':
        kwargs['NextToken'] = next_token
    result = athena_client.list_query_executions(**kwargs)
    execution_ids = result['QueryExecutionIds']
    # 'NextToken' is absent from the response on the last page; default to ''
    # instead of raising KeyError (the original indexed it unconditionally).
    next_token = result.get('NextToken', '')
    history = []
    for execution_id in execution_ids:
        detail = athena_client.get_query_execution(
            QueryExecutionId=execution_id
        )
        query_execution = detail.get('QueryExecution')
        if query_execution is None:
            continue
        status = query_execution.get('Status')
        if status is None:
            continue
        # Acquire only one day's history.
        submission_date_time = status.get('SubmissionDateTime')
        if submission_date_time is None:
            continue
        if today != submission_date_time.strftime("%Y%m%d"):
            continue
        exe = []
        exe.append(query_execution.get('QueryExecutionId', ''))
        # Flatten multi-line SQL with a space, not '', so adjacent tokens do
        # not fuse (e.g. "title\nfrom" must not become "titlefrom").
        exe.append(query_execution.get('Query', '').replace('\n', ' '))
        result_configuration = query_execution.get('ResultConfiguration')
        if result_configuration is not None:
            exe.append(result_configuration.get('OutputLocation', ''))
        else:
            exe.append('')
        query_execution_context = query_execution.get('QueryExecutionContext')
        if query_execution_context is not None:
            exe.append(query_execution_context.get('Database', ''))
        else:
            exe.append('')
        exe.append(status.get('State', ''))
        exe.append(status.get('StateChangeReason', ''))
        exe.append(submission_date_time)
        exe.append(status.get('CompletionDateTime', ''))
        statistics = query_execution.get('Statistics')
        if statistics is not None:
            exe.append(statistics.get('EngineExecutionTimeInMillis', ''))
            exe.append(statistics.get('DataScannedInBytes', ''))
        else:
            exe.append('')
            exe.append('')
        history.append(exe)
    return history, next_token
def lambda_handler(event, context):
    """Collect today's Athena query history and upload it to S3 as CSV.

    Writes /tmp/athena_history_YYYYMMDD.csv locally, then uploads it to
    `s3_bucket` under the same key. Returns None.
    """
    history = [header]
    result, next_token = get_athena_history(max_items)
    history.extend(result)
    # Keep paging while the API returns a continuation token. The original
    # compared len(result) against a hard-coded 50, but rows are filtered to
    # today's date inside get_athena_history, so a full API page can yield
    # fewer than max_items rows while more pages still exist.
    while next_token != '':
        result, next_token = get_athena_history(max_items, next_token)
        history.extend(result)
    # Write to CSV file
    with open(csv_file, 'w') as f:
        writer = csv.writer(f, lineterminator='\n')
        writer.writerows(history)
    # Upload to S3
    s3.Bucket(s3_bucket).upload_file(csv_file, s3_path)
    return None
# Local debugging entry point: invoke the handler directly with dummy
# event/context values (neither is used by lambda_handler).
if __name__ == '__main__':
    lambda_handler('', '')
Required IAM policy
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"athena:GetQueryExecution",
"athena:ListQueryExecutions",
"s3:PutObject"
],
"Effect": "Allow",
"Resource": "*"
}
]
}
I suggest replacing line breaks in Query with a space rather than an empty string.
exe.append(query_execution.get('Query', '').replace('\n', ' '))
Otherwise
select id, title
from articles
will become select id, titlefrom articles
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Logging Amazon Athena History
Output example
Output: /tmp/athena_history_YYYYMMDD.csv
Execute at local machine
$ S3_BUCKET='<S3 Bucket>' python logging_athena_history.py