Last active
November 23, 2016 05:43
-
-
Save brandond/d5701fb763cce3b9d871394704b43db5 to your computer and use it in GitHub Desktop.
Evident ESP Python SDK sample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Load API Keys from dotenv file | |
from dotenv import load_dotenv, find_dotenv | |
load_dotenv(find_dotenv()) | |
# Raise items per page to max | |
from esp import settings | |
settings.settings.per_page = 100 | |
# Add Unix CSV dialect | |
import csv | |
class unix_dialect(csv.Dialect): | |
"""Describe the usual properties of Unix-generated CSV files.""" | |
delimiter = ',' | |
quotechar = '"' | |
doublequote = True | |
skipinitialspace = False | |
lineterminator = '\n' | |
quoting = csv.QUOTE_ALL | |
csv.register_dialect("unix", unix_dialect) | |
# Enable request tracing | |
import os | |
if os.environ.get('DEBUG'): | |
import requests | |
import logging | |
# These two lines enable debugging at httplib level (requests->urllib3->http.client) | |
# You will see the REQUEST, including HEADERS and DATA, and RESPONSE with HEADERS but without DATA. | |
# The only thing missing will be the response.body which is not logged. | |
try: | |
import http.client as http_client | |
except ImportError: | |
# Python 2 | |
import httplib as http_client | |
http_client.HTTPConnection.debuglevel = 1 | |
# You must initialize logging, otherwise you'll not see debug output. | |
logging.basicConfig() | |
logging.getLogger().setLevel(logging.DEBUG) | |
requests_log = logging.getLogger("requests.packages.urllib3") | |
requests_log.setLevel(logging.DEBUG) | |
requests_log.propagate = True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
from collections import OrderedDict | |
import sys | |
sys.path.insert(0,sys.path[0]+'/..') | |
import evident | |
import base64 | |
import json | |
import esp | |
import csv | |
import sys | |
# Extract some useful bit of metadata that varies by signature | |
def get_metadata(alert): | |
tags = OrderedDict() | |
if alert.metadata: | |
if alert.signature.identifier in ['AWS:EC2-002','AWS:EC2-003','AWS:EC2-004','AWS:EC2-005', | |
'AWS:EC2-006','AWS:EC2-009','AWS:EC2-010','AWS:EC2-011', | |
'AWS:EC2-012','AWS:EC2-013','AWS:EC2-014','AWS:EC2-015', | |
'AWS:EC2-016','AWS:EC2-017','AWS:EC2-018','AWS:EC2-019', | |
'AWS:EC2-020','AWS:EC2-021','AWS:EC2-022','AWS:EC2-023', | |
'AWS:EC2-024']: | |
tags['attachedEC2Instances'] = (alert.metadata.data.get('attachedEC2Instances') or | |
alert.metadata.data.get('details', {}).get('attachedEC2Instances', [])) | |
elif alert.signature.identifier == 'AWS:ELB-005': | |
tags['certificateName'] = (alert.metadata.data.get('certificateName') or | |
alert.metadata.data.get('details', {}).get('certificateName', '')) | |
elif alert.signature.identifier == 'AWS:IAM-015': | |
tags['user'] = (alert.metadata.data.get('user') or | |
alert.metadata.data.get('details', {}).get('user', '')) | |
if alert.tags: | |
for tag in alert.tags: | |
tags[tag.key] = tag.value | |
return json.dumps(tags) | |
# Get alert message. Some signatures appear to be buggy and don't base64-decode the message value properly | |
def get_message(alert): | |
message = get_message_payload(alert) | |
if isinstance(message, dict): | |
return base64.b64decode(message['bytes']) | |
else: | |
return message | |
# Get the alert's message from the metadata. Depending on the signature and even how old the alert is, | |
# the message may be at a different location in the metadata structure. | |
def get_message_payload(alert): | |
if not alert.metadata: | |
return | |
if 'details' in alert.metadata.data: | |
if 'message' in alert.metadata.data['details']: | |
return alert.metadata.data['details']['message'] | |
elif 'condition' in alert.metadata.data['details']: | |
return alert.metadata.data['details']['condition'] | |
elif 'message' in alert.metadata.data: | |
return alert.metadata.data['message'] | |
return | |
# Export unsupressed alerts from the latest report for each account | |
def export_alerts(): | |
reports = [] | |
alerts = [] | |
# Get latest report for each account | |
# Doesn't appear to be a way to get the latest, so we just shrink the page size to 1 | |
# and grab that. | |
for e in esp.ExternalAccount.where(): | |
esp.settings.settings.per_page = 1 | |
reports.extend(esp.Report.where(include='external_account',external_account_id=e.id_)) | |
esp.settings.settings.per_page = 100 | |
# Run through pages of alerts for each report, adding to alert list | |
for r in reports: | |
print('Retrieving alerts for Account {0.external_account.account} from Report ID {0.id_} updated at {0.updated_at}'.format(r), file=sys.stderr) | |
page = esp.Alert.where(report_id=r.id_, status='fail', not_suppressed='true', signature_risk_level=['high'], | |
include='external_account,region,signature,metadata,tags') | |
while True: | |
alerts.extend([a for a in page if not a.ended_at]) | |
if page.current_page_number == page.last_page_number: | |
break | |
else: | |
page = page.next_page() | |
# Set up CSV export | |
fieldnames = ['ID', 'Account', 'Region', 'Resource', 'Signature', 'Name', 'Message', 'Metadata'] | |
writer = csv.DictWriter(sys.stdout, fieldnames=fieldnames, extrasaction='ignore', dialect='unix') | |
writer.writeheader() | |
alerts.sort(key=lambda a: int(a.id_)) | |
for a in alerts: | |
writer.writerow({'ID': a.id_, | |
'Account': a.external_account.name, | |
'Region': a.region.code, | |
'Resource': a.resource, | |
'Signature': a.signature.identifier, | |
'Name': a.signature.name, | |
'Message': get_message(a), | |
'Metadata': get_metadata(a)}) | |
if __name__ == '__main__': | |
export_alerts() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment