Skip to content

Instantly share code, notes, and snippets.

@jdaarevalo
Last active August 26, 2022 06:52
Show Gist options
  • Select an option

  • Save jdaarevalo/bb7f034d533408797bbd15524455fee9 to your computer and use it in GitHub Desktop.

Select an option

Save jdaarevalo/bb7f034d533408797bbd15524455fee9 to your computer and use it in GitHub Desktop.
Lambda function query_send_report
"""
Purpose
Run an Athena query and post a Slack message with a presigned link to download the results as CSV,
tagging the user who made the request
"""
import os
import json
import time
import boto3
import urllib3
import ast
def lambda_handler(event, context):
    """
    Lambda entry point: run the report query in Athena and post a download link to Slack.

    :param event: Lambda event; the first record's ``body`` is used to find the
        user who requested the report
    :type event: dict
    :param context: Lambda context object (not used)
    :raises RuntimeError: if the Athena query did not succeed (failed, was
        cancelled, or did not finish while being polled)
    """
    # Create the Athena client with boto3
    athena_client = boto3.client('athena')
    # Fetch the query, here you can use a named_query_id or directly the query with params
    query = get_query()
    # Execute the Athena query
    query_execution_id = execute_query(athena_client, query)
    # Fetch the location in Amazon S3 with the results
    s3_uri_output_location = query_execution_output_location(athena_client, query_execution_id)
    # query_execution_output_location returns a falsy value (False / None) when the
    # query did not succeed. Fail here with a clear message instead of crashing later
    # in get_presigned_http_link with an unrelated AttributeError.
    if not s3_uri_output_location:
        raise RuntimeError(
            "Athena query {} did not succeed (result: {!r})".format(
                query_execution_id, s3_uri_output_location
            )
        )
    # Fetch the HTTP link
    report_http_link = get_presigned_http_link(s3_uri_output_location)
    # Send a message to Slack
    send_slack_message(event, report_http_link)
def get_query():
    """
    Return the SQL statement that produces the report.

    Edit the literal below (or swap in a named-query lookup) to change
    what the report contains.

    :return: query string
    :rtype: str
    """
    return "select col_1, col_2 from athena_table where col_3 = my_filter"
def execute_query(athena_client, query):
    """
    Start an Athena query and return its execution id without waiting for it.

    The query runs against the database named by the ATHENA_DATABASE
    environment variable. Results are written under
    ``s3://<BUCKET_NAME>/<LAMBDA_RESULTS_FOLDER>/``, where both values are
    read from environment variables.

    :param athena_client: The boto3 Athena client
    :type athena_client: boto3.client
    :param query: query to execute in Athena
    :type query: str
    :return: query_execution_id
    :rtype: str
    """
    env = os.environ
    results_prefix = 's3://{bucket_name}/{lambda_results_folder}/'.format(
        bucket_name=env.get("BUCKET_NAME"),
        lambda_results_folder=env.get("LAMBDA_RESULTS_FOLDER"),
    )
    started = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': env.get("ATHENA_DATABASE")},
        ResultConfiguration={'OutputLocation': results_prefix},
    )
    return started["QueryExecutionId"]
def query_execution_output_location(athena_client, query_execution_id, max_checks=60, poll_interval_secs=2):
    """
    Poll an Athena query until it finishes and return where its results were stored.

    The execution state is checked up to ``max_checks`` times. The function waits
    ``poll_interval_secs`` seconds between consecutive checks, but not after the last one.

    :param athena_client: The boto3 Athena client
    :type athena_client: boto3.client
    :param query_execution_id: The query_execution_id
    :type query_execution_id: str
    :param max_checks: maximum number of status checks before giving up
    :type max_checks: int
    :param poll_interval_secs: seconds to sleep between two status checks
    :type poll_interval_secs: float
    :return: the S3 URI of the results if the query SUCCEEDED; False if it ended
        in any other non-running state (e.g. FAILED or CANCELLED); None if it was
        still QUEUED/RUNNING after ``max_checks`` checks
    :rtype: str | bool | None
    """
    for attempt in range(max_checks):
        if attempt:
            # Sleep only between checks, never after the final one.
            time.sleep(poll_interval_secs)
        query_execution_response = athena_client.get_query_execution(
            QueryExecutionId=query_execution_id
        )["QueryExecution"]
        query_execution_state = query_execution_response["Status"]["State"]
        if query_execution_state == 'SUCCEEDED':
            return query_execution_response["ResultConfiguration"]["OutputLocation"]
        if query_execution_state not in ('QUEUED', 'RUNNING'):
            # Terminal non-success state (FAILED, CANCELLED, or anything we cannot
            # wait on). Previously CANCELLED fell through and looked like a timeout.
            return False
    # Still queued/running after every allowed check.
    return None
def get_presigned_http_link(s3_uri):
    """
    Generate a presigned URL so users without S3 permissions can download an object.

    The link lifetime in seconds comes from the SECS_TO_EXPIRE_REPORT environment
    variable. If that variable is unset, 3600 seconds (one hour) is used.

    :param s3_uri: location in Amazon S3, e.g. ``s3://bucket/folder/file.csv``
    :type s3_uri: str
    :return: http link
    :rtype: str
    :raises ValueError: if ``s3_uri`` is not an ``s3://bucket/key`` URI, or if
        SECS_TO_EXPIRE_REPORT is not an integer
    """
    scheme = 's3://'
    if not isinstance(s3_uri, str) or not s3_uri.startswith(scheme):
        raise ValueError("Expected an s3://bucket/key URI, got {!r}".format(s3_uri))
    # Take bucket and key from the URI itself. Splitting on BUCKET_NAME (the old
    # approach) breaks when the bucket name also occurs in the key, e.g.
    # s3://reports/reports/x.csv, and crashes when BUCKET_NAME is unset.
    bucket_name, _, object_name = s3_uri[len(scheme):].partition('/')
    if not bucket_name or not object_name:
        raise ValueError("Expected an s3://bucket/key URI, got {!r}".format(s3_uri))
    # Environment variables are strings, but ExpiresIn is a number of seconds.
    secs_to_expire_report = int(os.environ.get("SECS_TO_EXPIRE_REPORT", 3600))
    s3_client = boto3.client('s3')
    return s3_client.generate_presigned_url('get_object',
                                            Params={'Bucket': bucket_name, 'Key': object_name},
                                            ExpiresIn=secs_to_expire_report)
def send_slack_message(event, report_http_link):
    """
    Post the report download link to Slack, tagging the user who requested it.

    The requester id is read from the ``user_id_request`` field of the first
    record's ``body``. The body is parsed as JSON first; if that fails, it is
    parsed as a Python literal (``ast.literal_eval``) so older producers keep
    working. If the requester cannot be determined, the message is posted
    without a tag.

    :param event: Lambda event; presumably an SQS event (``Records[0].body``),
        TODO confirm against the trigger configuration
    :type event: dict
    :param report_http_link: presigned URL to download the report
    :type report_http_link: str
    :return: None
    :raises ValueError: if the SLACK_WEBHOOK_URL environment variable is not set
    """
    slack_webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
    if not slack_webhook_url:
        raise ValueError("SLACK_WEBHOOK_URL environment variable is not set")

    try:
        request_body = event['Records'][0]['body']
        try:
            json_body_request = json.loads(request_body)
        except ValueError:
            # Fallback for Python-literal bodies (e.g. single-quoted dicts).
            # literal_eval only evaluates literals; it never executes code.
            json_body_request = ast.literal_eval(request_body)
        tag_user = "<@{}>".format(json_body_request['user_id_request'])
    except (KeyError, IndexError, TypeError, ValueError, SyntaxError):
        # Best effort: a missing or unparseable requester must not block the report.
        tag_user = " "

    data = {
        "blocks": [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": "Hey {} good news! your report has been generated".format(tag_user)
                }
            },
            {
                "type": "section",
                "fields": [
                    {
                        "type": "mrkdwn",
                        "text": "*Type:*\nCSV"
                    },
                    {
                        "type": "mrkdwn",
                        "text": "*Download here:*\n<{}|report.csv>".format(report_http_link)
                    }
                ]
            }
        ]
    }
    http = urllib3.PoolManager()
    http.request("POST",
                 slack_webhook_url,
                 body=json.dumps(data),
                 headers={"Content-Type": "application/json"})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment