Last active
February 5, 2016 21:18
-
-
Save gdestuynder/b22728572fdfa02d4f50 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python2
# See http://elasticsearch-dsl.readthedocs.org/en/latest/search_dsl.html
from elasticsearch import Elasticsearch | |
from elasticsearch_dsl import Search, Q | |
from elasticsearch_dsl.connections import connections | |
from datetime import datetime | |
from datetime import timedelta | |
import pytz | |
import sys | |
import pprint | |
import json | |
def fatal(msg):
    '''Print msg on standard output, then terminate with exit status 1.'''
    print(msg)
    # Raising SystemExit directly is what sys.exit(1) does under the hood.
    raise SystemExit(1)
def debug(msg):
    '''Write msg to standard error as one line prefixed with "+++ ".'''
    line = '+++ {}\n'.format(msg)
    sys.stderr.write(line)
class DotDict(dict):
    '''dict.item notation for dict()'s

    Keys can be read, written and deleted as attributes (d.key) as well as
    items (d['key']). Any value passed to the constructor that exposes a
    keys attribute (i.e. looks like a mapping) is wrapped recursively, so
    nested lookups such as d.a.b work too.

    Accessing or deleting a missing attribute raises AttributeError, as the
    attribute protocol requires, so hasattr() and getattr(obj, name, default)
    behave correctly. Item access (d['missing']) still raises KeyError.
    '''
    # Attribute assignment simply stores an item.
    __setattr__ = dict.__setitem__

    def __init__(self, dct):
        '''Populate the instance from the mapping dct, wrapping nested mappings.'''
        super(DotDict, self).__init__()
        for key, value in dct.items():
            if hasattr(value, 'keys'):
                value = DotDict(value)
            self[key] = value

    def __getattr__(self, name):
        '''Return the item stored under name; AttributeError if it is absent.'''
        # Only called when normal attribute lookup fails, so real dict
        # methods (keys, items, ...) are never shadowed by this.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)

    def __delattr__(self, name):
        '''Delete the item stored under name; AttributeError if it is absent.'''
        try:
            del self[name]
        except KeyError:
            raise AttributeError(name)
def toUTC(suspectedDate, localTimeZone=None):
    '''Convert a date string or datetime into a timezone-aware UTC datetime.

    suspectedDate: a datetime (naive or aware) or a string to be parsed.
    localTimeZone: name of the zone assumed for naive values. When None, it
        is derived from the last two path components of the file that
        /etc/localtime resolves to (e.g. "Europe/Paris").

    Naive values are localized to localTimeZone, falling back to UTC when
    pytz does not know that zone name, and then converted to UTC. Aware
    values are converted to UTC directly.

    Returns the UTC datetime. Raises TypeError when suspectedDate is neither
    a string nor a datetime.
    '''
    # Imported here because the file-level imports do not provide os; without
    # it the lookup below failed with NameError and every naive date was
    # silently treated as UTC.
    import os

    if localTimeZone is None:
        try:
            localTimeZone = '/'.join(os.path.realpath('/etc/localtime').split('/')[-2:])
        except (OSError, ValueError):
            localTimeZone = 'UTC'

    utc = pytz.UTC
    if isinstance(suspectedDate, datetime):
        objDate = suspectedDate
    elif isinstance(suspectedDate, str):
        # NOTE(review): `parse` is not imported anywhere in the visible file;
        # presumably dateutil.parser.parse is intended -- confirm and add the
        # import at file level, otherwise this branch raises NameError.
        objDate = parse(suspectedDate, fuzzy=True)
    else:
        raise TypeError('toUTC() expects a str or datetime, got {}'.format(
            type(suspectedDate).__name__))

    if objDate.tzinfo is None:
        try:
            objDate = pytz.timezone(localTimeZone).localize(objDate)
        except pytz.exceptions.UnknownTimeZoneError:
            # Meh if all fails, I decide you're UTC!
            objDate = pytz.timezone('UTC').localize(objDate)

    return utc.normalize(objDate)
def risk_highest(risk1, risk2):
    '''Return whichever of the two risk labels ranks higher.

    Labels are ranked Unknown < LOW < MEDIUM < HIGH < MAXIMUM. On a tie
    risk1 is returned. A label outside that set raises KeyError.
    '''
    ordered_labels = ('Unknown', 'LOW', 'MEDIUM', 'HIGH', 'MAXIMUM')
    rank = {label: position for position, label in enumerate(ordered_labels)}
    first_rank = rank[risk1]
    second_rank = rank[risk2]
    return risk1 if first_rank >= second_rank else risk2
def risk_calculator(impact, probability):
    '''Look up the risk label for an impact / probability pair.

    The lookup is matrix[IMPACT][PROBABILITY] = RISK, same as the RRA 2.5.0
    table. An empty string for either argument is treated as 'Unknown'; any
    other label outside the table raises KeyError.
    '''
    probability_levels = ('Unknown', 'LOW', 'MEDIUM', 'HIGH', 'MAXIMUM')
    # One row per impact level; columns follow probability_levels.
    matrix = {
        'Unknown': ('Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown'),
        'LOW': ('LOW', 'LOW', 'LOW', 'MEDIUM', 'HIGH'),
        'MEDIUM': ('MEDIUM', 'LOW', 'MEDIUM', 'HIGH', 'HIGH'),
        'HIGH': ('HIGH', 'MEDIUM', 'MEDIUM', 'HIGH', 'MAXIMUM'),
        'MAXIMUM': ('HIGH', 'MEDIUM', 'HIGH', 'MAXIMUM', 'MAXIMUM'),
    }
    impact_key = 'Unknown' if impact == '' else impact
    probability_key = 'Unknown' if probability == '' else probability
    row = dict(zip(probability_levels, matrix[impact_key]))
    return row[probability_key]
def _highest_document_risk(risk):
    '''Return the highest risk label found across every entry of risk.

    risk is walked as a two-level mapping whose leaves carry 'impact' and
    'probability' values; each leaf is scored with risk_calculator() and the
    maximum is kept with risk_highest(). An empty mapping yields 'Unknown'.
    '''
    highest = 'Unknown'
    for category in risk:
        for risk_type in risk[category]:
            entry = risk[category][risk_type]
            level = risk_calculator(entry['impact'], entry['probability'])
            highest = risk_highest(level, highest)
    return highest


def main():
    '''Query recent documents from Elasticsearch and print one CSV line each.

    Connects to the hard-coded host, searches the configured index for
    documents whose utctimestamp falls within the last config.minutes
    minutes, and prints "service,owner,highest risk" per hit on standard
    output. Exits with status 1 through fatal() when nothing matches.
    '''
    config = DotDict({'host': 'mozdefes2.private.scl3.mozilla.com',
                      'index': 'rra',
                      'minutes': 3000,
                      'size': 1000,
                      })
    es = connections.create_connection(hosts=[config.host], timeout=20)
    debug(connections.get_connection().cluster.health())

    # Sample the clock once so the window is exactly config.minutes wide.
    now = datetime.now()
    begindateUTC = toUTC(now - timedelta(minutes=config.minutes))
    enddateUTC = toUTC(now)

    search = Search(using=es, index=config['index'])
    search = search.extra(size=config.size)
    search = search.filter('range', utctimestamp={'gte': begindateUTC, 'lte': enddateUTC})
    response = search.execute()
    # Elasticsearch reports `took` in milliseconds, not seconds.
    debug("Response received: {}, took {} ms, {} hits, timed out: {}".format(
        response.success(), response.took, response.hits.total, response.timed_out))
    if response.hits.total <= 0:
        fatal("No matches found.")

    # Distinct loop names: the original reused `r` for the inner loop, which
    # rebound the response object while it was still being iterated.
    for hit in response:
        crisk = _highest_document_risk(hit.details.risk)
        print("{},{},{}".format(hit.details.metadata.service.encode('ascii', 'ignore'),
                                hit.details.metadata.owner.encode('ascii', 'ignore'), crisk))
# Run the report only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment