Skip to content

Instantly share code, notes, and snippets.

@shiro01
Last active August 1, 2018 12:18
Show Gist options
  • Select an option

  • Save shiro01/8d4ab16e0fadfc0536ce2ee06cf93fc3 to your computer and use it in GitHub Desktop.

Select an option

Save shiro01/8d4ab16e0fadfc0536ce2ee06cf93fc3 to your computer and use it in GitHub Desktop.
build_athena_query_from_s3_by_lambda.pyの改良版。単純に日付からパーティション設定用クエリを作成し実行する
# 使用するIAMロールに設定するポリシー
# AmazonAthenaFullAccess
# AmazonS3ReadOnlyAccess
# AWSLambdaExecute
#
# 環境変数
# RESULT_OUTPUT_S3_DIR 例:s3://aws-athena-query-results-**************-ap-northeast-1/
# TARGET_DB 例:test_db
# TARGET_TABLE 例:test_table
# TARGET_LOCATION 例:s3://bucket-name/dir_1/dir_2
# START_DATE 例:2017/01/01
# END_DATE 例:2018/01/01
#
# start_dateからend_dateまでパーティションを登録する
# クエリの確認はせず、設定後以下のクエリを使用してAthenaで確認する
# SHOW PARTITIONS table_name;
import boto3
import os
import datetime
from dateutil.relativedelta import relativedelta
# S3のファイル一覧からAthenaのパーティション設定用Query作成し実行する
def lambda_handler(event, context):
target_date_list = build_target_date_list()
athena_querys = build_athena_query_list(target_date_list)
for x in athena_querys:
query_response = athena_query_execution(x)
# 開始日、終了日から日付辞書リスト作成
def build_target_date_list():
start_date = os.environ['START_DATE'].split('/')
target_date = datetime.date(int(start_date[0]), int(start_date[1]), int(start_date[2]))
end_date = os.environ['END_DATE'].split('/')
target_end_date = datetime.date(int(end_date[0]), int(end_date[1]), int(end_date[2]))
print(target_date)
print(target_end_date)
target_date_list = []
while True:
target_date_list.append(
{
"year":target_date.strftime("%Y"),
"month":target_date.strftime("%m"),
"day":target_date.strftime("%d")
}
)
# 次の日
target_date = target_date + relativedelta(days=1)
if (target_date >= target_end_date):
break
print(len(target_date_list))
return target_date_list
# Athenaのパーティション設定用クエリリスト作成
def build_athena_query_list(target_date_list):
# Query 作成。重複するQueryは除く
query_set = set()
if len(target_date_list) > 0:
for target_date in target_date_list:
query = build_athena_add_partition_query(target_date)
query_set.add(query)
print(len(query_set))
return query_set
# Athenaのパーティション設定用クエリ作成
def build_athena_add_partition_query(target_date):
# Athenaで作成した対象のテーブル
table = os.environ['TARGET_TABLE']
# パーティションを設定するS3フォルダの接頭詞 例:s3://test-target-bucket/test-dir/
location_prefix = os.environ['TARGET_LOCATION']
# 「IF NOT EXISTS」で存在しない場合のみパーティションを追加するクエリにしている
query_form = "ALTER TABLE {table} " \
+ "ADD IF NOT EXISTS PARTITION (year='{year}', month='{month}', day='{day}') "\
+ "location '{location_prefix}/{year}/{month}/{day}/';"
query = query_form.format(
table = table,
year = target_date["year"],
month = target_date["month"],
day = target_date["day"],
location_prefix = location_prefix
)
return query
# Athenaへクエリ実行
def athena_query_execution(query):
print("QUERY : " + query)
DB = os.environ['TARGET_DB']
RESULT_OUTPUT_S3_DIR = os.environ['RESULT_OUTPUT_S3_DIR']
client = boto3.client('athena')
query_response = client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': DB
},
ResultConfiguration={
'OutputLocation': RESULT_OUTPUT_S3_DIR,
}
)
return query_response
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment