Created
February 20, 2018 02:44
-
-
Save patrobinson/864e46e989a2df53eed70dccae560302 to your computer and use it in GitHub Desktop.
(SO0037) - Real Time Insights on AWS Account Activity
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
############################################################################## | |
# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. # | |
# # | |
# Licensed under the Amazon Software License (the 'License'). You may not # | |
# use this file except in compliance with the License. A copy of the # | |
# License is located at # | |
# # | |
# http://aws.amazon.com/asl/ # | |
# # | |
# or in the 'license' file accompanying this file. This file is distributed # | |
# on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, # | |
# express or implied. See the License for the specific language governing # | |
# permissions and limitations under the License. # | |
############################################################################## | |
from __future__ import print_function | |
from itertools import groupby | |
import boto3 | |
import botocore | |
import base64 | |
import os | |
import logging | |
import urllib2 | |
from json import loads,dumps | |
from collections import OrderedDict | |
from operator import itemgetter | |
from random import randint | |
from sys import maxint | |
from time import sleep | |
# Logging level comes from the LOG_LEVEL environment variable; anything
# unrecognised (including an unset variable, which str() renders as 'NONE')
# falls back to ERROR.
log_level = str(os.environ.get('LOG_LEVEL')).upper()
if log_level not in ['DEBUG', 'INFO','WARNING', 'ERROR','CRITICAL']:
    log_level = 'ERROR'
log = logging.getLogger()
log.setLevel(log_level)
# 'YES' enables the anonymous usage-metrics POST at the end of lambda_handler.
send_anonymous_data = str(os.environ.get('SEND_ANONYMOUS_DATA')).upper()
# DynamoDB table holding the per-minute busiest-caller-IP records.
ip_table_name = "cloudtrail-log-ip-metrics"
# DynamoDB table holding the aggregated metric documents.
table_name="cloudtrail-log-analytics-metrics"
# Metric-type names as they appear in the incoming records.
calls_per_ip="CallsPerUniqueIp"
successful_calls = "NumberOfSuccessfulCalls"
anomaly_score = "AnomalyScore"
# Max recursive retries on conditional-write conflicts (put_record_with_retry).
max_retry_attempts = 5
# Module-level client so it is reused across warm Lambda invocations.
client = boto3.client('dynamodb')
def update_dynamodb(record_data, metric_type=None, event_time=None):
    """Merge record_data into the stored metric item and write it back.

    BUG FIX: the original body read ``metric_type`` and ``event_time`` as
    bare names that are defined nowhere in this module, so any call raised
    NameError.  They are now explicit parameters (optional, to keep the
    one-argument call signature importable/compatible) and must be supplied
    by callers.

    record_data -- dict mapping category key -> numeric count/score.
    metric_type / event_time -- DynamoDB key attributes of the target item.
    """
    ddb_record = client.get_item(TableName=table_name,
                                 Key={'MetricType': {'S': metric_type},
                                      'EventTime': {'S': event_time}},
                                 ConsistentRead=True)
    concurrency_token = int(ddb_record['Item']['ConcurrencyToken']['N'])
    # Same key-wise sum + sort-descending merge the original inlined here.
    record_data = merge_record_with_ddb(record_data, ddb_record)
    put_record(metric_type, event_time, record_data, concurrency_token)
def put_record_with_retry(metric_type, event_time, record_data, merged_data, concurrency_token, attempt=0):
    """Conditionally write a merged metric item, retrying on token conflicts.

    On a ConditionalCheckFailedException (another writer updated the item),
    sleep a random 0-5s, re-read the item, re-merge record_data with the
    fresh copy, and recurse; give up silently after max_retry_attempts.

    BUG FIX: the original recursed with the ORIGINAL (stale) concurrency
    token, so once the condition failed it could never succeed -- every
    retry compared against a token that had already been replaced.  The
    retry now uses the token from the fresh consistent read.
    """
    log.info("Retry: {0} {1} {2}".format(metric_type, event_time, str(attempt)))
    if attempt > max_retry_attempts:
        return
    try:
        put_record(metric_type, event_time, merged_data, concurrency_token)
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] != 'ConditionalCheckFailedException':
            raise
        # Random jitter to de-correlate concurrent retriers.
        sleep(randint(0, 5))
        ddb_record = client.get_item(TableName=table_name,
                                     Key={'MetricType': {'S': metric_type},
                                          'EventTime': {'S': event_time}},
                                     ConsistentRead=True)
        merged_data = merge_record_with_ddb(record_data, ddb_record)
        fresh_token = int(ddb_record['Item']['ConcurrencyToken']['N'])
        put_record_with_retry(metric_type, event_time, record_data, merged_data,
                              fresh_token, attempt + 1)
def put_record(metric_type, event_time, data, concurrency_token=None):
    """Write one metric item to DynamoDB with a fresh random concurrency token.

    When concurrency_token is given, the write is conditional on the stored
    item still carrying that token (optimistic locking); otherwise it is an
    unconditional put.
    """
    put_kwargs = {
        'TableName': table_name,
        'Item': {'MetricType': {'S': metric_type},
                 'EventTime': {'S': event_time},
                 'Data': {'S': dumps(data)},
                 'ConcurrencyToken': {'N': str(randint(0, maxint))}},
    }
    if concurrency_token:
        put_kwargs['ConditionExpression'] = 'ConcurrencyToken = :concurrency_token'
        put_kwargs['ExpressionAttributeValues'] = {
            ':concurrency_token': {'N': str(concurrency_token)}}
    client.put_item(**put_kwargs)
def merge_record_with_ddb(record_data, ddb_record):
    """Combine new counts with the item already stored in DynamoDB.

    record_data -- dict mapping category key -> numeric count/score.
    ddb_record  -- raw get_item response whose Item.Data.S is a JSON object.
    Returns an OrderedDict of the key-wise sums over the union of both key
    sets (missing keys count as 0), sorted by value, largest first.

    Fixes: py2-only ``.iteritems()`` replaced with ``.items()`` (identical
    result on Python 2, also valid on 3); the original's unused local read
    of Item.ConcurrencyToken is dropped.
    """
    ddb_data = loads(ddb_record['Item']['Data']['S'])
    merged_data = {k: record_data.get(k, 0) + ddb_data.get(k, 0)
                   for k in set(record_data) | set(ddb_data)}
    return OrderedDict(sorted(merged_data.items(), key=itemgetter(1), reverse=True))
def merge_record_values(metric_key, grouped_rows):
    """Sum column 5 of the grouped rows.

    Values are parsed as floats for anomaly-score metrics (metric_key
    contains 'AnomalyScore') and as ints for every other metric type.
    """
    cast = float if 'AnomalyScore' in metric_key else int
    return sum(cast(row[5]) for row in grouped_rows)
def sendAnonymousData(event_time, dataDict):
    """POST anonymous usage metrics (if enabled) for solution SO0037.

    Sends dataDict, the event timestamp, the solution id and the deployment
    UUID (from the UUID environment variable) as JSON to the AWS solutions
    metrics endpoint, logging the response at debug level.
    """
    log.debug("Sending Anonymous Data")
    payload = {
        'Data': dataDict,
        'TimeStamp': event_time,
        'Solution': 'SO0037',
        'UUID': os.environ.get('UUID'),
    }
    # API Gateway URL to make HTTP POST call
    url = 'https://metrics.awssolutionsbuilder.com/generic'
    body = dumps(payload)
    log.debug(body)
    request = urllib2.Request(url, body, {'content-type': 'application/json'})
    response = urllib2.urlopen(request)
    log.debug("Response from APIGateway: %s, %s", response.getcode(), response.read())
def lambda_handler(event, context):
    """Aggregate CloudTrail metric rows from a Kinesis event into DynamoDB.

    Each Kinesis record is a base64-encoded CSV row.  Columns 0-1 form the
    "event_time|metric_type" key, columns 2-3 the category key, and column
    5 the numeric value -- presumably the output schema of an upstream
    Kinesis Analytics application; TODO confirm against the producer.
    """
    payload = event['Records']
    output = {}
    # Decode every Kinesis record into its list of CSV fields.
    data = [base64.b64decode(record['kinesis']['data']).strip().split(',') for record in payload]
    # Drop rows whose category column is the literal string "null".
    data = filter(lambda x: x[2]!="null", data)
    log.info(data)
    # Two-level grouping: by metric key, then by category key, summing the
    # value column per category.  NOTE(review): itertools.groupby only
    # merges ADJACENT rows, so this assumes the stream is already ordered
    # by these keys -- confirm with the producer.
    for metric_key,metric_group in groupby(data, key=lambda x:"{0}|{1}".format(x[0],x[1])):
        grouped_metric = list(metric_group)
        for category_key,grouped_rows in groupby(grouped_metric, key=lambda x: "{0}|{1}".format(x[2],x[3])):
            output.setdefault(metric_key, {})[category_key] = merge_record_values(metric_key, list(grouped_rows))
    for record_key in output:
        event_time,metric_type = record_key.split('|')
        # Categories sorted by value, largest first (the CallsPerUniqueIp
        # branch below relies on this ordering).
        record_data = OrderedDict(sorted(output[record_key].iteritems(), key=itemgetter(1), reverse=True))
        ddb_record = client.get_item(TableName=table_name,
                                     Key={'MetricType': {'S':metric_type},
                                          'EventTime':{'S':event_time} },
                                     ConsistentRead=True)
        if 'Item' not in ddb_record:
            # First write for this (metric, time) pair -- nothing to merge.
            put_record(metric_type,event_time, record_data)
        else:
            # Merge with the stored item and write back under optimistic
            # locking, retrying on token conflicts.
            merged_data = merge_record_with_ddb(record_data, ddb_record)
            put_record_with_retry(metric_type, event_time, record_data, merged_data, int(ddb_record['Item']['ConcurrencyToken']['N']))
        if metric_type == calls_per_ip:
            # record_data is sorted descending, so the first key is this
            # batch's busiest "<ip>|..." entry.
            max_ip = next(iter(record_data))
            max_ip_count = record_data[max_ip]
            max_ip = max_ip.split('|')[0]
            hour,minute,_ = event_time.split(':')
            ddb_max_ip = client.get_item(TableName=ip_table_name,
                                         Key={'Hour': {'S': hour},
                                              'Minute':{'S':minute} },
                                         ConsistentRead=True)
            # Keep only the highest caller count seen for this hour:minute.
            if 'Item' not in ddb_max_ip or max_ip_count > int(ddb_max_ip['Item']['MaxCount']['N']):
                client.put_item(TableName=ip_table_name,
                                Item={'Hour': {'S':hour},
                                      'Minute':{'S':minute},
                                      'IP':{'S':max_ip},
                                      'MaxCount':{'N': str(max_ip_count)}} )
    if send_anonymous_data == "YES":
        # Best effort: report selected metrics back to AWS; a reporting
        # failure must never break the handler, hence the broad catch+log.
        try:
            unique_keys = list(set(output))
            for record_key in unique_keys:
                event_time,metric_type = record_key.split('|')
                if metric_type == successful_calls or metric_type == anomaly_score:
                    ddb_record = client.get_item(TableName=table_name,
                                                 Key={'MetricType': {'S':metric_type},
                                                      'EventTime':{'S':event_time} },
                                                 ConsistentRead=True)
                    del ddb_record["Item"]["ConcurrencyToken"]
                    del ddb_record["Item"]["EventTime"]
                    metric_data= {}
                    metric_data['MetricType'] = ddb_record['Item']['MetricType']['S']
                    if metric_type == successful_calls:
                        # Data.S is a JSON object string; take the first
                        # "service": count pair and strip the punctuation.
                        services, num_calls = ddb_record['Item']['Data']['S'].split(',')[0].split(':')
                        metric_data['NumberOfSuccessfulCalls'] = num_calls.replace('}','').replace(' ', '')
                    if metric_type == anomaly_score:
                        num_calls,anomaly_data = ddb_record['Item']['Data']['S'].split(',')[0].split(':')
                        metric_data['NumberOfSuccessfulCalls'] = num_calls.replace('{', '').replace('"', '').split('|')[0]
                        # NOTE(review): 'AnamonlyScore' is misspelled, but it
                        # is a runtime payload key consumed downstream --
                        # kept as-is deliberately.
                        metric_data['AnamonlyScore'] = anomaly_data.replace('}', '')
                    sendAnonymousData(event_time,metric_data)
        except Exception as error:
            log.error(error)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment