Stream logs from an ELB or an ALB (the script determines which for you). Pipe to `jq` to find 5xx's or whatever (requires Python 3).
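Usage sketch (assuming the file is saved as elb_logs.py; the load balancer name is illustrative):

    python3 elb_logs.py list
    python3 elb_logs.py logs my-load-balancer --start '10m ago' --end '0m ago'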
#!/usr/bin/env python3
"""Stream access logs for an ELB or an ALB from S3.

Log object key formats:

# Classic ELB keys contain the load balancer *name*
# and do not end in .gz:
bucket[/prefix]/AWSLogs/aws-account-id/elasticloadbalancing/region/yyyy/mm/dd/aws-account-id_elasticloadbalancing_region_load-balancer-name_end-time_ip-address_random-string.log

# ALB keys contain the load balancer *ID*
# and end in .gz:
bucket[/prefix]/AWSLogs/aws-account-id/elasticloadbalancing/region/yyyy/mm/dd/aws-account-id_elasticloadbalancing_region_load-balancer-id_end-time_ip-address_random-string.log.gz

A note about the LastModified timestamp on an s3 object, and how it
relates to ELB logs:

If the LastModified timestamp is *prior* to the requested range,
we know there will be NO relevant entries in that object.
This necessarily means any event in the requested range occurred
AFTER the object was written (or, more specifically, last modified).
"""
from __future__ import print_function

import datetime
import json
import re
import sys
import zlib

import boto3
import botocore.exceptions
import dateutil.parser
import dateutil.tz
LB_LOG_ENTRY_OBJECT_RE = re.compile(
    r'(AWSLogs\/(?P<aws_account_number>\d{12})\/elasticloadbalancing\/'
    r'(?P<region>[\da-z-]{7,15})\/(?P<year>\d{4})\/(?P<month>0?[1-9]|1[012])\/'
    r'(?P<day>0[1-9]|[12]\d|3[01])\/.*\.log)'
)

ELB_PREFIX = ('AWSLogs/{aws_account_number}/elasticloadbalancing'
              '/{region}/{year}/{month}/{day}')

ELB_ITEMS = [
    r'(?P<timestamp>([\.\:\-TZ\d]*))',
    r'(?P<elb>(\S*))',
    r'(?P<client_ip>([0-9.]+)):(?P<client_port>([0-9.]*))',
    r'(?P<target_ip>([0-9.]+)):(?P<target_port>([0-9.]*))',
    r'(?P<request_processing_time>([-.0-9]*))',
    r'(?P<target_processing_time>([-.0-9]*))',
    r'(?P<response_processing_time>([-.0-9]*))',
    r'(?P<alb_status_code>(-|[0-9]*))',
    r'(?P<target_status_code>(-|[0-9]*))',
    r'(?P<received_bytes>([-0-9]*))',
    r'(?P<sent_bytes>([-0-9]*))',
    r'"(?P<request>((.*?) (.*?)://(.*?):([0-9]+)([^? ]*)(\x3f?.*?) (.*?)))"',
    r'"(?P<user_agent>(.*?))"',
    r'(?P<ssl_cipher>(\S*))',
    r'(?P<ssl_protocol>(\S*)?)',
]
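# A truncated, illustrative classic ELB log line these patterns match
# (wrapped for readability; a real entry is a single line):
#   2018-04-10T19:04:05.123456Z my-elb 192.168.131.39:2817 10.0.0.1:80
#   0.000073 0.001048 0.000057 200 200 0 29
#   "GET http://example.com:80/ HTTP/1.1" "curl/7.46.0" - -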
ALB_ITEMS = [
    r'(?P<type>(\S*))',
    r'(?P<timestamp>([\.\:\-TZ\d]*))',
    r'(?P<alb>(\S*))',
    r'(?P<client_ip>([0-9.]+)):(?P<client_port>([0-9.]*))',
    r'(?P<target_ip>([0-9.]+)):(?P<target_port>([0-9.]*))',
    r'(?P<request_processing_time>([-.0-9]*))',
    r'(?P<target_processing_time>([-.0-9]*))',
    r'(?P<response_processing_time>([-.0-9]*))',
    r'(?P<alb_status_code>(-|[0-9]*))',
    r'(?P<target_status_code>(-|[0-9]*))',
    r'(?P<received_bytes>([-0-9]*))',
    r'(?P<sent_bytes>([-0-9]*))',
    r'"(?P<request>((.*?) (.*?)://(.*?):([0-9]+)([^? ]*)(\x3f?.*?) (.*?)))"',
    r'"(?P<user_agent>(.*?))"',
    r'(?P<ssl_cipher>(\S*))',
    r'(?P<ssl_protocol>(.*?))',
    r'(?P<target_group_arn>(\S*))',
    r'"(?P<trace_id>(.*?))"',
    r'"?(?P<domain_name>(\S*)?)?"?',
    r'"?(?P<chosen_cert_arn>(\S*)?)?"?',
]
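# A truncated, illustrative ALB log line these patterns match
# (wrapped for readability; a real entry is a single line):
#   https 2018-04-10T19:04:05.123456Z app/my-alb/50dc6c495c0c9188
#   192.168.131.39:2817 10.0.0.1:80 0.000 0.001 0.000 200 200 34 366
#   "GET https://example.com:443/ HTTP/1.1" "curl/7.46.0"
#   ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2
#   arn:aws:elasticloadbalancing:us-east-1:123456789012:targetgroup/my-targets/73e2d6bc24d8a067
#   "Root=1-58337262-36d228ad5d99923122bbe354" "example.com" "arn:aws:acm:..."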
def iter_objects(bucket, **list_kw):
    """Yield objects from s3."""
    print('calling iter_objects', file=sys.stderr)
    s3_client = boto3.client('s3')
    lkw = dict(Bucket=bucket, **list_kw)

    def pages():
        """Yield pages of objects from s3."""
        begin = True
        continuation_token = None
        while continuation_token or begin:
            print('continuation token: {}'.format(continuation_token),
                  file=sys.stderr)
            begin = False
            print('Calling list objects with args: %s' % lkw, file=sys.stderr)
            _page = s3_client.list_objects_v2(**lkw)
            continuation_token = _page.get('NextContinuationToken')
            if continuation_token:
                lkw['ContinuationToken'] = continuation_token
            else:
                assert not _page['IsTruncated']
            try:
                print('Got {0} items'.format(_page['KeyCount']),
                      file=sys.stderr)
                yield _page['Contents']
            except KeyError:
                assert _page['KeyCount'] == 0
                print('Page has no contents and a key count of 0.',
                      file=sys.stderr)

    for page in pages():
        yield from page
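# For example (hypothetical bucket and prefix):
#   for obj in iter_objects('my-log-bucket', Prefix='AWSLogs/', MaxKeys=1000):
#       print(obj['Key'], obj['LastModified'])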
def __list(args):
    elb_client = boto3.client('elb')
    elbs = elb_client.describe_load_balancers()['LoadBalancerDescriptions']
    elbs = [__x['LoadBalancerName'] for __x in elbs]
    alb_client = boto3.client('elbv2')
    albs = alb_client.describe_load_balancers()['LoadBalancers']
    albs = [__y['LoadBalancerName'] for __y in albs]
    print(json.dumps(
        {'elbs': elbs, 'albs': albs},
        sort_keys=True,
        indent=2,
        default=_default
    ))
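# Example output shape for the `list` subcommand (names illustrative):
#   {
#     "albs": ["my-alb"],
#     "elbs": ["my-classic-elb"]
#   }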
def __logs(args):
    elb_client = boto3.client('elb')
    try:
        elb = elb_client.describe_load_balancer_attributes(
            LoadBalancerName=args.elb_name)
        access_log = elb['LoadBalancerAttributes']['AccessLog']
    except botocore.exceptions.ClientError as err:
        skip = ('LoadBalancerNotFound', 'cannot be longer')
        if not any(_x in str(err) for _x in skip):
            raise
        # Not a classic ELB; fall back to the v2 (ALB) api.
        alb_client = boto3.client('elbv2')
        alb = alb_client.describe_load_balancers(
            Names=[args.elb_name])['LoadBalancers']
        assert len(alb) == 1
        alb = alb_client.describe_load_balancer_attributes(
            LoadBalancerArn=alb[0]['LoadBalancerArn'])
        alb = {_a['Key']: _a['Value'] for _a in alb['Attributes']}
        access_log = {
            'S3BucketPrefix': alb.get('access_logs.s3.prefix'),
            'S3BucketName': alb.get('access_logs.s3.bucket'),
        }
    # TODO: learn to deal with this
    assert not access_log['S3BucketPrefix']
    # hack:
    # Use the first matching lb log object to determine
    # the aws account number and region.
    first = next(
        o for o in iter_objects(access_log['S3BucketName'], MaxKeys=3)
        if LB_LOG_ENTRY_OBJECT_RE.match(o['Key'])
    )
    print('Found first ELB log to use for params: %s' % first, file=sys.stderr)
    parsed = LB_LOG_ENTRY_OBJECT_RE.match(first['Key']).groupdict()
    list_kw = {
        # Note: s3 returns at most 1000 keys per page regardless of
        # how large MaxKeys is.
        'MaxKeys': 5000,
        'Prefix': ELB_PREFIX.format(
            aws_account_number=parsed['aws_account_number'],
            region=parsed['region'],
            year=args.start.strftime('%Y'),
            month=args.start.strftime('%m'),
            day=args.start.strftime('%d'),
        )
    }
    s3_client = boto3.client('s3')
    objects = iter_objects(
        access_log['S3BucketName'],
        **list_kw
    )
    # Give the load balancer up to two hours to deliver a log object
    # after the end of the requested range.
    plenty_of_time_to_ingest = args.end + datetime.timedelta(hours=2)
    go = True
    seen = set()
    days_forward = 0
    while go:
        for o in objects:
            _ho = hash(o['Key'])
            if _ho in seen:
                go = False
                continue
            else:
                print("haven't seen {} yet".format(_ho), file=sys.stderr)
                seen.add(_ho)
                sys.stderr.flush()
            match = LB_LOG_ENTRY_OBJECT_RE.match(o['Key'])
            if not match:
                # This should never happen.
                print('REGEX MATCH FAILED ---> {}'.format(o), file=sys.stderr)
                continue
            match = match.groupdict()
            last_modified = o['LastModified']
            if last_modified < args.start:
                # The object was last modified before our desired range.
                # It is *not possible* for there to be relevant logs
                # in this object.
                sys.stderr.write('\n -- not possible -- \n')
                continue
            elif last_modified > plenty_of_time_to_ingest:
                # The object was modified two hours or more after our
                # range; there *PROBABLY* aren't logs in here we care
                # about, but check before giving up.
                entries = _get_elb_log_entries_from_object(
                    s3_client, access_log['S3BucketName'], o['Key'])
                _found = False
                for log_time, entry in entries:
                    if args.start <= log_time <= args.end:
                        _found = True
                        go = True
                        print(json.dumps(entry, indent=2, sort_keys=True))
                    elif log_time < args.start:
                        # somewhat surprising
                        continue
                    elif log_time > args.end:
                        # surprising
                        sys.stderr.write('!')
                        continue
                if not _found:
                    # No log entries in this object have a timestamp
                    # within the requested range.
                    sys.stderr.write(
                        "didn't find a single entry in that object\n")
                    sys.stderr.flush()
                    continue
                go = False
                continue
            elif args.start <= last_modified <= args.end:
                # Very likely to be logs we care about here.
                entries = _get_elb_log_entries_from_object(
                    s3_client, access_log['S3BucketName'], o['Key'])
                _found = False
                for log_time, entry in entries:
                    if args.start <= log_time <= args.end:
                        _found = True
                        go = True
                        print(json.dumps(entry, indent=2, sort_keys=True))
                    elif log_time < args.start:
                        # somewhat surprising
                        sys.stderr.write('log timestamp is unexpectedly low\n')
                        sys.stderr.flush()
                        continue
                    elif log_time > args.end:
                        # surprising
                        sys.stderr.write('log timestamp is unexpectedly high!\n')
                        sys.stderr.flush()
                        continue
                if not _found:
                    # No log entries in this object have a timestamp
                    # within the requested range.
                    sys.stderr.write(
                        "didn't find a single entry in that object\n")
                    sys.stderr.flush()
                    continue
        else:
            # The listing for this day is exhausted; advance to the
            # next day's prefix.
            days_forward += 1
            next_day = args.start + datetime.timedelta(days=days_forward)
            _list_kw = {
                'MaxKeys': 1000,
                'Prefix': ELB_PREFIX.format(
                    aws_account_number=parsed['aws_account_number'],
                    region=parsed['region'],
                    year=next_day.strftime('%Y'),
                    month=next_day.strftime('%m'),
                    day=next_day.strftime('%d'),
                )
            }
        if go:
            # yet more hacky checks: peek at the next day's prefix
            # before committing to a full listing
            _list_kw_copy_for_first = _list_kw.copy()
            _list_kw_copy_for_first['MaxKeys'] = 3
            try:
                first = next(
                    o for o in iter_objects(
                        access_log['S3BucketName'], **_list_kw_copy_for_first)
                    if LB_LOG_ENTRY_OBJECT_RE.match(o['Key'])
                )
            except StopIteration:
                first = None
            if not first:
                print('nope, nothing here', file=sys.stderr)
                # we're done here
                go = False
                continue
            else:
                objects = iter_objects(
                    access_log['S3BucketName'],
                    **_list_kw
                )
        else:
            print('nope', file=sys.stderr)
def _get_elb_log_entries_from_object(s3, bucket, key):
    """Yield (time, entry) tuples."""
    obj = s3.get_object(Key=key, Bucket=bucket)
    obj_body = obj['Body'].read()
    alb = False
    if key.lower().endswith('.gz'):
        # ALB access logs are gzipped; classic ELB logs are not.
        alb = True
        obj_body = zlib.decompress(obj_body, 16 + zlib.MAX_WBITS)
    for entry in obj_body.splitlines():
        entry = _decode_entry(entry, alb=alb)
        if not entry.get('timestamp'):
            # _decode_entry returns {} for unparseable lines; skip them.
            continue
        yield parse_datetime(entry['timestamp']), entry
# Stolen from jorgebastida/awslogs
def parse_datetime(datetime_text):
    """Parse ``datetime_text`` into a ``datetime``."""
    ago_regexp = (r'(\d+)\s?'
                  r'(m|minute|minutes|h|hour|hours|d|day|days|w|week|weeks)'
                  r'(?: ago)?')
    ago_match = re.match(ago_regexp, datetime_text)
    if ago_match:
        amount, unit = ago_match.groups()
        amount = int(amount)
        unit = {'m': 60, 'h': 3600, 'd': 86400, 'w': 604800}[unit[0]]
        date = (datetime.datetime.utcnow() +
                datetime.timedelta(seconds=unit * amount * -1))
    else:
        try:
            date = dateutil.parser.parse(datetime_text)
        except ValueError:
            raise ValueError(
                "dateutil didn't understand {0}".format(datetime_text))
    if not date.tzinfo:
        offset = date.utcoffset()
        if offset is None or offset.seconds == 0:
            date = date.replace(tzinfo=dateutil.tz.tzutc())
    return date
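# For example:
#   parse_datetime('10m ago')             -> ten minutes ago, tz-aware UTC
#   parse_datetime('2 hours ago')         -> two hours ago, tz-aware UTC
#   parse_datetime('2018-04-10T19:04:00') -> parsed by dateutil; UTC is
#                                            assumed when no tz is given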
def _decode_entry(entry, alb=False):
    """Parse a raw log line into a dict of named fields.

    Tries the longest prefix of the field patterns first, backing off
    one field at a time so that truncated or older-format lines still
    yield whatever fields they do contain.
    """
    entry = entry.decode('utf-8')
    components = ALB_ITEMS if alb else ELB_ITEMS
    for i in reversed(range(len(components))):
        line = re.compile(r'({0})'.format(' '.join(components[0:i + 1])))
        item = line.search(entry)
        if not item:
            continue
        item = item.groupdict()
        break
    else:
        print('Could not parse log line {0}'.format(entry), file=sys.stderr)
        item = {}
    return item
class __tzutc(datetime.tzinfo):

    ZERO = datetime.timedelta(0)

    def utcoffset(self, dt):
        return self.ZERO

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return self.ZERO

_tzutc = __tzutc()

def _default(obj):
    """JSON serializer for objects not serializable by default json code."""
    if isinstance(obj, datetime.datetime):
        # always return UTC dates
        return obj.astimezone(_tzutc).isoformat()
    return obj
def dispatch(args):
    return args._func(args)

if __name__ == '__main__':
    import argparse
    p = argparse.ArgumentParser()
    # With no subcommand, _func is print_help; the `write` default lets
    # argparse treat the namespace itself as a writable file object, so
    # the help text lands on stderr.
    p.set_defaults(_func=p.print_help, write=sys.stderr.write)
    sp = p.add_subparsers()
    lp = sp.add_parser('logs')
    lp.add_argument('elb_name')
    lp.add_argument('--start', '-s', default='10m ago',
                    type=parse_datetime,
                    help='Only match logs at or beyond this time')
    lp.add_argument('--end', '-e', default='0m ago',
                    type=parse_datetime,
                    help='Only match logs before this specified end time')
    lp.add_argument('--quiet', '-q', action='store_true')
    lp.set_defaults(_func=__logs)
    ls = sp.add_parser('list')
    ls.set_defaults(_func=__list)
    args = p.parse_args()
    try:
        dispatch(args)
    except KeyboardInterrupt:
        sys.exit()
Example: find 5xx's.
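A minimal sketch, assuming the script is saved as elb_logs.py and a load balancer named my-load-balancer (this script emits the status code field as alb_status_code for both ELB and ALB entries):

    python3 elb_logs.py logs my-load-balancer --start '30m ago' | jq 'select(.alb_status_code | startswith("5"))'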