# Source: GitHub Gist krisek/b9d1c4eb7d82e0c09d30469b8758d42d
# Author: @krisek, created May 21, 2021 15:28
import copy
import hashlib
import json
import os
import re

import requests
# Per-provider payload skeletons, keyed by the message type that
# lambda_handler derives from each webhook URL. The string values below are
# placeholders that the handler overwrites before posting.
templates = {
    # No Slack-specific fields are defined here.
    "slack": {},
    # Microsoft Teams message carrying a single Adaptive Card attachment.
    "teams": {
        "type": "message",
        "attachments": [
            {
                "contentType": "application/vnd.microsoft.card.adaptive",
                "content": {
                    "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
                    "type": "AdaptiveCard",
                    "version": "1.2",
                    "body": [
                        # body[0]: title line (text and color set by the handler)
                        {
                            "type": "TextBlock",
                            "size": "Medium",
                            "weight": "Bolder",
                            "text": "Subject",
                            "wrap": True,
                            "color": "Default",
                        },
                        # body[1]: filled with the alarm state reason
                        {
                            "type": "TextBlock",
                            "text": "Message text",
                            "wrap": True,
                        },
                        # body[2]: filled with the state's recentDatapoints (if any)
                        {
                            "type": "TextBlock",
                            "text": "Message text",
                            "wrap": True,
                        },
                        # body[3]: filled with the event's account
                        {
                            "type": "TextBlock",
                            "text": "TopicArn",
                            "wrap": True,
                        },
                        # body[4]: filled with the event's time
                        {
                            "type": "TextBlock",
                            "text": "timestamp",
                            "wrap": True,
                        },
                    ],
                },
            },
        ],
    },
    # PagerDuty event envelope; summary, routing_key, event_action and
    # dedup_key are filled in by the handler.
    "pagerduty": {
        "payload": {
            "summary": "",
            "severity": "error",
            "source": "cloudwatch",
            "component": "",
            "group": "",
            "class": "",
        },
        "routing_key": "",
        "event_action": "",
        "dedup_key": "",
    },
}
# Maps an alarm state value to a provider-specific color name. The handler
# looks up colors[message_type] and falls back to 'Default' for unknown states.
colors = {
    # No Slack colors are defined.
    "slack": {},
    # Color names used for the Teams card title.
    "teams": {
        "ALARM": "Attention",
        "OK": "Good",
        "INSUFFICIENT_DATA": "Warning",
        "blue": "Accent",
    },
}
# Fallback used when the ALERT_URLS environment variable is unset. It contains
# none of the provider markers the handler looks for, so nothing is posted.
DEFAULT_URLS = "https://"
# Upper bound (seconds) for each webhook call, so a stalled endpoint cannot
# hold the function until the Lambda timeout.
_REQUEST_TIMEOUT_SECONDS = 10


def _detect_message_type(url):
    """Return the template key for *url*, or None if no provider matches.

    Checked in this order so that precedence matches the original chain of
    independent ``if`` statements, where the last matching check won:
    pagerduty, then slack, then teams.
    """
    for marker, message_type in (
        ('pagerduty', 'pagerduty'),
        ('slack', 'slack'),
        ('office.com', 'teams'),
    ):
        if marker in url:
            return message_type
    return None


def _build_message(message_type, event):
    """Build a fresh payload of *message_type* populated from *event*."""
    detail = event['detail']
    state = detail['state']
    state_value = state['value']
    title = detail['alarmName'] + ' ' + state_value

    # Work on a deep copy: the module-level templates outlive a single
    # invocation, so mutating them in place would leak values (e.g. a
    # downgraded PagerDuty severity) into later messages.
    message = copy.deepcopy(templates[message_type])

    if message_type == 'teams':
        body = message['attachments'][0]['content']['body']
        # Title line, colored by alarm state.
        body[0]['text'] = title
        body[0]['color'] = colors[message_type].get(state_value, 'Default')
        # Message body lines.
        body[1]['text'] = state['reason']
        # TextBlock text has to be a string; coerce in case a list is supplied.
        body[2]['text'] = str(state.get('recentDatapoints', ''))
        body[3]['text'] = event['account']
        body[4]['text'] = " @" + event['time']
    elif message_type == 'pagerduty':
        message['routing_key'] = os.environ.get('PD_ROUTING_KEY', "blah")
        message['payload']['summary'] = title
        message['payload']['custom_details'] = {
            'alarm': detail['alarmName'],
            'reason': state['reason'],
            'account': event['account'],
            'ts': " @" + event['time'],
        }
        # MD5 is used only to derive a stable per-alarm key, not for security.
        message['dedup_key'] = hashlib.md5(detail['alarmName'].encode()).hexdigest()
        if state_value == 'OK':
            message['event_action'] = 'resolve'
        elif state_value in ('ALARM', 'INSUFFICIENT_DATA'):
            message['event_action'] = 'trigger'
            if state_value == 'INSUFFICIENT_DATA':
                message['payload']['severity'] = 'info'
    elif message_type == 'slack':
        # The Slack template is empty; send at least a plain-text message
        # instead of an empty JSON object.
        message['text'] = title + '\n' + state['reason']

    return message


def lambda_handler(event, context):
    """Forward an alarm state-change event to every configured webhook.

    Target URLs are read from the ``ALERT_URLS`` environment variable
    (semicolon separated, falling back to ``DEFAULT_URLS``). The provider for
    each URL is chosen by substring ('office.com' -> teams, 'slack',
    'pagerduty'); URLs matching none of them are skipped.

    Args:
        event: Mapping providing ``detail.alarmName``, ``detail.state.value``
            and ``detail.state.reason`` (plus ``account`` and ``time`` for
            Teams and PagerDuty) -- presumably an EventBridge "CloudWatch
            Alarm State Change" event; confirm against the trigger.
        context: Lambda context object; unused.

    Returns:
        ``{'statusCode': 200, 'body': <JSON string>}`` where the JSON maps
        each message type to the endpoint's response text followed by the
        payload that was sent. Several URLs of the same type overwrite each
        other's entry.

    Raises:
        KeyError: If a URL matched and *event* lacks a required key.
        requests.RequestException: If posting to a webhook fails or times out.
    """
    responses = {}
    urls = os.environ.get('ALERT_URLS', DEFAULT_URLS).split(';')
    for raw_url in urls:
        # Tolerate whitespace around the separators in ALERT_URLS.
        url = raw_url.strip()
        message_type = _detect_message_type(url)
        if message_type is None:
            continue
        message = _build_message(message_type, event)
        r = requests.post(url, json=message, timeout=_REQUEST_TIMEOUT_SECONDS)
        responses[message_type] = r.text + " " + json.dumps(message)
    return {
        'statusCode': 200,
        'body': json.dumps(responses)
    }
# (GitHub page footer: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")