@ohsawa0515
Last active December 30, 2021 10:55
Lambda function to upload Amazon Athena's history to S3.
# coding:utf-8
from __future__ import print_function
import os
import csv
import datetime

import boto3
s3 = boto3.resource('s3')
athena_client = boto3.client('athena')
today = datetime.datetime.today().strftime("%Y%m%d")
s3_bucket = os.environ.get('S3_BUCKET')
max_items = 50
header = ['QueryExecutionId', 'Query', 'OutputLocation', 'Database', 'State', 'StateChangeReason', 'SubmissionDateTime', 'CompletionDateTime', 'EngineExecutionTimeInMillis', 'DataScannedInBytes']
csv_file = "/tmp/athena_history_%s.csv" % today
s3_path = "athena_history_%s.csv" % today
def get_athena_history(max_items, next_token=''):
    if next_token != '':
        result = athena_client.list_query_executions(
            MaxResults=max_items,
            NextToken=next_token
        )
    else:
        result = athena_client.list_query_executions(
            MaxResults=max_items
        )
    execution_ids = result['QueryExecutionIds']
    # NextToken is absent from the last page, so fall back to ''
    next_token = result.get('NextToken', '')
    history = []
    for execution_id in execution_ids:
        exe = []
        result = athena_client.get_query_execution(
            QueryExecutionId=execution_id
        )
        query_execution = result.get('QueryExecution')
        if query_execution is None:
            continue
        status = query_execution.get('Status')
        if status is None:
            continue

        # Keep only queries submitted today
        submission_date_time = status.get('SubmissionDateTime')
        if submission_date_time is None:
            continue
        if today != submission_date_time.strftime("%Y%m%d"):
            continue

        exe.append(query_execution.get('QueryExecutionId', ''))
        exe.append(query_execution.get('Query', '').replace('\n', ''))

        result_configuration = query_execution.get('ResultConfiguration')
        if result_configuration is not None:
            exe.append(result_configuration.get('OutputLocation', ''))
        else:
            exe.append('')

        query_execution_context = query_execution.get('QueryExecutionContext')
        if query_execution_context is not None:
            exe.append(query_execution_context.get('Database', ''))
        else:
            exe.append('')

        exe.append(status.get('State', ''))
        exe.append(status.get('StateChangeReason', ''))
        exe.append(submission_date_time)
        exe.append(status.get('CompletionDateTime', ''))

        statistics = query_execution.get('Statistics')
        if statistics is not None:
            exe.append(statistics.get('EngineExecutionTimeInMillis', ''))
            exe.append(statistics.get('DataScannedInBytes', ''))
        else:
            exe.append('')
            exe.append('')

        history.append(exe)

    return history, next_token
def lambda_handler(event, context):
    history = [header]
    result, next_token = get_athena_history(max_items)
    history.extend(result)
    # Keep paging while Athena returns a continuation token. A page can
    # yield fewer rows than max_items because of the date filter, so the
    # token, not the row count, decides whether more pages exist.
    while next_token != '':
        result, next_token = get_athena_history(max_items, next_token)
        history.extend(result)

    # Write to CSV file
    with open(csv_file, 'w') as f:
        writer = csv.writer(f, lineterminator='\n')
        writer.writerows(history)

    # Upload to S3
    s3.Bucket(s3_bucket).upload_file(csv_file, s3_path)

    return None
# debug
if __name__ == '__main__':
    event = ''
    context = ''
    lambda_handler(event, context)
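
As an aside, boto3 also ships a paginator for ListQueryExecutions, which could replace the manual NextToken handling in get_athena_history; a minimal sketch, assuming the same athena_client as above:

    # Minimal sketch: let boto3's built-in paginator drive NextToken
    # instead of threading the token through by hand.
    paginator = athena_client.get_paginator('list_query_executions')
    for page in paginator.paginate(PaginationConfig={'PageSize': max_items}):
        for execution_id in page['QueryExecutionIds']:
            # call get_query_execution and filter by date, as in the gist
            ...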
ohsawa0515 commented Sep 3, 2018

Logging Amazon Athena History

Output example

Output: /tmp/athena_history_YYYYMMDD.csv (also uploaded to the S3 bucket under the same name)

QueryExecutionId,Query,OutputLocation,Database,State,StateChangeReason,SubmissionDateTime,CompletionDateTime,EngineExecutionTimeInMillis,DataScannedInBytes
12121212-3434-5656-7878-abababababab,"SELECT * FROM ""foo"".""bar"" limit 10",s3://aws-athena-query-results-xxxxxxx/Unsaved/2018/09/03/12121212-3434-5656-7878-abababababab.csv,foo,SUCCEEDED,,2018-09-03 18:13:37.872000+09:00,2018-09-03 18:13:38.433000+09:00,341,60435
23232323-4545-6767-8989-cdcdcdcdcdcd,"SELECT * FROM ""foo"".""baz"" limit 10",s3://aws-athena-query-results-xxxxxxx/Unsaved/2018/09/03/23232323-4545-6767-8989-cdcdcdcdcdcd.csv,foo,SUCCEEDED,,2018-09-03 18:13:36.250000+09:00,2018-09-03 18:13:36.781000+09:00,349,62145
abababab-cdcd-efef-ghgh-ijijijijijij,"select * from hogehoge by id asclimit 1",s3://aws-athena-query-results-xxxxxxx/Unsaved/2018/09/03/abababab-cdcd-efef-ghgh-ijijijijijij.csv,hogehoge,FAILED,SYNTAX_ERROR: line 5:10: Column 'xxx' cannot be resolved,2018-09-03 17:53:18.777000+09:00,2018-09-03 17:53:19.740000+09:00,607,0

Execute on a local machine

$ S3_BUCKET='<S3 Bucket>' python logging_athena_history.py
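
Since the function only keeps queries submitted on the current day, it is meant to run once daily. A hypothetical sketch for wiring that up with a CloudWatch Events / EventBridge schedule via boto3 (the rule name, schedule, and function ARN are placeholders, not part of this gist):

    import boto3

    events = boto3.client('events')
    # Fire shortly before midnight so the whole day is covered
    events.put_rule(
        Name='daily-athena-history',
        ScheduleExpression='cron(55 14 * * ? *)'  # 23:55 JST expressed in UTC
    )
    events.put_targets(
        Rule='daily-athena-history',
        Targets=[{'Id': '1', 'Arn': '<Lambda function ARN>'}]
    )
    # The function also needs an add_permission grant so that
    # events.amazonaws.com may invoke it.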

ohsawa0515 commented Sep 23, 2018

Required IAM policy

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "athena:GetQueryExecution",
        "athena:ListQueryExecutions",
        "s3:PutObject"
      ],
      "Effect": "Allow",
      "Resource": "*"
    }
  ]
}
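
If the wildcard resource is broader than you want, the s3:PutObject statement can be scoped to the destination bucket; a sketch (the bucket name is a placeholder, and the Athena actions are left on * here):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "athena:GetQueryExecution",
        "athena:ListQueryExecutions"
      ],
      "Effect": "Allow",
      "Resource": "*"
    },
    {
      "Action": "s3:PutObject",
      "Effect": "Allow",
      "Resource": "arn:aws:s3:::<S3 Bucket>/*"
    }
  ]
}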

kapcod commented Dec 30, 2021

I suggest replacing line breaks in Query with a space rather than an empty string.

        exe.append(query_execution.get('Query', '').replace('\n', ' '))

Otherwise

select id, title
from articles

will become select id, titlefrom articles
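
For context on why the gist strips newlines at all: without the replace(), Python's csv module would quote the multi-line field rather than corrupt the row, but each record would then span several physical lines, presumably what the author wanted to avoid. A quick illustration:

    import csv, io

    buf = io.StringIO()
    csv.writer(buf).writerow(['select id, title\nfrom articles'])
    print(buf.getvalue())
    # "select id, title
    # from articles"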
