Last active
July 24, 2018 04:13
-
-
Save shiro01/3d355bb66d94601b5425e84cae03a355 to your computer and use it in GitHub Desktop.
LambdaでS3のファイルパスからAthena用パーティション設定Queryを作成し実行する
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # 使用するIAMロールに設定するポリシー | |
| # AmazonAthenaFullAccess | |
| # AmazonS3ReadOnlyAccess | |
| # AWSLambdaExecute | |
| # | |
| # 環境変数 | |
| # S3_BUCKET_NAME 例:bucket-name | |
| # S3_BUCKET_PREFIX 例:dir_1/dir_2 | |
| # RESULT_OUTPUT_S3_DIR 例:s3://aws-athena-query-results-**************-ap-northeast-1/ | |
| # TARGET_DB 例:test_db | |
| # TARGET_LOCATION 例:s3://bucket-name/dir_1/dir_2 | |
| # TARGET_TABLE 例:test_table | |
| # RETRY_COUNT 例:20 | |
| # | |
| # 対象が多い場合タイムアウト(300秒越え)するので注意 | |
| import boto3 | |
| import os | |
| import re | |
| import time | |
def lambda_handler(event, context):
    """Lambda entry point: add Athena partitions for dated S3 object paths.

    Lists the object keys under the configured S3 prefix, extracts the
    ``YYYY/MM/DD`` date from each key, builds one
    ``ALTER TABLE ... ADD IF NOT EXISTS PARTITION`` query per distinct date,
    then submits each query and checks its status via
    ``athena_query_result``. ``event`` and ``context`` are not used.
    """
    queries = build_athena_query_list(
        extract_date_from_file_path(s3_resource_files())
    )
    for query in queries:
        execution_id = athena_query_execution(query)['QueryExecutionId']
        athena_query_result(execution_id)
def s3_resource_files(s3_client=None):
    """Return every object key under ``S3_BUCKET_PREFIX`` in ``S3_BUCKET_NAME``.

    Keys ending in ``/`` (folder placeholder objects) are skipped. Result
    pages are followed via ``NextContinuationToken`` until exhausted, and the
    number of collected keys is printed.

    Args:
        s3_client: optional pre-built client exposing a boto3-style
            ``list_objects_v2``; ``boto3.client('s3')`` is created when omitted.

    Returns:
        list of object key strings in the order S3 returned them
        (empty when nothing matches the prefix).

    Raises:
        KeyError: if ``S3_BUCKET_NAME`` or ``S3_BUCKET_PREFIX`` is unset.
    """
    if s3_client is None:
        s3_client = boto3.client('s3')
    request = {
        'Bucket': os.environ['S3_BUCKET_NAME'],
        'Prefix': os.environ['S3_BUCKET_PREFIX'],
    }
    contents = []
    while True:
        response = s3_client.list_objects_v2(**request)
        # 'Contents' is omitted entirely (not an empty list) when no object
        # matches the prefix, so default to [] instead of iterating None.
        contents.extend(
            obj['Key']
            for obj in response.get('Contents', [])
            if not obj['Key'].endswith('/')
        )
        next_token = response.get('NextContinuationToken')
        if not next_token:
            break
        request['ContinuationToken'] = next_token
    print(len(contents))
    return contents
def extract_date_from_file_path(file_pass_list):
    """Pull the ``YYYY/MM/DD`` directory date out of each S3 key.

    A key yields an entry only if it has at least one character, then
    ``/YYYY/MM/DD/``, then at least one more character (the file name).
    Non-matching keys are silently dropped. Because the leading ``.+`` is
    greedy, the last date-shaped segment wins when a key has several.
    Prints the number of dates extracted.

    Returns:
        list of ``{"year": ..., "month": ..., "day": ...}`` dicts of digit
        strings, one per matching key, in input order (duplicates kept).
    """
    date_re = re.compile(r"^.+/(\d{4})/(\d{2})/(\d{2})/.+$")
    target_date_list = []
    for path in file_pass_list:
        found = date_re.match(path)
        if found is None:
            continue
        year, month, day = found.groups()
        target_date_list.append({"year": year, "month": month, "day": day})
    print(len(target_date_list))
    return target_date_list
def build_athena_query_list(target_date_list):
    """Build the de-duplicated set of ADD PARTITION queries for the given dates.

    Several files can share one date, so the queries are collected into a
    set to keep a single query per distinct partition. Prints the number of
    unique queries.

    Returns:
        set of query strings (iteration order is unspecified).
    """
    unique_queries = {
        build_athena_add_partition_query(date) for date in target_date_list
    }
    print(len(unique_queries))
    return unique_queries
def build_athena_add_partition_query(target_date):
    """Build an ``ALTER TABLE ... ADD IF NOT EXISTS PARTITION`` query for one date.

    The table name comes from ``TARGET_TABLE`` and the partition root from
    ``TARGET_LOCATION`` (e.g. ``s3://test-target-bucket/test-dir``). A trailing
    ``/`` on ``TARGET_LOCATION`` is accepted and stripped, so the generated
    location never contains ``//``. S3 treats ``dir//2018`` and ``dir/2018``
    as different prefixes, so ``//`` would point the partition at no data.

    Args:
        target_date: dict with ``"year"``, ``"month"`` and ``"day"`` strings.

    Returns:
        the query string, ending with ``;``.

    Raises:
        KeyError: if an environment variable or a date key is missing.
    """
    table = os.environ['TARGET_TABLE']
    location_prefix = os.environ['TARGET_LOCATION'].rstrip('/')
    # IF NOT EXISTS adds the partition only when it is not already present.
    # https://docs.aws.amazon.com/athena/latest/ug/alter-table-add-partition.html
    return (
        "ALTER TABLE {table} "
        "ADD IF NOT EXISTS PARTITION (year='{year}', month='{month}', day='{day}') "
        "location '{location_prefix}/{year}/{month}/{day}/';"
    ).format(
        table=table,
        year=target_date["year"],
        month=target_date["month"],
        day=target_date["day"],
        location_prefix=location_prefix,
    )
def athena_query_execution(query):
    """Submit ``query`` to Athena and return the StartQueryExecution response.

    The query runs against the ``TARGET_DB`` database, and its results are
    written under ``RESULT_OUTPUT_S3_DIR``. The query text is printed first.
    This only starts the query; it does not wait for completion.

    Raises:
        KeyError: if ``TARGET_DB`` or ``RESULT_OUTPUT_S3_DIR`` is unset.
    """
    print("QUERY : " + query)
    database = os.environ['TARGET_DB']
    output_location = os.environ['RESULT_OUTPUT_S3_DIR']
    athena = boto3.client('athena')
    return athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': output_location},
    )
def athena_query_result(query_execution_id, client=None):
    """Poll an Athena query until it ends or ``RETRY_COUNT`` polls elapse.

    The state is checked up to ``RETRY_COUNT`` times, one second apart, and
    each observed state is printed.
    - Polling stops on any terminal state: SUCCEEDED, FAILED or CANCELLED.
    - For FAILED/CANCELLED, Athena's ``StateChangeReason`` is printed.
    - If the query has not ended after the last poll, it is stopped and
      ``TIME OUT`` is printed.

    Problems are reported rather than raised, so the caller can carry on with
    its remaining queries.

    Args:
        query_execution_id: id returned by ``start_query_execution``.
        client: optional pre-built Athena client; ``boto3.client('athena')``
            is created when omitted.

    Raises:
        KeyError: if ``RETRY_COUNT`` is unset.
        ValueError: if ``RETRY_COUNT`` is not an integer string.
    """
    retry_count = int(os.environ['RETRY_COUNT'])
    if client is None:
        client = boto3.client('athena')
    for attempt in range(retry_count):
        status = client.get_query_execution(
            QueryExecutionId=query_execution_id
        )['QueryExecution']['Status']
        state = status['State']
        print("STATUS:" + state)
        if state == 'SUCCEEDED':
            return
        if state in ('FAILED', 'CANCELLED'):
            # Terminal failure: show why instead of ending silently, and do
            # not keep polling or try to stop an already-finished query.
            print("REASON:" + status.get('StateChangeReason', ''))
            return
        if attempt < retry_count - 1:
            # No point sleeping after the final poll.
            time.sleep(1)
    # Still not finished after the last poll: stop it so it does not keep running.
    client.stop_query_execution(QueryExecutionId=query_execution_id)
    print('TIME OUT')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment