Last active
November 20, 2018 10:25
-
-
Save revmischa/458ba6c65c7662f0c5eff8feaf7d6c53 to your computer and use it in GitHub Desktop.
Heroku logplex syslog handler lambda
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Sample handler for parsing Heroku logplex drain events (https://devcenter.heroku.com/articles/log-drains#https-drains). | |
Expects messages to be framed with the syslog TCP octet counting method (https://tools.ietf.org/html/rfc6587#section-3.4.1). | |
This is designed to be run as a Python3.6 lambda. | |
""" | |
import json | |
import boto3 | |
import logging | |
import iso8601 | |
import requests | |
from base64 import b64decode | |
from pyparsing import Word, Suppress, nums, Optional, Regex, pyparsing_common, alphanums | |
from syslog import LOG_DEBUG, LOG_WARNING, LOG_INFO, LOG_NOTICE | |
from collections import defaultdict | |
# Slack incoming-webhook URL. The host/path part is stored KMS-encrypted and
# base64-encoded, and is decrypted once at import time.
# NOTE(review): ENCRYPTED_HOOK_URL is not defined anywhere in the visible
# source -- confirm where it is supposed to come from before deploying.
_kms_client = boto3.client('kms')
_decrypted_hook = _kms_client.decrypt(CiphertextBlob=b64decode(ENCRYPTED_HOOK_URL))
HOOK_URL = "https://" + _decrypted_hook['Plaintext'].decode('ascii')

# Slack channel name used for alerts.
CHANNEL = "#alerts"

# Module logger.
log = logging.getLogger('myapp.heroku.drain')
class Parser(object):
    """Parser for the syslog lines delivered by a Heroku logplex drain.

    The grammar accepts lines shaped like::

        <PRI>VERSION TIMESTAMP HOSTNAME SOURCE APPNAME[PID] - MESSAGE

    where the bracketed numeric ``[PID]`` is optional.
    """

    def __init__(self):
        """Build the pyparsing grammar once so it can be reused for every line."""
        ints = Word(nums)
        name_chars = alphanums + "_-."

        # "<NN>" priority value; the angle brackets are dropped from the results
        priority = Suppress("<") + ints + Suppress(">")
        version = ints
        timestamp = pyparsing_common.iso8601_datetime
        hostname = Word(name_chars)
        source = Word(name_chars)
        # app name, an optional "[pid]" suffix, then the "-" separator
        appname = (
            Word(alphanums + "()/-_.")
            + Optional(Suppress("[") + ints + Suppress("]"))
            + Suppress("-")
        )
        # whatever is left on the line
        message = Regex(".*")

        self.__pattern = priority + version + timestamp + hostname + source + appname + message

    def parse(self, line):
        """Parse one syslog line into a dict.

        Returns a dict with the keys ``priority``, ``severity``, ``facility``,
        ``version``, ``timestamp`` (the result of ``iso8601.parse_date``),
        ``hostname``, ``source``, ``appname`` and ``message``.

        Any exception raised by pyparsing or ``iso8601`` for a line that does
        not match the grammar propagates to the caller.
        """
        parsed = self.__pattern.parseString(line)

        # https://tools.ietf.org/html/rfc5424#section-6
        # the priority value packs the facility (upper bits) and the severity (low 3 bits)
        priority = int(parsed[0])

        return {
            "priority": priority,
            "severity": priority & 0x07,
            "facility": priority >> 3,
            "version": parsed[1],
            "timestamp": iso8601.parse_date(parsed[2]),
            "hostname": parsed[3],
            "source": parsed[4],
            "appname": parsed[5],
            # The message is always the final token. The optional "[pid]"
            # contributes an extra token in front of it, so a fixed index of 6
            # would return the pid instead of the message in that case.
            "message": parsed[-1],
        }


# Shared module-level parser instance (the grammar is built only once).
parser = Parser()
def lambda_handler(event, context):
    """Lambda entry point: process the drain request, then reply 200 with no body.

    ``context`` is accepted but not used. Any exception raised while handling
    the event propagates to the Lambda runtime.
    """
    handle_lambda_proxy_event(event)

    response = dict(
        isBase64Encoded=False,
        statusCode=200,
        headers={"Content-Length": 0},
    )
    return response
def _iter_syslog_frames(payload: bytes):
    """Yield each octet-counted syslog frame in *payload*, decoded as UTF-8.

    Every frame is ``<decimal byte length><space><that many bytes>`` and frames
    follow each other back to back. The payload is walked iteratively so large
    batches neither recurse deeply nor re-copy the remaining bytes per frame.

    Raises:
        ValueError: if a length prefix is missing, not an integer, or negative.
    """
    offset = 0
    total = len(payload)
    while offset < total:
        sep = payload.find(b' ', offset)
        if sep == -1:
            raise ValueError(f"failed to parse heroku logplex payload: '{payload[offset:]}'")

        len_field = payload[offset:sep]
        try:
            msg_len = int(len_field)
        except ValueError as ex:
            raise ValueError(
                f"failed to parse {len_field} as int, payload: {payload[offset:]}"
            ) from ex
        if msg_len < 0:
            # a negative length would move the cursor backwards and loop forever
            raise ValueError(f"negative frame length {msg_len}, payload: {payload[offset:]}")

        # only grab msg_len bytes of syslog message
        start = sep + 1
        end = start + msg_len
        yield payload[start:end].decode('utf-8')
        offset = end


def handle_lambda_proxy_event(event):
    """Parse a logplex drain request and forward the interesting lines to Slack.

    Args:
        event: mapping with a ``'body'`` entry (the request body as ``str``;
            an empty or ``None`` body is treated as containing no frames) and
            a ``'headers'`` mapping that must contain ``X-Forwarded-Proto``,
            ``Content-Type`` and ``Logplex-Msg-Count``.

    Returns:
        An empty string.

    Raises:
        ValueError: if the request is not an HTTPS logplex request, if the
            body cannot be split into frames, or if the number of frames does
            not match ``Logplex-Msg-Count`` (checked after messages were sent).
    """
    body = event['body']
    headers = event['headers']

    # sanity-check source (explicit raises: asserts vanish under ``python -O``)
    if headers.get('X-Forwarded-Proto') != 'https':
        raise ValueError("logplex drain request must arrive over https")
    if headers.get('Content-Type') != 'application/logplex-1':
        raise ValueError(f"unexpected Content-Type: {headers.get('Content-Type')!r}")

    payload = (body or '').encode('utf-8')

    # Group the lines that pass the filter by (title, severity). The title
    # already embeds the severity, so each title maps to exactly one severity
    # and groups are sent in first-seen order.
    grouped = defaultdict(list)
    chunk_count = 0
    for chunk in _iter_syslog_frames(payload):
        chunk_count += 1
        evt = parser.parse(chunk)
        if not filter_slack_msg(evt):
            # skip stuff filtered out
            continue
        sev = evt['severity']
        title = f"SEV:{sev} {evt['source']} {evt['appname']}"
        grouped[(title, sev)].append(f"{evt['timestamp']}: {evt['message']}")

    for (title, severity), lines in grouped.items():
        # Build the attachment from this group's own severity rather than from
        # whichever event happened to be parsed last.
        slack_att = slack_format_attachment(log_msg=None, log_rec={'severity': severity})
        text = "\n" + "\n".join(lines)
        slack(text=text, title=title, attachments=[slack_att], channel=CHANNEL, severity=severity)

    # sanity-check number of parsed messages
    expected_count = int(headers['Logplex-Msg-Count'])
    if expected_count != chunk_count:
        raise ValueError(
            f"Logplex-Msg-Count is {expected_count} but {chunk_count} messages were parsed"
        )
    return ""
def slack_format_attachment(log_msg=None, log_rec=None, title=None):
    """Build a Slack attachment dict coloured by the record's syslog severity.

    ``log_rec['severity']`` is coerced with ``int()``. ``log_msg`` becomes the
    attachment text and ``title`` the author name. The colour is ``None`` for
    a severity value above ``LOG_DEBUG``.
    """
    level = int(log_rec['severity'])

    colors_by_level = {
        LOG_DEBUG: "#aaaaaa",
        LOG_INFO: "good",
        LOG_NOTICE: "#439FE0",
        LOG_WARNING: "warning",
    }
    if level in colors_by_level:
        color = colors_by_level[level]
    elif level < LOG_WARNING:
        # numerically below warning means more severe: error!
        color = "danger"
    else:
        color = None

    return {
        'author_name': title,
        'color': color,
        'mrkdwn_in': ['text'],
        'text': log_msg,
    }
def filter_slack_msg(msg):
    """Decide whether a parsed log record is worth posting to Slack.

    Reads the ``severity``, ``source``, ``appname`` and ``message`` entries of
    ``msg`` and returns ``False`` for debug-level records, router lines,
    informational heroku-postgres lines, routine Postgres checkpoint/backup
    notices and logplex ``Error L10`` notices; ``True`` otherwise.
    """
    # 'source' is looked up but not consulted by any rule below.
    sev, _source, appname, body = (
        msg[key] for key in ("severity", "source", "appname", "message")
    )

    # debug output, whether flagged by severity or by a message prefix
    if sev >= LOG_DEBUG or body.startswith('DEBUG '):
        return False

    if appname == 'router':
        return False

    if appname == 'heroku-postgres' and sev >= LOG_INFO:
        return False

    # routine Postgres checkpoint / backup chatter
    routine_markers = (
        'sql_error_code = 00000 LOG: checkpoint complete',
        'sql_error_code = 00000 NOTICE: pg_stop_backup complete, all required WAL segments have been archived',
        'sql_error_code = 00000 LOG: checkpoint starting: ',
    )
    if any(marker in body for marker in routine_markers):
        return False

    # "NN messages dropped since..." notices from logplex itself
    if appname == 'logplex' and body.startswith('Error L10'):
        return False

    return True
def slack(text=None, title=None, attachments=None, icon=None, channel='#alerts', severity=LOG_WARNING):
    """Assemble a Slack webhook message and send it with ``slack_raw``.

    Args:
        text: main message text.
        title: used as the posting username.
        attachments: list of Slack attachment dicts; when empty or ``None``
            nothing is sent and the function returns immediately.
        icon: emoji name without the surrounding colons. When ``None`` an
            emoji is chosen from ``severity``.
        channel: Slack channel to post to.
        severity: syslog severity used to choose the default emoji.

    Returns:
        ``None``.
    """
    if not attachments:
        return

    # Honour an explicitly supplied icon; only fall back to the
    # severity-derived emoji when the caller did not pass one.
    if icon is None:
        icons_by_severity = {
            LOG_DEBUG: 'information_source',
            LOG_INFO: 'information_desk_person',
            LOG_NOTICE: 'scroll',
            LOG_WARNING: 'warning',
        }
        if severity in icons_by_severity:
            icon = icons_by_severity[severity]
        elif severity < LOG_WARNING:
            # error!
            icon = 'boom'
        else:
            icon = 'mega'

    message = {
        "username": title,
        "channel": channel,
        "icon_emoji": f":{icon}:",
        "attachments": attachments,
        "text": text,
    }
    print(message)
    slack_raw(message)
def slack_raw(payload):
    """POST ``payload`` as JSON to the Slack webhook at ``HOOK_URL``.

    Raises:
        ValueError: if Slack answers with any status code other than 200.
        requests.exceptions.RequestException: on connection errors or when
            the request times out.
    """
    response = requests.post(
        HOOK_URL,
        data=json.dumps(payload),
        headers={'Content-Type': 'application/json'},
        # requests waits indefinitely by default; bound the call so a stalled
        # connection fails fast instead of hanging the whole invocation
        timeout=10,
    )
    if response.status_code != 200:
        raise ValueError(
            'Request to slack returned an error %s, the response is:\n%s'
            % (response.status_code, response.text)
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment