Skip to content

Instantly share code, notes, and snippets.

@shiro01
Last active July 26, 2018 02:20
Show Gist options
  • Save shiro01/18c8800b6fd945179eb2bc15e98701be to your computer and use it in GitHub Desktop.
Save shiro01/18c8800b6fd945179eb2bc15e98701be to your computer and use it in GitHub Desktop.
当日のAthenaパーティション設定クエリを作成し実行する。CloudWatchイベントを使用して定期実行する。
# 使用するIAMロールに設定するポリシー
# AmazonAthenaFullAccess
# AmazonS3ReadOnlyAccess もしかしたらいらないかも
# AWSLambdaExecute
#
# 環境変数
# RESULT_OUTPUT_S3_DIR 例:s3://aws-athena-query-results-**************-ap-northeast-1/
# TARGET_DB 例:test_db
# TARGET_TABLE 例:test_table
# TARGET_LOCATION 例:s3://bucket-name/dir_1/dir_2
# RETRY_COUNT 例:20
import boto3
import os
import datetime
import time
# S3のファイル一覧からAthenaのパーティション設定用Query作成し実行する
def lambda_handler(event, context):
target_date = build_target_date()
athena_query = build_athena_add_partition_query(target_date)
query_response = athena_query_execution(athena_query)
athena_query_result(query_response['QueryExecutionId'])
# 当日日付から日付辞書リスト作成
def build_target_date():
today = datetime.date.today()
target_date = {
"year":today.strftime("%Y"),
"month":today.strftime("%m"),
"day":today.strftime("%d")
}
return target_date
# Athenaのパーティション設定用クエリ作成
def build_athena_add_partition_query(target_date):
# Athenaで作成した対象のテーブル
table = os.environ['TARGET_TABLE']
# パーティションを設定するS3フォルダの接頭詞 例:s3://test-target-bucket/test-dir/
location_prefix = os.environ['TARGET_LOCATION']
# 「IF NOT EXISTS」で存在しない場合のみパーティションを追加するクエリにしている
query_form = "ALTER TABLE {table} " \
+ "ADD IF NOT EXISTS PARTITION (year='{year}', month='{month}', day='{day}') "\
+ "location '{location_prefix}/{year}/{month}/{day}/';"
query = query_form.format(
table = table,
year = target_date["year"],
month = target_date["month"],
day = target_date["day"],
location_prefix = location_prefix
)
return query
# Athenaへクエリ実行
def athena_query_execution(query):
print("QUERY : " + query)
DB = os.environ['TARGET_DB']
RESULT_OUTPUT_S3_DIR = os.environ['RESULT_OUTPUT_S3_DIR']
client = boto3.client('athena')
query_response = client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': DB
},
ResultConfiguration={
'OutputLocation': RESULT_OUTPUT_S3_DIR,
}
)
return query_response
# Athenaへのクエリ実行状態確認
def athena_query_result(query_execution_id):
RETRY_COUNT = int(os.environ['RETRY_COUNT'])
client = boto3.client('athena')
# ステータス確認
for i in range(0, RETRY_COUNT):
query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
#print(query_status)
query_execution_status = query_status['QueryExecution']['Status']['State']
print("STATUS:" + query_execution_status)
if query_execution_status == 'SUCCEEDED':
break
if query_execution_status == 'FAILED':
# raise Exception('FAILED')
break
else:
time.sleep(1)
else:
# タイムアウトした場合Queryを止める
client.stop_query_execution(QueryExecutionId=query_execution_id)
# raise Exception('TIME OUT')
print('TIME OUT')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment