Last active
July 26, 2018 02:20
-
-
Save shiro01/18c8800b6fd945179eb2bc15e98701be to your computer and use it in GitHub Desktop.
当日のAthenaパーティション設定クエリを作成し実行する。CloudWatchイベントを使用して定期実行する。
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Policies to attach to the IAM role used by this function
# AmazonAthenaFullAccess
# AmazonS3ReadOnlyAccess (may not be required)
# AWSLambdaExecute
#
# Environment variables
# RESULT_OUTPUT_S3_DIR e.g. s3://aws-athena-query-results-**************-ap-northeast-1/
# TARGET_DB e.g. test_db
# TARGET_TABLE e.g. test_table
# TARGET_LOCATION e.g. s3://bucket-name/dir_1/dir_2
# RETRY_COUNT e.g. 20
import boto3 | |
import os | |
import datetime | |
import time | |
def lambda_handler(event, context):
    """Lambda entry point: add today's partition to the Athena table.

    Builds the partition query for the current date, submits it to Athena,
    and then polls the submitted execution for its final status.

    Args:
        event: Lambda invocation event (not read by this function).
        context: Lambda context object (not read by this function).
    """
    query = build_athena_add_partition_query(build_target_date())
    execution_id = athena_query_execution(query)['QueryExecutionId']
    athena_query_result(execution_id)
def build_target_date():
    """Return today's date as zero-padded string components.

    Returns:
        dict: ``{"year": "YYYY", "month": "MM", "day": "DD"}`` taken from
        ``datetime.date.today()``.
    """
    current = datetime.date.today()
    # Each key maps to the strftime directive that produces its value.
    directives = (("year", "%Y"), ("month", "%m"), ("day", "%d"))
    return {key: current.strftime(directive) for key, directive in directives}
def build_athena_add_partition_query(target_date):
    """Build the Athena query that adds the partition for ``target_date``.

    The table name is read from the ``TARGET_TABLE`` environment variable and
    the S3 location prefix from ``TARGET_LOCATION``.

    Args:
        target_date: Mapping with string values under the keys ``"year"``,
            ``"month"`` and ``"day"``.

    Returns:
        str: An ``ALTER TABLE ... ADD IF NOT EXISTS PARTITION`` statement
        whose location is ``<prefix>/<year>/<month>/<day>/``.

    Raises:
        KeyError: If a required environment variable or date key is missing.
    """
    # Target table created in Athena.
    table = os.environ['TARGET_TABLE']
    # S3 prefix under which the partition folders live. Trailing slashes are
    # stripped so a prefix such as "s3://bucket/dir/" does not produce
    # "s3://bucket/dir//2018/..." (a different S3 key prefix).
    location_prefix = os.environ['TARGET_LOCATION'].rstrip('/')
    # "IF NOT EXISTS" makes the statement a no-op when the partition is
    # already registered, so repeated runs on the same day are safe.
    query_form = (
        "ALTER TABLE {table} "
        "ADD IF NOT EXISTS PARTITION (year='{year}', month='{month}', day='{day}') "
        "location '{location_prefix}/{year}/{month}/{day}/';"
    )
    return query_form.format(
        table=table,
        year=target_date["year"],
        month=target_date["month"],
        day=target_date["day"],
        location_prefix=location_prefix,
    )
def athena_query_execution(query):
    """Submit ``query`` to Athena and return the start response.

    The database comes from the ``TARGET_DB`` environment variable and the
    result output location from ``RESULT_OUTPUT_S3_DIR``. The query text is
    printed before submission.

    Args:
        query: Query string to execute.

    Returns:
        The response of ``start_query_execution``; the caller reads its
        ``'QueryExecutionId'`` entry.
    """
    print("QUERY : " + query)
    database = os.environ['TARGET_DB']
    output_location = os.environ['RESULT_OUTPUT_S3_DIR']
    athena = boto3.client('athena')
    return athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': output_location},
    )
def athena_query_result(query_execution_id):
    """Poll an Athena query execution until it ends or retries run out.

    The state is checked up to ``RETRY_COUNT`` times (environment variable),
    sleeping one second between checks. Each observed state is printed. If
    the query has not reached a terminal state after the last check, it is
    stopped and ``TIME OUT`` is printed.

    Failures are reported through the log output only; no exception is
    raised, matching the function's existing best-effort behavior.

    Args:
        query_execution_id: Id returned by ``start_query_execution``.
    """
    retry_count = int(os.environ['RETRY_COUNT'])
    client = boto3.client('athena')
    for _ in range(retry_count):
        response = client.get_query_execution(QueryExecutionId=query_execution_id)
        status = response['QueryExecution']['Status']
        state = status['State']
        print("STATUS:" + state)
        if state == 'SUCCEEDED':
            break
        # CANCELLED is terminal as well: polling further cannot change the
        # outcome, and there is nothing left to stop afterwards.
        if state in ('FAILED', 'CANCELLED'):
            reason = status.get('StateChangeReason')
            if reason:
                # Surface the reason reported by Athena, when present.
                print("REASON:" + reason)
            break
        time.sleep(1)
    else:
        # Retries exhausted without a terminal state: stop the query.
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        print('TIME OUT')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment