Skip to content

Instantly share code, notes, and snippets.

@ohsawa0515
Created May 22, 2021 09:40
Show Gist options
  • Save ohsawa0515/d1c0907b8769b3a3c2b0f42f3ecadd0e to your computer and use it in GitHub Desktop.
Save ohsawa0515/d1c0907b8769b3a3c2b0f42f3ecadd0e to your computer and use it in GitHub Desktop.
import boto3
from botocore.exceptions import ClientError
import datetime
import time
import os
from dateutil.relativedelta import relativedelta
# Seconds to sleep between successive polls of an Athena query's execution state.
athena_query_wait_time = 3
class CurExtraction:
    """Clear extracted CUR data in S3 and re-run the Athena extraction queries.

    :param bucket_name: S3 bucket whose objects are listed and deleted.
    :param subfolder: Key prefix placed in front of the month folder.
    :param athena_database: Database that qualifies ``temp_table`` in the DROP statement.
    :param extraction_query_name: Text that must occur in a Named Query's name
        for that query to be executed.
    :param athena_output_location: Passed to Athena as the result ``OutputLocation``.
    """

    def __init__(self, bucket_name: str, subfolder: str, athena_database: str,
                 extraction_query_name: str, athena_output_location: str):
        self.bucket_name = bucket_name
        self.subfolder = subfolder
        self.athena_database = athena_database
        self.extraction_query_name = extraction_query_name
        self.athena_output_location = athena_output_location
        self.s3_client = boto3.client('s3')
        self.athena_client = boto3.client('athena')

    def clear_s3_folder(self, month_folder: str) -> None:
        """Delete every S3 object under ``<subfolder>/<month_folder>``.

        The listing is followed page by page, so folders holding more objects
        than a single ``list_objects_v2`` response can carry are fully cleared.
        A ``ClientError`` is printed and ends the clean-up early (best effort).

        :param month_folder: S3 path of year and month. e.g. year_1=2021/month_1=5
        :return: None
        """
        prefix = self.subfolder + '/' + month_folder
        list_kwargs = {'Bucket': self.bucket_name, 'Prefix': prefix}
        try:
            while True:
                response = self.s3_client.list_objects_v2(**list_kwargs)
                print("keycount in bucket to be deleted: " + str(response['KeyCount']))
                # 'Contents' is absent when the page holds no keys.
                for s3object in response.get('Contents', []):
                    objectkey = s3object['Key']
                    print("delete object objectkey is: " + objectkey)
                    self.s3_client.delete_object(
                        Bucket=self.bucket_name,
                        Key=objectkey
                    )
                if not response.get('IsTruncated'):
                    break
                # More keys remain: resume the listing where this page stopped.
                list_kwargs['ContinuationToken'] = response['NextContinuationToken']
        except ClientError as e:
            print(e)
        return

    def _run_query_and_wait(self, query_string: str, label: str) -> bool:
        """Start an Athena query and block until it leaves QUEUED/RUNNING.

        :param query_string: SQL text to execute.
        :param label: Short name used in the progress messages.
        :return: True only when the final state is SUCCEEDED; False when the
            query ended in another state or a ``ClientError`` was raised.
        """
        try:
            execution = self.athena_client.start_query_execution(
                QueryString=query_string,
                ResultConfiguration={
                    'OutputLocation': self.athena_output_location,
                    'EncryptionConfiguration': {
                        'EncryptionOption': 'SSE_S3',
                    }
                }
            )
            execution_id = execution['QueryExecutionId']
            # Poll until the query reaches a terminal state.
            while True:
                status = self.athena_client.get_query_execution(
                    QueryExecutionId=execution_id
                )['QueryExecution']['Status']
                state = status['State']
                if state not in ('QUEUED', 'RUNNING'):
                    break
                time.sleep(athena_query_wait_time)
        except ClientError as e:
            print(e)
            return False
        if state == 'SUCCEEDED':
            print("completed " + label + ": " + query_string)
            return True
        # Do not report a failed or cancelled query as completed.
        reason = status.get('StateChangeReason', 'no reason given')
        print("query " + label + " ended in state " + state + " (" + reason + "): " + query_string)
        return False

    def _drop_temp_table(self) -> None:
        """Drop ``<athena_database>.temp_table`` and wait for the query to end.

        The wait matters because the table must not exist before it is created.

        :return: None
        """
        drop_query_string = 'DROP TABLE IF EXISTS ' + self.athena_database + '.temp_table'
        print("inside drop_query_string: " + drop_query_string)
        self._run_query_and_wait(drop_query_string, 'drop_query_string')
        return

    def _execute_create_query(self, query_string: str, month_folder: str, interval: str) -> None:
        """Fill in the placeholders of an extraction query and run it in Athena.

        ``__subfolder__`` is replaced by ``month_folder`` and ``__interval__``
        by ``interval`` before the query is started.

        :param query_string: Query stored in Athena
        :param month_folder: S3 path of year and month. e.g. year_1=2021/month_1=5
        :param interval: Interval from the current month
        :return: None
        """
        new_query = query_string.replace('__subfolder__', month_folder).replace('__interval__', interval)
        self._run_query_and_wait(new_query, 'create_query_string')
        return

    def run_athena_extraction_query(self, month_folder: str, interval: str) -> None:
        """Run every Named Query whose name contains ``extraction_query_name``.

        All pages of the Named Query listing are read, so a matching query is
        found even when it is not on the first page. For each match the
        temporary table is dropped first and the extraction query is then run.
        A ``ClientError`` while fetching a Named Query is printed and stops
        the scan.

        :param month_folder: S3 path of year and month. e.g. year_1=2021/month_1=5
        :param interval: Interval from the current month
        :return: None
        """
        named_query_ids = []
        list_kwargs = {}
        while True:
            response = self.athena_client.list_named_queries(**list_kwargs)
            named_query_ids.extend(response['NamedQueryIds'])
            next_token = response.get('NextToken')
            if not next_token:
                break
            list_kwargs['NextToken'] = next_token

        for query_id in named_query_ids:
            try:
                named_query = self.athena_client.get_named_query(
                    NamedQueryId=query_id
                )['NamedQuery']
            except ClientError as e:
                print(e)
                return
            # Substring match: several Named Queries may qualify.
            if self.extraction_query_name in named_query['Name']:
                self._drop_temp_table()
                self._execute_create_query(named_query['QueryString'], month_folder, interval)
        return
def lambda_handler(event, context):
    """Clear and rebuild the extracts for the current and the previous month.

    The month folders are derived from the current local date. The bucket,
    subfolder, Athena database, query name and Athena output location are read
    from environment variables. Both month folders are cleared in S3 first;
    the extraction is then run for the current month (interval '0') and the
    previous month (interval '1').

    :param event: Not used.
    :param context: Not used.
    :return: None
    """
    today = datetime.datetime.now()
    one_month_ago = today - relativedelta(months=1)

    # Folder layout: year_1=<year>/month_1=<month>/ (month is not zero-padded).
    current_month = f'year_1={today.year}/month_1={today.month}/'
    previous_month = f'year_1={one_month_ago.year}/month_1={one_month_ago.month}/'

    cur_extraction = CurExtraction(
        bucket_name=os.environ['bucket_name'],
        subfolder=os.environ['subfolder'],
        athena_database=os.environ['athena_database'],
        extraction_query_name=os.environ['extraction_query_name'],
        athena_output_location=os.environ['athena_output_location'],
    )

    # Remove the old output of both months before any query is run.
    for folder in (current_month, previous_month):
        cur_extraction.clear_s3_folder(folder)

    for folder, interval in ((current_month, '0'), (previous_month, '1')):
        cur_extraction.run_athena_extraction_query(folder, interval)
    return
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment