Last active
October 9, 2019 00:03
-
-
Save gyoza/fc5a9b17cf3de48fa2b879f627b2ce2a to your computer and use it in GitHub Desktop.
paginate ec2 with ability to filter by tag name and value
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import re | |
import sys | |
import time | |
import math | |
import json | |
import boto | |
import boto3 | |
import random | |
import inspect | |
import argparse | |
import botocore | |
import traceback | |
import base64 | |
from x256 import x256 | |
from boto3 import client | |
from pprint import pformat | |
from os.path import getmtime | |
from datetime import datetime | |
from pygments import highlight | |
from random import randint, choice | |
from botocore.exceptions import ClientError | |
from pygments.lexers import JsonLexer, PythonLexer, TextLexer | |
from pygments.formatters import TerminalFormatter, Terminal256Formatter | |
def jprint(**kwargs):
    """Pretty-print the first keyword argument, framed by a header and footer.

    Call it as ``jprint(some_name=value)``: the keyword becomes the label in
    the header, together with the value's type.  Dicts and lists are dumped
    as indented JSON and colourised, strings are colourised as plain text,
    ints are printed as-is, and anything else falls back to ``pformat``.
    Only the first keyword argument is looked at.
    """
    passed_name = list(kwargs)[0]
    passed_data = kwargs[passed_name]
    ptype = str(type(passed_data))
    print("+---- [{} - {}] --\n".format(passed_name, ptype))

    formatter = Terminal256Formatter()
    found = True
    if isinstance(passed_data, (dict, list)):
        # convert_or_pass lets datetime-like values survive json.dumps.
        as_json = json.dumps(passed_data, indent=4, default=convert_or_pass)
        print(highlight(as_json, JsonLexer(), formatter))
    elif isinstance(passed_data, str):
        print(highlight(passed_data, TextLexer(), formatter))
    elif isinstance(passed_data, int):
        print(passed_data)
    else:
        found = False

    # Fallback for every type not handled above.
    if inspect.getmembers(passed_data, inspect.isclass) and not found:
        print(highlight(pformat(passed_data), PythonLexer(), formatter))
        print(str(passed_data))

    print("+------------------")
# Bright ("bold") ANSI foreground escape sequences, keyed by "h" + colour name.
_color_map = {
    f"h{color_name}": f"\x1b[{ansi_code};1m"
    for color_name, ansi_code in (
        ("blue", 34),
        ("green", 32),
        ("cyan", 36),
        ("magenta", 35),
    )
}
class PrettyLogger(object):
    """Print one timestamped, colourised log line when instantiated.

    Usage::

        PrettyLogger(log_level="warn", msg="H|plain text (hred|loud text) (cyan|more)")

    Keyword arguments:
        log_level: one of the keys of ``_log_level_map``; anything else is
            reported under the ``invalid`` level.
        msg: the message.  When it starts with ``H|`` the prefix is removed
            and every ``(color|words)`` group is replaced by ``words``
            wrapped in that colour (unknown colour names fall back to
            yellow).  Parenthesised text without a ``|`` is left untouched.

    Any other keyword argument is accepted and ignored.
    """

    # level name -> [colour label, ANSI escape used for the level tag]
    _log_level_map = {
        "error": ["red", "\x1b[31m"],
        "warn": ["yellow", "\x1b[33m"],
        "info": ["green", "\x1b[32m"],
        "verbose": ["blue", "\x1b[34;1m"],
        "other": ["cyan", "\x1b[36m"],
        # Was "\x1b[36m" (cyan) although labelled red; now matches the label.
        "invalid": ["red", "\x1b[31m"],
        "creset": ["creset", "\x1b[0m"],
        "debug": ["magenta", "\x1b[35m"],
    }

    # colour name usable inside "(color|words)" groups -> ANSI escape
    _color_map = {
        "red": "\x1b[31m",
        "yellow": "\x1b[33m",
        "green": "\x1b[32m",
        "blue": "\x1b[34m",
        "cyan": "\x1b[36m",
        "magenta": "\x1b[35m",
        "hred": "\x1b[31;1m",
        "hyellow": "\x1b[33;1m",
        "hblue": "\x1b[34;1m",
        "hgreen": "\x1b[32;1m",
        "hcyan": "\x1b[36;1m",
        "hmagenta": "\x1b[35;1m",
        "creset": "\x1b[0m",
    }

    def highlight(self, msg):
        """Return *msg* with its ``(color|words)`` groups colourised.

        Messages that do not start with ``H|`` are returned unchanged.
        """
        if not msg.startswith("H|"):
            return msg
        reset_color = self._color_map["creset"]
        msg = msg.partition("|")[2]
        for group in re.findall(r'\((.*?)\)', msg):
            # Split on the first "|" only so the words may contain pipes.
            color_code, separator, words = group.partition('|')
            if not separator:
                # Ordinary parenthesised text, not a highlight group.
                continue
            highlight_color = self._color_map.get(color_code, self._color_map["yellow"])
            msg = msg.replace(f"({group})", f'{highlight_color}{words}{reset_color}')
        return msg

    def fix_header(self):
        """Return the bracketed, green ``[<UTC timestamp> - <header>]`` prefix."""
        # Local import: the file only imports `datetime` from the module.
        from datetime import timezone

        stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        return "[\033[92m{} - {}\x1b[0m]".format(stamp, self.header)

    def __init__(self, **kwargs):
        log_level = kwargs["log_level"]
        if log_level not in self._log_level_map:
            log_level = "invalid"
        msg = kwargs["msg"]
        self.log_level_color = self._log_level_map[log_level][1]
        self.log_level_color_reset = self._log_level_map["creset"][1]
        self.log_level = ":{}{:>7s}{}:".format(self.log_level_color, log_level, self.log_level_color_reset)
        # First key of sys._current_frames(), i.e. a thread identifier.
        called = str(list(sys._current_frames().items())[0][0])
        # File names of the stack frames whose repr does not contain that
        # identifier; entry 1 is presumably the caller of PrettyLogger(...).
        # Only the last path component is kept.
        filenames = [frame_info[1] for frame_info in inspect.stack() if called not in str(frame_info[0])]
        self.header = filenames[1].split('/')[-1]
        self.header = self.fix_header()
        msg = self.highlight(msg)
        print("{} {} {}".format(self.header, self.log_level, msg))
class super_except(Exception):
    """Exception wrapper that remembers the exception it was raised for.

    Keyword arguments:
        original_exception: optional exception being wrapped.  Its formatted
            traceback is stored in ``traceback_text`` and appended, together
            with its repr, to this exception's ``repr()`` / ``str()``.

    Positional arguments are passed on to ``Exception`` unchanged.
    """

    def __init__(self, *args, **kwargs):
        self.original_exception = kwargs.pop('original_exception', None)
        # Always defined so callers can read it without an AttributeError.
        self.traceback_text = ''
        if self.original_exception is not None:
            # Prefer the traceback attached to the wrapped exception so the
            # text is correct even when we are built outside the `except`
            # block; fall back to the exception currently being handled.
            exc_traceback = getattr(self.original_exception, '__traceback__', None)
            if exc_traceback is None:
                exc_traceback = sys.exc_info()[2]
            self.traceback_text = '\n'.join(traceback.format_tb(exc_traceback))
        super().__init__(*args)

    @property
    def name(self):
        """Name of the concrete exception class."""
        return self.__class__.__name__

    def __repr__(self):
        output = super().__repr__()
        output += '\n'
        if self.original_exception is not None:
            output += '------------------------------------------------------------\nORIGINAL EXCEPTION:\n'
            output += '{}\n'.format(self.traceback_text)
            output += repr(self.original_exception)
        return output

    def __str__(self):
        return repr(self)
class SmartFormatter(argparse.HelpFormatter):
    """Help formatter that honours explicit line breaks in ``R|``-prefixed help."""

    def _split_lines(self, text, width):
        """Split help *text* into display lines.

        Text starting with ``R|`` is split on its own newlines (marker
        removed); all other text gets the default wrapping to *width*.
        """
        raw_marker = 'R|'
        if not text.startswith(raw_marker):
            return super()._split_lines(text, width)
        return text[len(raw_marker):].splitlines()
def convert_or_pass(obj):
    """``default=`` hook for ``json.dumps`` that serialises date-like objects.

    Args:
        obj: a value the JSON encoder could not serialise on its own.

    Returns:
        ``obj.isoformat()`` when the object provides that method (datetime,
        date, time).

    Raises:
        TypeError: for every other object, which is how a ``default`` hook
            tells the encoder the value is not serialisable.  The previous
            code referenced an undefined ``self`` here and died with a
            NameError instead.
    """
    if hasattr(obj, 'isoformat'):
        return obj.isoformat()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
def chunkify(input, n):
    """Deal *input* into *n* interleaved slices.

    Slice ``k`` holds the elements at positions ``k, k + n, k + 2n, ...``,
    so the slices differ in length by at most one element.
    """
    dealt = []
    for offset in range(n):
        dealt.append(input[offset::n])
    return dealt
def config_parser():
    """Build and return the command-line parser for the EC2 listing tool.

    Help strings start with ``R|`` so SmartFormatter keeps their explicit
    line breaks.
    """
    cli = argparse.ArgumentParser(
        description="hit ec2 for results",
        formatter_class=SmartFormatter,
        epilog="example: ec2list --tag environment --value staging --region us-east-1 --profile imeet2-sysops",
    )

    def add_switch(flag, help_text):
        # Optional boolean switch: False unless given on the command line.
        cli.add_argument(flag, required=False, action='store_true', help=help_text)

    cli.add_argument(
        "--profile",
        help="R|Desired AWS profile\n\nexample: default (default)\n someotheraccount",
        default="imeet2-sysops",
    )
    cli.add_argument(
        "--region",
        help="R|Desired AWS region\n\nexample: us-east-1 (us-east-1)\n eu-central-1",
        default="us-east-1",
    )
    add_switch("--all", "R|Get everything in region specified.")
    add_switch("--inverse", "R|Show everything not with this tag.")
    add_switch("--showtags", "R|Show tags for system in output.")
    cli.add_argument(
        "--tag",
        required=False,
        help="R|Look for this Tag, (case sensitive)\n\nexample: Name or Environment",
    )
    cli.add_argument(
        "--value",
        required=False,
        help="R|Look for this value in tag values, (case sensitive)\n\nexample: olympus, Production, or Staging",
    )
    add_switch("--clear", "R|clear cache")
    return cli
class AWSdo(object):
    """Run a paginated, JMESPath-filtered AWS call and cache the result on disk.

    Keyword arguments:
        manifest: dict with the keys ``region``, ``profile``, ``tag_name``,
            ``tag_value``, ``all``, ``inverse`` and ``service`` (only
            ``"ec2"`` is mapped).  An optional ``clear`` key requests that an
            existing cache file be discarded.

    The cache file lives under ``/tmp/ec2list/`` at a path derived from the
    profile, the selected filter expression and the region.
    """

    # Cache files older than this many seconds (24 hours) are discarded.
    CACHE_MAX_AGE_SECONDS = 86400

    def __init__(self, **kwargs):
        self.manifest = kwargs['manifest']
        self.region = self.manifest["region"]
        self.profile = self.manifest["profile"]
        self.tag_name = self.manifest["tag_name"]
        self.tag_value = self.manifest["tag_value"]
        self.all = self.manifest["all"]
        self.inverse = self.manifest["inverse"]
        # Optional; None means "not specified" (see _clear_requested).
        self.clear = self.manifest.get("clear")
        self._service_map = {
            "ec2": {
                "call": "describe_instances",
                "filter": "Reservations[].Instances[?Tags[]|[?starts_with(Key, '{}')]|[?contains(Value, '{}')]]".format(self.tag_name, self.tag_value),
                "allfilter": "Reservations[].Instances[?Tags[]|[?starts_with(Key, '{}')]|[?contains(Value, '{}')]]".format("", ""),
                "inversefilter": "Reservations[].Instances[?Tags[]|[?starts_with(Key, '{}')]|[?!not_null(Value, '{}')]]".format("", ""),
            },
        }
        self.service = self.manifest["service"]
        self.service_call = self._service_map[self.service]["call"]
        if self.all:
            self.filter = self._service_map[self.service]["allfilter"]
        elif self.inverse:
            self.filter = self._service_map[self.service]["inversefilter"]
        else:
            self.filter = self._service_map[self.service]["filter"]

        # Cache key.  The second format() embeds the *repr* of a bytes
        # object; that is kept as-is so cache paths stay the same as before.
        bytes_filter = '{}{}'.format(self.profile, self.filter).encode()
        bytes_plus = '{}{}'.format(bytes_filter, self.region).encode()
        fn = base64.urlsafe_b64encode(bytes_plus).decode()
        fn = fn.replace("=", '').replace(".", '')
        # Cut the key into pieces of len/6 characters: all but the last
        # become nested directories, the last one names the file.
        chunk_size = len(fn) // 6
        pieces = [fn[i:i + chunk_size] for i in range(0, len(fn), chunk_size)]
        self.dir_path = "/tmp/ec2list/{}".format("/".join(pieces[:-1]))
        self.file_name = "{}.json".format(pieces[-1])
        self.full_path = f"{self.dir_path}/{self.file_name}"

    def _clear_requested(self):
        """Return True when the caller asked for the cache to be discarded."""
        if self.clear is not None:
            return bool(self.clear)
        # Backward compatibility: the script used to expose the flag only
        # through its module-level ``args`` namespace.
        cli_args = globals().get("args")
        return bool(getattr(cli_args, "clear", False))

    def touch(self):
        """Create the cache file if missing and set its mtime to now."""
        with open(self.full_path, 'a'):
            os.utime(self.full_path, None)

    def check_cache(self):
        """Return True when a usable cache file exists.

        A cache file that is too old, or that the caller asked to clear, is
        deleted and reported as missing.
        """
        if not os.path.isfile(self.full_path):
            return False
        age_seconds = time.time() - os.path.getmtime(self.full_path)
        if age_seconds > self.CACHE_MAX_AGE_SECONDS or self._clear_requested():
            os.remove(self.full_path)
            return False
        return True

    def write_cache(self, json_data):
        """Serialise *json_data* to the cache file, creating directories as needed."""
        # Serialise first so a failure does not leave a truncated cache file.
        payload = json.dumps(json_data, default=convert_or_pass)
        os.makedirs(self.dir_path, exist_ok=True)
        with open(self.full_path, 'w') as cache_file:
            cache_file.write(payload)

    def get_paginate(self, session=None):
        """Return an iterator over the filtered results of the service call.

        *session* must provide ``client()``; pages of 100 are requested and
        ``self.filter`` is applied through the page iterator's ``search``.
        """
        client = session.client(self.service)
        paginator = client.get_paginator(self.service_call)
        page_iterator = paginator.paginate(
            PaginationConfig={'PageSize': 100}
        )
        return page_iterator.search(self.filter)

    def paginate_aws(self, service=None, filter_req=None):
        """Return the query results, from the cache when possible.

        *service* and *filter_req* are accepted for compatibility and unused.

        Raises:
            super_except: wrapping any exception raised while querying AWS
                or reading/writing the cache.
            SystemExit: with status 1 when nothing matched.
        """
        found_objects = []
        try:
            session = boto3.Session(region_name=self.region, profile_name=self.profile)
            if self.check_cache():
                with open(self.full_path) as cache_file:
                    collect = json.load(cache_file)
            else:
                collect = list(self.get_paginate(session=session))
                self.write_cache(collect)
            found_objects.extend(collect)
            if not found_objects:
                PrettyLogger(log_level="error", msg="H|(hred|No service returned) - (yellow|Correct Criteria? Try using the another profile.)")
                # SystemExit is not an Exception subclass, so it is not
                # swallowed by the handler below.
                sys.exit(1)
        except Exception as e:
            PrettyLogger(log_level="error", msg="H|(hred|Oh No! Something went wrong paginating aws!)", flip=True)
            raise super_except(inspect.currentframe().f_code.co_name, original_exception=e) from e
        return found_objects

    def get_ec2_for_tags(self):
        """Log what is being searched for and return the matching results."""
        if self.inverse:
            PrettyLogger(log_level="info", msg=f'H|getting all EC2 systems that match are (hred|MISSING) tag name: (hgreen|{self.tag_name}) value: (hblue|{self.tag_value}) region: (hred|{self.region})')
        else:
            PrettyLogger(log_level="info", msg=f'H|getting all EC2 systems that match tag name: (hgreen|{self.tag_name}) value: (hblue|{self.tag_value}) region: (hred|{self.region})')
        return self.paginate_aws()
def _random_color():
    """Return an xterm-256 foreground escape for a random, fairly bright colour."""
    channels = [randint(randint(36, 175), 255) for _ in range(3)]
    return f"\x1b[38;5;{x256.from_rgb(*channels)}m"


def _tag_rows(tags):
    """Format a tag dict as two-column display rows.

    Keys longer than 17 and values longer than 20 characters are shortened
    with ``..``.  Tags fill the rows left to right; with an odd number of
    tags the last row has an empty right-hand cell.
    """
    cells = []
    for key, value in tags.items():
        key = (key[:17] + '..') if len(key) > 17 else key
        value = (value[:20] + '..') if len(value) > 20 else value
        cells.append(f'{key:20} : {value:30}')
    if len(cells) % 2:
        # Pad so the last tag is not dropped when pairing the cells up.
        cells.append("")
    return [f'\t{left:30}\t\t {right:30}' for left, right in zip(cells[0::2], cells[1::2])]


if __name__ == '__main__':
    parser = config_parser()
    # NOTE: `args` stays a module-level name; AWSdo.check_cache may read
    # `args.clear` from the module globals.
    args = parser.parse_args()
    manifest = {
        "region": args.region or None,
        "profile": args.profile or None,
        "all": args.all or None,
        "inverse": args.inverse or None,
        "tag_value": args.value or None,
        "tag_name": args.tag or None,
        "showtags": args.showtags or None,
        "clear": args.clear or None,
        "service": "ec2",
    }
    # --all cannot be combined with --tag or --value.
    if manifest["all"] and (manifest["tag_name"] or manifest["tag_value"]):
        PrettyLogger(log_level="error", msg="H|(hred|Syntax Error): you must choose either --all, or --tag with --value or --all on its own.")
        sys.exit(1)

    results = AWSdo(manifest=manifest).get_ec2_for_tags()
    rule = "--------------------------------------------------------------------------------"
    creset = "\x1b[0m"
    last_color = None
    count = 0
    print(rule)
    # Each result is the (filtered) instance list of one reservation, and a
    # reservation can hold several instances, so list every one of them.
    for reservation_instances in results:
        for instance in reservation_instances or []:
            count += 1
            tags = {d['Key']: d['Value'] for d in instance.get("Tags") or []}
            instanceid = instance.get("InstanceId") or "None Found"
            ip = instance.get("PrivateIpAddress") or "None Found"
            pub_ip = instance.get("PublicIpAddress") or "None Found"
            state = (instance.get("State") or {}).get("Name") or "None Found"
            instance_type = instance.get("InstanceType") or "None Found"
            launch_time = instance.get("LaunchTime") or "None Found"
            if hasattr(launch_time, "isoformat"):
                # Fresh API results carry a datetime, whose format spec
                # would be treated as a strftime pattern; cached results
                # already hold the ISO string.
                launch_time = launch_time.isoformat()
            name = tags.get("Name", "NoName")
            name = name[:33] + (name[33:] and '..')
            # Avoid giving two consecutive lines the same colour.
            rand_color = _random_color()
            while rand_color == last_color:
                rand_color = _random_color()
            last_color = rand_color
            print(f'{rand_color}{count:4} {args.region:15} - {name:35} - {instance_type:14} {instanceid:20} - {ip:15}/ {pub_ip:15} {state:10} - {launch_time:20}{creset}')
            if manifest.get("showtags"):
                for row in _tag_rows(tags):
                    print(row)
    print(rule)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment