Created
February 13, 2018 04:32
-
-
Save nvisium-jonn-callahan/e89db90d459ea3e188eb942552f2ec27 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import rsa | |
import gzip | |
import json | |
import boto3 | |
import queue | |
import logging | |
import hashlib | |
import binascii | |
from logging import handlers | |
from concurrent import futures | |
from datetime import datetime as dt, timedelta as td | |
# number of previous log digests to verify. they are generated once per hour, so this functionally | |
# equates to how many hours back the logfiles should be verified. | |
NUM_VERIFY = 36 | |
logger = logging.getLogger() | |
logger.setLevel(logging.INFO) | |
def start_verification(account_id, digest_bucket, region): | |
# boto3 is not thread-safe by default. we create a session per thread to enable this. | |
session = boto3.session.Session() | |
digest_obj = get_latest_digest_obj(account_id, digest_bucket, region, session) | |
for i in range(1, NUM_VERIFY+1): | |
if digest_obj is None: | |
logger.warning('Log digest chain broken - only processed %s digest files' % i) | |
break | |
digest_obj, digest_bucket = digest_verification(digest_obj, digest_bucket, region, session) | |
def get_latest_digest_obj(account_id, digest_bucket, region, session): | |
s3 = session.client('s3') | |
current_time = dt.utcnow() | |
# quick and dirty check for the latest digest file via a prefix. good chance it won't return anything | |
# for regions that are rarely/never used towards the beginning of a month. | |
prefix = 'AWSLogs/%s/CloudTrail-Digest/%s/%s/%02d/' % ( | |
account_id, | |
region, | |
current_time.year, | |
current_time.month | |
) | |
resp = s3.list_objects_v2(Bucket=digest_bucket, Prefix=prefix) | |
obj_list = sorted(resp.get('Contents', []), key=lambda k: k['LastModified'], reverse=True) | |
if len(obj_list) == 0: | |
logger.error('Unable to find the most recent digest file for the %s region within bucket: %s' % (region, digest_bucket)) | |
return None | |
return obj_list[0]['Key'] | |
def digest_verification(digest_obj, cloudtrail_bucket, region, session): | |
# cloudtrail calls must be region-aware | |
s3 = session.client('s3') | |
cloudtrail = session.client('cloudtrail', region_name=region) | |
logger.info('Fetching digest file: %s' % digest_obj) | |
resp = s3.get_object(Bucket=cloudtrail_bucket, Key=digest_obj) | |
signature = binascii.a2b_hex(resp['Metadata']['signature']) | |
algo = resp['Metadata']['signature-algorithm'] | |
content = gzip.decompress(resp['Body'].read()) | |
dict_content = json.loads(content) | |
fingerprint = dict_content['digestPublicKeyFingerprint'] | |
log_file_list = dict_content['logFiles'] | |
start_time = dict_content['digestStartTime'] | |
end_time = dict_content['digestEndTime'] | |
previous_digest_sig = dict_content['previousDigestSignature'] | |
previous_digest_obj = dict_content['previousDigestS3Object'] | |
previous_digest_bucket = dict_content['previousDigestS3Bucket'] | |
# this val must be the string 'null' if it `is None`. this is "to match the java impl" | |
# https://github.com/aws/aws-cli/blob/e2295b022db35eea9fec7e6c5540d06dbd6e588b/awscli/customizations/cloudtrail/validation.py#L552 | |
previous_digest_sig = 'null' if previous_digest_sig is None else previous_digest_sig | |
resp = cloudtrail.list_public_keys(StartTime=start_time, EndTime=end_time) | |
key_list = resp['PublicKeyList'] | |
public_key_content = [key['Value'] for key in key_list if key['Fingerprint'] == fingerprint] | |
if len(public_key_content) != 1: | |
logger.critical('Unable to find matching key for log file digest: %s' % digest_obj) | |
sys.exit(1) | |
else: | |
public_key_content = public_key_content[0] | |
public_key = rsa.PublicKey.load_pkcs1(public_key_content, format='DER') | |
h = hashlib.sha256() | |
h.update(content) | |
hashed_content = binascii.hexlify(h.digest()).decode() | |
signing_string = ('%s\n%s\n%s\n%s' % ( | |
end_time, | |
'%s/%s' % (cloudtrail_bucket, digest_obj), | |
hashed_content, | |
previous_digest_sig | |
)).encode() | |
try: | |
rsa.verify(signing_string, signature, public_key) | |
logger.info('Digest file in %s region successfully verified: %s' % (region, digest_obj)) | |
except rsa.pkcs1.VerificationError: | |
logger.critical('Signature validation failed in %s region for digest file: %s' % (region, digest_obj)) | |
sys.exit(2) #no point in verifying cloudtrail logs if the digest cannot be verified | |
for log_file in log_file_list: | |
resp = s3.get_object(Bucket=log_file['s3Bucket'], Key=log_file['s3Object']) | |
content = gzip.decompress(resp['Body'].read()) | |
h = hashlib.sha256() | |
h.update(content) | |
generated_hash = binascii.hexlify(h.digest()).decode() | |
if generated_hash != log_file['hashValue']: | |
logger.critical('Mismatched content hash found for cloudtrail log file: %s' % log_file['s3Object']) | |
else: | |
logger.info('CloudTrail log file successfully verified: %s' % log_file['s3Object']) | |
return previous_digest_obj, previous_digest_bucket | |
# this can be modified to however you want to push out notifications of failures | |
# e.g. sns topic, slack hook, new relic alarm | |
def notify_error(log_record): | |
sns = boto3.client('sns') | |
if log_record.levelno > logging.INFO: | |
sns.publish(TopicArn='arn:aws:sns:us-east-1:014890146463:test-topic', Message=log_record.getMessage()) | |
def lambda_handler(event, context): | |
logging_queue = queue.Queue() | |
handler = handlers.QueueHandler(logging_queue) | |
logger.addHandler(handler) | |
digest_bucket = 'local-cloudtrail-logs-014890146463' | |
account_id = context.invoked_function_arn.split(":")[4] | |
# currently, the only way to programmatiaclly query this value is via the ec2 or other service-specific apis | |
ec2 = boto3.client('ec2') | |
resp = ec2.describe_regions() | |
region_list = [r['RegionName'] for r in resp['Regions']] | |
pool = futures.ThreadPoolExecutor(max_workers=len(region_list)) | |
for region in region_list: | |
pool.submit(start_verification, account_id, digest_bucket, region) | |
pool.shutdown(wait=True) | |
while not logging_queue.empty(): | |
try: | |
log_record = logging_queue.get(timeout=0.1) | |
notify_error(log_record) | |
except queue.Empty: | |
break |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment