Last active
August 23, 2021 16:07
-
-
Save kennovation1/343207088d48d3e47cbee32cc5585c13 to your computer and use it in GitHub Desktop.
Continuously monitor and pretty print Amazon EventBridge traffic as captured in CloudWatch Logs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# busmon.py | |
# Monitor CloudWatch Logs for EventBridge traffic. | |
# | |
# This script assumes that there is a Lambda function snooping an EventBridge bus | |
# and logging the events to a CloudWatch Logs group. | |
# | |
# Your format will likely be different and will require slight adjustments to the parsing | |
# logic below. The default format assumed for this version is: | |
# timestamp and other stuff|field|field|field|SNOOPED detail-type: <eventName> jsonString | |
# That is 5 pipe-delimited fields, where the last field is the only field used. | |
# The 5th field uses a static preamble of "SNOPPED detail-type: " followed by the event name and | |
# a JSON string with the event details. | |
# | |
# To use this script, read through and change as needed for your use case. Most areas that need | |
# to be reviewed are annotated with '#NOTE Review' | |
# | |
# Requires: | |
# Python 3.7+ (i.e., only tested on 3.7.4) | |
# AWS CLI v2 installed on system | |
# or | |
# awslogs (pip install awslogs) | |
# | |
# Usage: | |
# python busmon.py [dev|prod] | |
# | |
# Author: Ken Robbins - CloudPegboard.com | |
import sys | |
import json | |
from subprocess import Popen, PIPE, STDOUT | |
env = 'dev' | |
if len(sys.argv) == 2: | |
env = sys.argv[1] | |
class style: | |
HEADER = '\033[95m' | |
OKBLUE = '\033[94m' | |
OKGREEN = '\033[92m' | |
WARNING = '\033[93m' | |
FAIL = '\033[91m' | |
BOLD = '\033[1m' | |
UNDERLINE = '\033[4m' | |
ENDC = '\033[0m' | |
# Sample event | |
# NOTE Review | |
sampleEvent = '''2020-08-12T14:15:41 2020-08-12 14:15:41,852|INFO|go2:snoop|36|SNOOPED detail-type: app_home_opened {"version": "0", "id": "1e62edf5-f735-81e5-37f0-7cf5407d4d4c", "detail-type": "app_home_opened", "source": "slackbot-events-dev", "account": "123456789012", "time": "2020-08-12T14:15:40Z", "region": "us-east-1", "resources": [], "detail": {"userId": "MYUSERID", "teamId": "ATEAMID", "apiAppId": "ANAPPID", "channelId": "ACHANNELID", "tab": "messages"}} | |
''' | |
# NOTE Review 3 lines | |
env='prod' | |
logGroup = f'/aws/lambda/slackbot-{env}-snoopHandler' | |
since = '1h' | |
filterPattern = 'SNOOPED' | |
# AWS CLI v2 | |
# TODO This does not yet work due to the --follow not playing nicely with pipes | |
# Empirically there is about a 14-second latency from event to display | |
logsCmdAwsCli = [ | |
'/usr/local/bin/aws', | |
'logs', | |
'tail', | |
logGroup, | |
'--format', | |
'short', | |
'--since', | |
since, | |
'--no-paginate', | |
'--filter-pattern=' + filterPattern, | |
'--follow' | |
] | |
# awslogs (pip install awslogs) | |
# Empirically there is about a 12-second latency from event to display | |
logsCmdAwsLogs = [ | |
'awslogs', | |
'get', | |
logGroup, | |
'ALL', | |
'-s=' + since, | |
'--filter-pattern=' + filterPattern, | |
'--watch' | |
] | |
def prettyPrintEvent(line): | |
# NOTE Review. Parse based on your log format | |
msg = line.rstrip().split('|', 4)[4] | |
parts = msg.split(' ', 3) | |
detailType = parts[2] | |
ev = json.loads(parts[3]) | |
# Format and print for good developer UX (only include attributes you really need) | |
# NOTE Review from here to end of function | |
out = { | |
'source': ev['source'], | |
'time': ev['time'], | |
'detail': ev['detail'] | |
} | |
eventTypeColor = style.OKBLUE | |
print(f"\n{style.BOLD}{eventTypeColor}{ev['detail-type']}{style.ENDC}") | |
print(json.dumps(out, indent=4)) | |
######## | |
# MAIN # | |
######## | |
# Test formatting. Comment out for live use. | |
# prettyPrintEvent(sampleEvent) | |
print(f'Starting to monitor log group: {logGroup}') | |
# cmd = logsCmdAwsCli # Do not use for now since "aws logs" does not work properly yet | |
cmd = logsCmdAwsLogs | |
proc = Popen(cmd, stdout=PIPE, shell=False, encoding='utf8') | |
while True: | |
line = proc.stdout.readline() | |
if len(line) == 0 and proc.poll() is not None: | |
break | |
prettyPrintEvent(line) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# snoop.py | |
# Watch all events and log, ignore, or do something else | |
# to help with debugging as specified by env var. | |
# | |
# Make sure to set up EventBridge rules to set this function as a target | |
# for all events that you might want to monitor. | |
# | |
# Runtime: Python 3.7 | |
# | |
# Environment: | |
# Specify to snoop all events or a pipe-delimited list of event names. | |
# export SNOOP_EVENTS=ALL | |
# export SNOOP_EVENTS=NONE | |
# export SNOOP_EVENTS='eventName1|eventName2|...' | |
# | |
# Here's an example Serverless Framework serverless.yml fragment for deploying this function: | |
# snoopHandler: | |
# handler: snoop.snoopHandler | |
# description: Catch all events and dump to logs or other debug actions | |
# memorySize: 128 | |
# timeout: 3 | |
# environment: | |
# SNOOP_EVENTS: 'ALL' | |
# events: | |
# - eventBridge: | |
# pattern: | |
# source: | |
# - 'slackbot-source1-${self:provider.stage}' | |
# - 'slackbot-source2-${self:provider.stage}' | |
# - 'slackbot-source3-${self:provider.stage}' | |
# - 'slackbot-source4-${self:provider.stage}' | |
# | |
# Author: Ken Robbins - CloudPegboard.com | |
import os | |
import json | |
import logging | |
import unittest | |
from logutils import setupLogging | |
def snoopHandler(event, context): | |
snoopEvents = os.environ.get('SNOOP_EVENTS', 'ALL') | |
if snoopEvents == 'NONE': | |
return 'Ignored event' | |
# You do you. setupLogging is a local utility that configures the logger | |
# the way CloudPegboard.com does logging. Your's will likely be differed. | |
# Here's format string that setupLogging happens to use: | |
# '%(asctime)s|%(levelname)s|go2:%(module)s|%(lineno)d|%(message)s' | |
logger = setupLogging(logging.INFO) | |
detailType = event['detail-type'] | |
detailTypes = snoopEvents.split('|') | |
if (snoopEvents == 'ALL') or (detailType in detailTypes): | |
logger.info(f'SNOOPED detail-type: {detailType} ' + json.dumps(event)) | |
return f'Logged event: {detailType}' | |
else: | |
return f'Skipped event: {detailType}' | |
############# | |
# unittests # | |
############# | |
class TestController(unittest.TestCase): | |
def setUp(self): | |
self.event = { | |
"version": "0", | |
"id": "someLongIdString", | |
"detail-type": "app_home_opened", | |
"source": "slackbot-events-dev", | |
"account": "123456789012", | |
"time": "2020-07-31T18:27:14Z", | |
"region": "us-east-1", | |
"resources": [], | |
"detail": { | |
"userId": "someUserId", | |
"teamId": "someTeamId", | |
"apiAppId": "someAppId", | |
"channelId": "someChannelId", | |
"tab": "messages" | |
} | |
} | |
self.eventType = self.event['detail-type'] | |
self.logSample = '2020-08-17 17:31:57,750|INFO|go2:snoopblog|36|SNOOPED detail-type: app_home_opened {"version": "0", "id": "someLongIdString", "detail-type": "app_home_opened", "source": "slackbot-events-dev", "account": "123456789012", "time": "2020-07-31T18:27:14Z", "region": "us-east-1", "resources": [], "detail": {"userId": "someUserId", "teamId": "someTeamId", "apiAppId": "someAppId", "channelId": "someChannelId", "tab": "messages"}}' | |
def test_snoopALL(self): | |
os.environ['SNOOP_EVENTS'] = 'ALL' | |
response = snoopHandler(self.event, None) | |
self.assertEqual(response, f'Logged event: {self.eventType}') | |
print('\nLog sample should look like this:') | |
print(self.logSample + '\n') | |
def test_snoopSpecific(self): | |
os.environ['SNOOP_EVENTS'] = f'{self.eventType}|someOtherEvent' | |
response = snoopHandler(self.event, None) | |
self.assertEqual(response, f'Logged event: {self.eventType}') | |
def test_snoopNoMatch(self): | |
os.environ['SNOOP_EVENTS'] = 'someEvent|someOtherEvent' | |
response = snoopHandler(self.event, None) | |
self.assertEqual(response, f'Skipped event: {self.eventType}') | |
def test_snoopNONE(self): | |
os.environ['SNOOP_EVENTS'] = 'NONE' | |
response = snoopHandler(self.event, None) | |
self.assertEqual(response, 'Ignored event') | |
######## | |
# MAIN # | |
######## | |
if __name__ == '__main__': | |
unittest.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment