Last active
May 10, 2023 11:17
-
-
Save KensoDev/d9f5ea978b16bac06463c6c78191f220 to your computer and use it in GitHub Desktop.
Lambda function to notify slack on ECS failed deployments
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Get a notification from ECS when a deployment is failing and post it to Slack.

This Lambda function receives a notification from a CloudWatch log filter when
an ECS deployment fails. It builds a key from the cluster ARN, the task group
and the task definition, and saves the event as a file in S3 under that key.

To make sure we don't send duplicates, if the file already exists in S3 we
don't resend the notification.
""" | |
from json import dumps | |
from os import environ | |
from re import ( | |
search | |
) | |
import requests | |
from boto3 import client | |
def handler(event, context):
    """
    Lambda entry point: pull the ECS identifiers out of the event and process it

    :event: Incoming event; must carry a "detail" mapping with the keys
        "taskDefinitionArn", "group" and "clusterArn"
    :context: Lambda context object (unused)
    :returns: None
    """
    detail = event["detail"]
    definition_arn = detail["taskDefinitionArn"]
    group = detail["group"]
    cluster = detail["clusterArn"]

    # The whole event is serialized so it can be stored next to the
    # de-duplication marker.
    serialized_event = dumps(event)

    process_record(cluster, group, definition_arn, serialized_event)
def _arn_resource_name(arn):
    """
    Return everything after the first "/" of an ARN
    :arn: ARN string such as "arn:aws:ecs:region:account:cluster/name"
    :returns: The text following the first "/" (up to the end of the line)
    :raises ValueError: If the ARN contains no "/"
    """
    m = search(r"/(.*)", arn)
    if m is None:
        # Without this check the caller would die with an opaque
        # "'NoneType' object has no attribute 'group'" error.
        raise ValueError(
            "Cannot extract a resource name from ARN {!r}: no '/' found".format(arn)
        )
    return m.group(1)


def get_message_key(cluster_arn, task_definition_arn, task_group):
    """
    Build the S3 key under which the failure message for a deployment is stored
    :cluster_arn: Cluster ARN from the AWS event
    :task_definition_arn: Task Definition ARN from the AWS event
    :task_group: Task Group name from the AWS event
    :returns: Key of the form "<cluster>/<group>/<task definition>-failure-message.json"
    :raises ValueError: If either ARN contains no "/"
    """
    cluster_name = _arn_resource_name(cluster_arn)
    task_definition_name = _arn_resource_name(task_definition_arn)
    return "{}/{}/{}-failure-message.json".format(
        cluster_name,
        task_group,
        task_definition_name,
    )
def check_for_message(s3, bucket_name, message_key):
    """
    Check if the message already exists in the bucket
    :s3: S3 client
    :bucket_name: Name of the bucket
    :message_key: Key of the message to look for
    :returns: False if the message doesn't exist and true if it does
    """
    try:
        # head_object only fetches metadata; get_object would download the
        # object and hand back a streaming body that nobody reads or closes.
        s3.head_object(
            Bucket=bucket_name,
            Key=message_key,
        )
    except Exception as e:
        # Deliberately best-effort: any failure (missing key, permissions,
        # transient error) is reported as "not found" so that the caller
        # still sends the notification rather than silently dropping it.
        print(e)  # noqa
        return False
    return True
def post_to_slack(task_group, task_definition_arn, timeout=5):
    """
    Post failure message to slack
    :task_group: Task Group name from the AWS event
    :task_definition_arn: Task Definition identifier from the AWS event
    :timeout: Seconds to wait for the webhook before giving up
    :returns: False if the message was not posted to slack True if it was
    :raises KeyError: If the SLACK_WEBHOOK_URL environment variable is not set
    """
    webhook_url = environ["SLACK_WEBHOOK_URL"]
    message = "The task [{}] is failing to deploy to [{}]".format(
        task_group,
        task_definition_arn,
    )
    slack_data = {"text": message}
    try:
        # Without a timeout requests waits indefinitely, which would keep the
        # function blocked until the Lambda runtime kills it.
        response = requests.post(
            webhook_url,
            data=dumps(slack_data),
            headers={'Content-Type': 'application/json'},
            timeout=timeout,
        )
    except requests.RequestException as e:
        # Connection problems and timeouts are reported the same way as a
        # non-200 answer: the message was not posted.
        print("Error posting the message to slack: {}".format(e))  # noqa
        return False
    if response.status_code != 200:
        print("Error posting the message to slack, response wasn't 200")  # noqa
        return False
    return True
def process_record(cluster_arn, task_group, task_definition_arn, event_json):
    """
    Notify slack about a failed deployment unless it was already reported

    The event is stored in S3 after a successful notification; the presence
    of that object is what marks a deployment failure as already reported.

    :cluster_arn: Cluster ARN from the AWS event
    :task_group: Task Group name from the AWS event
    :task_definition_arn: Task Definition ARN from the AWS event
    :event_json: The AWS event serialized as a JSON string
    :returns: True if the failure was already reported, None otherwise
    """
    storage = client("s3")
    bucket = environ["BUCKET_NAME"]
    marker_key = get_message_key(cluster_arn, task_definition_arn, task_group)

    already_reported = check_for_message(storage, bucket, marker_key)
    if already_reported:
        return True

    posted = post_to_slack(task_group, task_definition_arn)
    if not posted:
        # Nothing is stored, so a later event will attempt the post again.
        return None

    storage.put_object(
        ACL="private",
        Body=event_json,
        Bucket=bucket,
        Key=marker_key,
        ContentType="application/json",
    )
    print("Success!")  # noqa
    return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment