Stream logs from an ELB or an ALB (it determines which for you). Pipe to `jq` to find 5xx's or whatever (requires Python 3).
#!/usr/bin/env python3
"""
# this one has load balancer name
# and does not end in gz
bucket[/prefix]/AWSLogs/aws-account-id/elasticloadbalancing/region/yyyy/mm/dd/aws-account-id_elasticloadbalancing_region_load-balancer-name_end-time_ip-address_random-string.log
# this one has load balancer ID
# and ends in gz
bucket[/prefix]/AWSLogs/aws-account-id/elasticloadbalancing/region/yyyy/mm/dd/aws-account-id_elasticloadbalancing_region_load-balancer-id_end-time_ip-address_random-string.log.gz
A note about the LastModified timestamp on an S3 object, and how it
relates to ELB logs:

If the LastModified timestamp is *prior* to the requested range,
we know there will be NO relevant entries in that object, because
any event in the requested range necessarily occurred AFTER the
object was written (or, more specifically, last modified).
"""
import datetime
import json
import re
import sys
import zlib

import boto3
import botocore.exceptions
import dateutil.parser
import dateutil.tz
LB_LOG_ENTRY_OBJECT_RE = re.compile(
    r'(AWSLogs\/(?P<aws_account_number>\d{12})\/elasticloadbalancing\/'
    r'(?P<region>[\da-z-]{7,15})\/(?P<year>\d{4})\/(?P<month>0?[1-9]|1[012])\/'
    r'(?P<day>0[1-9]|[12]\d|3[01])\/.*\.log)'
)
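# For illustration, a key like the following (hypothetical account, date,
# and suffix) matches the pattern above:
#   AWSLogs/123456789012/elasticloadbalancing/us-east-1/2018/02/01/...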
ELB_PREFIX = ('AWSLogs/{aws_account_number}/elasticloadbalancing'
'/{region}/{year}/{month}/{day}')
ELB_ITEMS = [
    r'(?P<timestamp>([\.\:\-TZ\d]*))',
    r'(?P<elb>(\S*))',
    r'(?P<client_ip>([0-9.]+)):(?P<client_port>([0-9.]*))',
    r'(?P<target_ip>([0-9.]+)):(?P<target_port>([0-9.]*))',
    r'(?P<request_processing_time>([-.0-9]*))',
    r'(?P<target_processing_time>([-.0-9]*))',
    r'(?P<response_processing_time>([-.0-9]*))',
    r'(?P<elb_status_code>(-|[0-9]*))',
    r'(?P<target_status_code>(-|[0-9]*))',
    r'(?P<received_bytes>([-0-9]*))',
    r'(?P<sent_bytes>([-0-9]*))',
    r'"(?P<request>((.*?) (.*?)://(.*?):([0-9]+)([^? ]*)(\x3f?.*?) (.*?)))"',
    r'"(?P<user_agent>(.*?))"',
    r'(?P<ssl_cipher>(\S*))',
    r'(?P<ssl_protocol>(\S*)?)'
]
ALB_ITEMS = [
    r'(?P<type>(\S*))',
    r'(?P<timestamp>([\.\:\-TZ\d]*))',
    r'(?P<alb>(\S*))',
    r'(?P<client_ip>([0-9.]+)):(?P<client_port>([0-9.]*))',
    r'(?P<target_ip>([0-9.]+)):(?P<target_port>([0-9.]*))',
    r'(?P<request_processing_time>([-.0-9]*))',
    r'(?P<target_processing_time>([-.0-9]*))',
    r'(?P<response_processing_time>([-.0-9]*))',
    r'(?P<elb_status_code>(-|[0-9]*))',
    r'(?P<target_status_code>(-|[0-9]*))',
    r'(?P<received_bytes>([-0-9]*))',
    r'(?P<sent_bytes>([-0-9]*))',
    r'"(?P<request>((.*?) (.*?)://(.*?):([0-9]+)([^? ]*)(\x3f?.*?) (.*?)))"',
    r'"(?P<user_agent>(.*?))"',
    r'(?P<ssl_cipher>(\S*))',
    r'(?P<ssl_protocol>(.*?))',
    r'(?P<target_group_arn>(\S*))',
    r'"(?P<trace_id>(.*?))"',
    r'"?(?P<domain_name>(\S*)?)?"?',
    r'"?(?P<chosen_cert_arn>(\S*)?)?"?'
]
def iter_objects(bucket, **list_kw):
"""Yield objects from s3."""
print('calling iter_objects', file=sys.stderr)
s3_client = boto3.client('s3')
lkw = dict(Bucket=bucket, **list_kw)
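    # Hand-rolled list_objects_v2 pagination; boto3's
    # get_paginator('list_objects_v2') would do the same job.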
def pages():
"""Yield pages of objects from s3."""
print('setting begin to True', file=sys.stderr)
begin = True
continuation_token = None
while continuation_token or begin:
print('continuation token: {}'.format(continuation_token), file=sys.stderr)
print('begin: {}'.format(begin), file=sys.stderr)
begin = False
print('Calling list objects with args: %s' % lkw, file=sys.stderr)
_page = s3_client.list_objects_v2(**lkw)
continuation_token = _page.get('NextContinuationToken')
if continuation_token:
lkw['ContinuationToken'] = continuation_token
else:
assert not _page['IsTruncated']
try:
print('Got {0} items'.format(_page['KeyCount']), file=sys.stderr)
yield _page['Contents']
except KeyError:
assert _page['KeyCount'] == 0
print('Page has no contents and a key count of 0.', file=sys.stderr)
for page in pages():
print('yielding items from page (listing objects)', file=sys.stderr)
yield from page
def __list(args):
elb_client = boto3.client('elb')
elbs = elb_client.describe_load_balancers()['LoadBalancerDescriptions']
elbs = [__x['LoadBalancerName']
for __x in elbs]
alb_client = boto3.client('elbv2')
albs = alb_client.describe_load_balancers()['LoadBalancers']
albs = [__y['LoadBalancerName']
for __y in albs]
print(json.dumps(
{'elbs': elbs, 'albs': albs},
sort_keys=True,
indent=2,
default=_default
))
def __logs(args):
elb_client = boto3.client('elb')
try:
elb = elb_client.describe_load_balancer_attributes(
LoadBalancerName=args.elb_name)
access_log = elb['LoadBalancerAttributes']['AccessLog']
except botocore.exceptions.ClientError as err:
skip = ('LoadBalancerNotFound', 'cannot be longer')
if not any(_x in str(err) for _x in skip):
raise
alb_client = boto3.client('elbv2')
alb = alb_client.describe_load_balancers(
Names=[args.elb_name])['LoadBalancers']
assert len(alb) == 1
alb = alb_client.describe_load_balancer_attributes(
LoadBalancerArn=alb[0]['LoadBalancerArn'])
alb = {_a['Key']: _a['Value'] for _a in alb['Attributes']}
access_log = {
'S3BucketPrefix': alb.get('access_logs.s3.prefix'),
'S3BucketName': alb.get('access_logs.s3.bucket'),
}
# TODO: learn to deal with this
assert not access_log['S3BucketPrefix']
# hack:
# Use the first matching elb log object to determine
# the aws account number and region
first = next(
o for o in iter_objects(access_log['S3BucketName'], MaxKeys=3)
if LB_LOG_ENTRY_OBJECT_RE.match(o['Key'])
)
print('Found first ELB log to use for params: %s' % first, file=sys.stderr)
parsed = LB_LOG_ENTRY_OBJECT_RE.match(first['Key']).groupdict()
list_kw = {
'MaxKeys': 5000,
'Prefix': ELB_PREFIX.format(
aws_account_number=parsed['aws_account_number'],
region=parsed['region'],
year=args.start.strftime('%Y'),
month=args.start.strftime('%m'),
day=args.start.strftime('%d'),
)
}
s3_client = boto3.client('s3')
objects = iter_objects(
access_log['S3BucketName'],
**list_kw
)
    go = True
    seen = set()
    days_forward = 0
    # Fallback so the re-listing at the bottom of the loop always has
    # kwargs to use, even if the day-advance branch below never runs.
    _list_kw = dict(list_kw)
while go:
for o in objects:
_ho = hash(o['Key'])
if _ho in seen:
go = False
continue
else:
                print("haven't seen {} yet".format(_ho), file=sys.stderr)
seen.add(_ho)
sys.stderr.flush()
match = LB_LOG_ENTRY_OBJECT_RE.match(o['Key'])
if not match:
# This should never happen.
print('\n\n\nREGEX MATCH FAILED', file=sys.stderr)
print('--->', o, file=sys.stderr)
print('\n\n\n ##############################', file=sys.stderr)
continue
match = match.groupdict()
last_modified = o['LastModified']
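            # boto3 returns LastModified as a tz-aware (UTC) datetime, so it
            # compares directly against the tz-aware values from parse_datetime.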
            # hack
            plenty_of_time_to_ingest = args.end + datetime.timedelta(hours=2)
if last_modified < args.start:
# The object was last modified before our desired range.
# It is *not possible* for there to be relevant logs in this object.
sys.stderr.write('\n -- not possible -- \n')
continue
elif last_modified > plenty_of_time_to_ingest:
# The elb logs object was modified two hours or more after
# our range, there *PROBABLY* aren't logs in here we care about
sys.stderr.write('\n------ p ----------\n')
entries = _get_elb_log_entries_from_object(
s3_client, access_log['S3BucketName'], o['Key'])
_found = False
for log_time, entry in entries:
if (log_time >= args.start) and (log_time <= args.end):
_found = True
go = True
sys.stderr.write('\n------ wow ----------\n')
sys.stderr.flush()
print(json.dumps(entry, indent=2, sort_keys=True))
elif log_time < args.start:
# somewhat surprising
continue
elif log_time > args.end:
# surprising
sys.stderr.write('!')
continue
else:
sys.stderr.write('*')
sys.stderr.flush()
if not _found:
# No ELB log items in this object have a timestamp value in range
sys.stderr.write('\n------ PPPPP ----------\n')
                    sys.stderr.write("didn't find a single entry in that object\n")
sys.stderr.flush()
# go = False
continue
go = False
continue
elif (last_modified >= args.start) and (last_modified <= args.end):
# very likely to be logs we care about here
sys.stderr.write('\n------ v ----------\n')
entries = _get_elb_log_entries_from_object(
s3_client, access_log['S3BucketName'], o['Key'])
_found = False
for log_time, entry in entries:
if (log_time >= args.start) and (log_time <= args.end):
_found = True
go = True
print(json.dumps(entry, indent=2, sort_keys=True))
elif log_time < args.start:
sys.stderr.write('log timestamp is unexpectedly low')
sys.stderr.flush()
# somewhat surprising
continue
elif log_time > args.end:
sys.stderr.write('log timestamp is unexpectedly high')
sys.stderr.flush()
# surprising
sys.stderr.write('!')
continue
else:
sys.stderr.write('actually wat')
sys.stderr.write('*')
sys.stderr.flush()
if not _found:
# No ELB log items in this object have a timestamp value in range
sys.stderr.write('\n------ VVVVVV ----------\n')
                    sys.stderr.write("didn't find a single entry in that object\n")
sys.stderr.flush()
# go = False
continue
else:
days_forward += 1
next_day = args.start + datetime.timedelta(days=days_forward)
_list_kw = {
'MaxKeys': 1000,
'Prefix': ELB_PREFIX.format(
aws_account_number=parsed['aws_account_number'],
region=parsed['region'],
year=next_day.strftime('%Y'),
month=next_day.strftime('%m'),
day=next_day.strftime('%d'),
)
}
        if go:
            # yet more hacky checks
            _list_kw_copy_for_first = _list_kw.copy()
            _list_kw_copy_for_first['MaxKeys'] = 3
            try:
                first = next(
                    o for o in iter_objects(access_log['S3BucketName'],
                                            **_list_kw_copy_for_first)
                    if LB_LOG_ENTRY_OBJECT_RE.match(o['Key'])
                )
            except StopIteration:
                first = None
if not first:
print('nope, nothing here', file=sys.stderr)
# we're done here, I think................
go = False
continue
else:
objects = iter_objects(
access_log['S3BucketName'],
**_list_kw
)
else:
print('nope', file=sys.stderr)
def _get_elb_log_entries_from_object(s3, bucket, key):
    """Yield (time, entry) tuples."""
    obj = s3.get_object(Key=key, Bucket=bucket)
    obj_body = obj['Body'].read()
    alb = False
    if key.lower().endswith('.gz'):
        # ALB logs are gzipped; 16 + zlib.MAX_WBITS tells zlib to
        # expect a gzip header.
        alb = True
        obj_body = zlib.decompress(obj_body, 16 + zlib.MAX_WBITS)
    for entry in obj_body.splitlines():
        entry = _decode_entry(entry, alb=alb)
        if not entry:
            # Unparseable line; _decode_entry already logged it.
            continue
        yield parse_datetime(entry['timestamp']), entry
# Stolen from jorgebastida/awslogs
def parse_datetime(datetime_text):
"""Parse ``datetime_text`` into a ``datetime``."""
    ago_regexp = r'(\d+)\s?(m|minute|minutes|h|hour|hours|d|day|days|w|week|weeks)(?: ago)?'
ago_match = re.match(ago_regexp, datetime_text)
if ago_match:
amount, unit = ago_match.groups()
amount = int(amount)
unit = {'m': 60, 'h': 3600, 'd': 86400, 'w': 604800}[unit[0]]
date = (datetime.datetime.utcnow() +
datetime.timedelta(seconds=unit * amount * -1))
else:
try:
date = dateutil.parser.parse(datetime_text)
except ValueError:
            raise ValueError("dateutil didn't understand {0}".format(datetime_text))
if not date.tzinfo:
offset = date.utcoffset()
if offset is None or offset.seconds == 0:
date = date.replace(tzinfo=dateutil.tz.tzutc())
return date
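# Illustrative inputs: parse_datetime('10m ago') and
# parse_datetime('2018-02-01T12:00:00Z') both return tz-aware datetimes.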
def _decode_entry(entry, alb=False):
    """Parse one raw log line into a dict of named fields."""
    entry = entry.decode('utf-8')
    components = ALB_ITEMS if alb else ELB_ITEMS
    # Try the full field list first, then fall back to progressively
    # shorter prefixes so lines missing trailing fields still parse.
    for i in reversed(range(len(components))):
        line = re.compile(r'({0})'.format(' '.join(components[0:i + 1])))
        item = line.search(entry)
        if not item:
            continue
        item = item.groupdict()
        break
    else:
        print('Could not parse log line {0}'.format(entry), file=sys.stderr)
        item = {}
    return item
_tzutc = dateutil.tz.tzutc()
def _default(obj):
    """JSON serializer for objects not serializable by default json code."""
    if isinstance(obj, datetime.datetime):
        # always return UTC dates
        return obj.astimezone(_tzutc).isoformat()
    raise TypeError('{0!r} is not JSON serializable'.format(obj))
def dispatch(args):
return args._func(args)
if __name__ == '__main__':
import argparse
p = argparse.ArgumentParser()
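    # Default action prints help: argparse's print_help(file) just calls
    # file.write(...), and the parsed Namespace carries a `write` attribute,
    # so dispatch(args) -> p.print_help(args) works for bare invocations.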
p.set_defaults(_func=p.print_help, write=sys.stderr.write)
sp = p.add_subparsers()
lp = sp.add_parser('logs')
lp.add_argument('elb_name')
lp.add_argument('--start', '-s', default='10m ago',
type=parse_datetime,
help='Only match logs at or beyond this time')
lp.add_argument('--end', '-e',
default='0m ago',
type=parse_datetime,
help='Only match logs before this specified end time')
lp.add_argument('--quiet', '-q', action='store_true')
lp.set_defaults(_func=__logs)
ls = sp.add_parser('list')
ls.set_defaults(_func=__list)
args = p.parse_args()
try:
dispatch(args)
except KeyboardInterrupt:
sys.exit()
stavxyz commented Feb 1, 2018

Example, find 5xx's:

./awslblogs.py logs --start='2h ago' waypoint-server-alb | jq 'select(((.elb_status_code|tostring|startswith("5")) or (.target_status_code|tostring|startswith("5"))))' | jq -s
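Note: the trailing `jq -s` slurps the selected entries into a single JSON array. If you need to look up a load balancer name first, `./awslblogs.py list` prints the available ELB and ALB names as JSON.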
