Skip to content

Instantly share code, notes, and snippets.

@shiro01
Last active July 24, 2018 04:13
Show Gist options
  • Select an option

  • Save shiro01/3d355bb66d94601b5425e84cae03a355 to your computer and use it in GitHub Desktop.

Select an option

Save shiro01/3d355bb66d94601b5425e84cae03a355 to your computer and use it in GitHub Desktop.
LambdaでS3のファイルパスからAthena用パーティション設定Queryを作成し実行する
# 使用するIAMロールに設定するポリシー
# AmazonAthenaFullAccess
# AmazonS3ReadOnlyAccess
# AWSLambdaExecute
#
# 環境変数
# S3_BUCKET_NAME 例:bucket-name
# S3_BUCKET_PREFIX 例:dir_1/dir_2
# RESULT_OUTPUT_S3_DIR 例:s3://aws-athena-query-results-**************-ap-northeast-1/
# TARGET_DB 例:test_db
# TARGET_LOCATION 例:s3://bucket-name/dir_1/dir_2
# TARGET_TABLE 例:test_table
# RETRY_COUNT 例:20
#
# 対象が多い場合タイムアウト(300秒越え)するので注意
import boto3
import os
import re
import time
# Build Athena partition queries from the S3 file listing and run them.
def lambda_handler(event, context):
    """Lambda entry point: add Athena partitions for the dates found in S3.

    Lists the object keys via ``s3_resource_files``, extracts the
    year/month/day components from each key, builds the de-duplicated
    partition statements, then starts each statement and polls it before
    moving on to the next one.

    Args:
        event: Lambda event payload (not used).
        context: Lambda context object (not used).
    """
    object_keys = s3_resource_files()
    partition_dates = extract_date_from_file_path(object_keys)
    for statement in build_athena_query_list(partition_dates):
        execution = athena_query_execution(statement)
        athena_query_result(execution['QueryExecutionId'])
# List the object keys stored in the S3 bucket.
def s3_resource_files():
    """Return every object key under the configured bucket and prefix.

    Reads ``S3_BUCKET_NAME`` and ``S3_BUCKET_PREFIX`` from the environment
    and calls ``list_objects_v2`` repeatedly, following
    ``NextContinuationToken`` until the response no longer carries one.
    Keys that end with ``/`` are skipped. The number of collected keys is
    printed before returning.

    Returns:
        list: The collected object keys; empty when nothing matches.

    Raises:
        KeyError: If either environment variable is not set.
    """
    s3_client = boto3.client('s3')
    request = {
        'Bucket': os.environ['S3_BUCKET_NAME'],
        'Prefix': os.environ['S3_BUCKET_PREFIX'],
    }
    contents = []
    while True:
        response = s3_client.list_objects_v2(**request)
        # 'Contents' is missing from the response when no object matches the
        # prefix, so default to an empty list instead of iterating over None.
        contents.extend(
            item['Key']
            for item in response.get('Contents', [])
            if not item['Key'].endswith('/')
        )
        next_token = response.get('NextContinuationToken')
        if not next_token:
            break
        # Ask for the next page on the following iteration.
        request['ContinuationToken'] = next_token
    print(len(contents))
    return contents
# Build a list of date dictionaries from a list of file paths.
def extract_date_from_file_path(file_pass_list):
    """Extract the ``<yyyy>/<mm>/<dd>`` directory components from file paths.

    A path contributes an entry only when it contains a four-digit, a
    two-digit and a two-digit directory in a row, preceded by at least one
    parent directory and followed by a file name. Paths that do not match
    are dropped. Duplicates are kept; one entry is produced per matching
    path, in input order. The number of entries is printed before returning.

    Args:
        file_pass_list: Iterable of path strings (for example S3 object keys).

    Returns:
        list: Dicts of the form ``{"year": ..., "month": ..., "day": ...}``
        whose values are the matched digit strings.
    """
    pattern = re.compile(r"^.+/(\d{4})/(\d{2})/(\d{2})/.+$")
    # Lazily match each path; non-matching paths yield None and are filtered.
    matches = (pattern.match(path) for path in file_pass_list)
    target_date_list = [
        {
            "year": matched.group(1),
            "month": matched.group(2),
            "day": matched.group(3),
        }
        for matched in matches
        if matched is not None
    ]
    print(len(target_date_list))
    return target_date_list
# Build the collection of Athena partition queries.
def build_athena_query_list(target_date_list):
    """Build the de-duplicated set of partition queries for the given dates.

    Each date dict is turned into a query string by
    ``build_athena_add_partition_query``; collecting the strings in a set
    removes the duplicates that arise when several files share a date.
    The number of distinct queries is printed before returning.

    Args:
        target_date_list: Iterable of dicts with ``year``, ``month`` and
            ``day`` keys.

    Returns:
        set: Distinct query strings; empty when no dates were given.
    """
    query_set = {
        build_athena_add_partition_query(target_date)
        for target_date in target_date_list
    }
    print(len(query_set))
    return query_set
# Build a single Athena partition query.
def build_athena_add_partition_query(target_date):
    """Build the ``ALTER TABLE ... ADD IF NOT EXISTS PARTITION`` statement.

    The table name comes from the ``TARGET_TABLE`` environment variable and
    the S3 location prefix from ``TARGET_LOCATION``. ``IF NOT EXISTS`` makes
    the statement add the partition only when it is not already present.
    https://docs.aws.amazon.com/athena/latest/ug/alter-table-add-partition.html

    Args:
        target_date: Dict with ``year``, ``month`` and ``day`` string values.

    Returns:
        str: The partition statement for that date.

    Raises:
        KeyError: If an environment variable or a date key is missing.
    """
    # Target table previously created in Athena.
    table = os.environ['TARGET_TABLE']
    # Prefix of the S3 folder to partition. Trailing slashes are stripped so
    # that both "s3://bucket/dir" and "s3://bucket/dir/" produce a location
    # without a doubled "//" before the year component.
    location_prefix = os.environ['TARGET_LOCATION'].rstrip('/')
    query_form = (
        "ALTER TABLE {table} "
        "ADD IF NOT EXISTS PARTITION (year='{year}', month='{month}', day='{day}') "
        "location '{location_prefix}/{year}/{month}/{day}/';"
    )
    return query_form.format(
        table=table,
        year=target_date["year"],
        month=target_date["month"],
        day=target_date["day"],
        location_prefix=location_prefix,
    )
# Run a query on Athena.
def athena_query_execution(query):
    """Start an Athena query and return the ``start_query_execution`` response.

    The query text is printed first. The database comes from the
    ``TARGET_DB`` environment variable and the result output location from
    ``RESULT_OUTPUT_S3_DIR``.

    Args:
        query: Query string to run.

    Returns:
        The response of ``start_query_execution``; the caller reads
        ``QueryExecutionId`` from it.
    """
    print("QUERY : " + query)
    execution_context = {'Database': os.environ['TARGET_DB']}
    result_configuration = {'OutputLocation': os.environ['RESULT_OUTPUT_S3_DIR']}
    athena = boto3.client('athena')
    return athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext=execution_context,
        ResultConfiguration=result_configuration,
    )
# Check the execution state of an Athena query.
def athena_query_result(query_execution_id):
    """Poll an Athena query until it finishes or the retry budget runs out.

    The state is checked up to ``RETRY_COUNT`` times (read from the
    environment), with a one-second sleep after each non-terminal check.
    ``SUCCEEDED``, ``FAILED`` and ``CANCELLED`` end the polling. When every
    attempt is used up without reaching one of those states, the query is
    stopped and ``TIME OUT`` is printed. Failures are only logged, never
    raised, so the caller carries on with its remaining queries. Query
    result rows are not fetched here.

    Args:
        query_execution_id: ``QueryExecutionId`` returned when the query was
            started.

    Raises:
        KeyError: If ``RETRY_COUNT`` is not set.
        ValueError: If ``RETRY_COUNT`` is not an integer.
    """
    retry_count = int(os.environ['RETRY_COUNT'])
    client = boto3.client('athena')
    # Status check.
    for _ in range(retry_count):
        query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
        status = query_status['QueryExecution']['Status']
        state = status['State']
        print("STATUS:" + state)
        if state == 'SUCCEEDED':
            break
        if state in ('FAILED', 'CANCELLED'):
            # Both are terminal: further polling cannot change the outcome.
            # Log the reason (when Athena supplies one) instead of dropping it.
            reason = status.get('StateChangeReason')
            if reason:
                print("REASON:" + reason)
            break
        time.sleep(1)
    else:
        # The loop finished without a terminal state: stop the query.
        client.stop_query_execution(QueryExecutionId=query_execution_id)
        print('TIME OUT')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment