Skip to content

Instantly share code, notes, and snippets.

@gdestuynder
Last active February 5, 2016 21:18
Show Gist options
  • Save gdestuynder/b22728572fdfa02d4f50 to your computer and use it in GitHub Desktop.
Save gdestuynder/b22728572fdfa02d4f50 to your computer and use it in GitHub Desktop.
#!/usr/bin/python2
# See http://elasticsearch-dsl.readthedocs.org/en/latest/search_dsl.html
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Q
from elasticsearch_dsl.connections import connections
from datetime import datetime
from datetime import timedelta
import pytz
import sys
import pprint
import json
def fatal(msg):
    '''Print msg on standard output, then terminate the process with exit status 1.'''
    print(msg)
    # sys.exit(1) does exactly this: it raises SystemExit carrying the code.
    raise SystemExit(1)
def debug(msg):
    '''Write msg to standard error as one line prefixed with "+++ ".'''
    line = '+++ {}\n'.format(msg)
    sys.stderr.write(line)
class DotDict(dict):
    '''A dict whose keys can also be read, written and deleted as attributes.

    Nested mappings (any value exposing ``keys``) are wrapped recursively at
    construction time. Attribute access is a plain key lookup, so a missing
    attribute raises KeyError rather than AttributeError.
    '''

    def __init__(self, dct):
        for name, item in dct.items():
            # Wrap anything dict-like so dotted access works at every depth.
            self[name] = DotDict(item) if hasattr(item, 'keys') else item

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails; defer to the dict.
        return self[name]

    def __setattr__(self, name, item):
        self[name] = item

    def __delattr__(self, name):
        del self[name]
def toUTC(suspectedDate, localTimeZone=None):
    '''Convert a date string or datetime into a timezone-aware UTC datetime.

    suspectedDate: a str (handed to ``parse`` with fuzzy=True) or a datetime.
    localTimeZone: zone name used to interpret naive values. When None it is
        derived from the last two path components of the resolved
        /etc/localtime path; if pytz does not know the resulting name, UTC is
        assumed.

    Returns the value normalized to UTC, or None when suspectedDate is
    neither a str nor a datetime.
    '''
    # Local import: the file header does not import os. Without it the lookup
    # below raised NameError, which a bare except silently turned into 'UTC'.
    import os

    if localTimeZone is None:
        try:
            # e.g. /usr/share/zoneinfo/Europe/Paris -> 'Europe/Paris'
            localTimeZone = '/'.join(os.path.realpath('/etc/localtime').split('/')[-2:])
        except OSError:
            localTimeZone = 'UTC'

    if isinstance(suspectedDate, str):
        # NOTE(review): `parse` is not imported anywhere in the visible file;
        # presumably dateutil.parser.parse was intended -- confirm and add the
        # import, otherwise this branch raises NameError.
        objDate = parse(suspectedDate, fuzzy=True)
    elif isinstance(suspectedDate, datetime):
        objDate = suspectedDate
    else:
        # Unsupported input: nothing to convert.
        return None

    if objDate.tzinfo is None:
        # Naive value: stamp it with the local zone before converting.
        try:
            objDate = pytz.timezone(localTimeZone).localize(objDate)
        except pytz.exceptions.UnknownTimeZoneError:
            # Meh if all fails, I decide you're UTC!
            objDate = pytz.UTC.localize(objDate)

    return pytz.UTC.normalize(objDate)
def risk_highest(risk1, risk2):
    '''Return whichever of the two risk labels ranks higher.

    Labels are ranked 'Unknown' < 'LOW' < 'MEDIUM' < 'HIGH' < 'MAXIMUM'.
    On a tie risk1 is returned. A label outside that set raises KeyError.
    '''
    rank = {
        'Unknown': 0,
        'LOW': 1,
        'MEDIUM': 2,
        'HIGH': 3,
        'MAXIMUM': 4,
    }
    first, second = rank[risk1], rank[risk2]
    return risk1 if first >= second else risk2
def risk_calculator(impact, probability):
    '''Look up the risk label for an impact / probability pair.

    Both arguments are labels among 'Unknown', 'LOW', 'MEDIUM', 'HIGH' and
    'MAXIMUM'; an empty string is treated as 'Unknown'. Any other value
    raises KeyError.
    '''
    impact = 'Unknown' if impact == '' else impact
    probability = 'Unknown' if probability == '' else probability

    # matrix[IMPACT][PROBABILITY] = RISK
    # same as RRA 2.5.0 table
    matrix = {
        'Unknown': {'Unknown': 'Unknown', 'LOW': 'Unknown', 'MEDIUM': 'Unknown',
                    'HIGH': 'Unknown', 'MAXIMUM': 'Unknown'},
        'LOW': {'Unknown': 'LOW', 'LOW': 'LOW', 'MEDIUM': 'LOW',
                'HIGH': 'MEDIUM', 'MAXIMUM': 'HIGH'},
        'MEDIUM': {'Unknown': 'MEDIUM', 'LOW': 'LOW', 'MEDIUM': 'MEDIUM',
                   'HIGH': 'HIGH', 'MAXIMUM': 'HIGH'},
        'HIGH': {'Unknown': 'HIGH', 'LOW': 'MEDIUM', 'MEDIUM': 'MEDIUM',
                 'HIGH': 'HIGH', 'MAXIMUM': 'MAXIMUM'},
        'MAXIMUM': {'Unknown': 'HIGH', 'LOW': 'MEDIUM', 'MEDIUM': 'HIGH',
                    'HIGH': 'MAXIMUM', 'MAXIMUM': 'MAXIMUM'},
    }
    return matrix[impact][probability]
def main():
    '''Query the 'rra' index and print one "service,owner,risk" line per hit.

    Connects to the configured Elasticsearch host, fetches up to
    ``config.size`` documents whose ``utctimestamp`` falls within the last
    ``config.minutes`` minutes, and for each document prints its service, its
    owner and the highest risk found across every impact/probability pair
    under ``details.risk``. Exits with status 1 (via fatal) when the query
    returns no hits.
    '''
    config = DotDict({'host': 'mozdefes2.private.scl3.mozilla.com',
                      'index': 'rra',
                      'minutes': 3000,
                      'size': 1000,
                      })

    es = connections.create_connection(hosts=[config.host], timeout=20)
    debug(connections.get_connection().cluster.health())

    # Sample the clock once, already timezone-aware in UTC, so the window does
    # not depend on how naive local times get interpreted and both bounds
    # derive from the same instant.
    now = datetime.now(pytz.UTC)
    begindateUTC = toUTC(now - timedelta(minutes=config.minutes))
    enddateUTC = toUTC(now)

    s = Search(using=es, index=config['index'])
    s = s.extra(size=config.size)
    s = s.filter('range', utctimestamp={'gte': begindateUTC, 'lte': enddateUTC})
    response = s.execute()
    debug("Response received: {}, took {} seconds, {} hits, timed out: {}".format(
        response.success(), response.took, response.hits.total, response.timed_out))

    if response.hits.total <= 0:
        fatal("No matches found.")

    for hit in response:
        risks = hit.details.risk
        crisk = 'Unknown'
        # details.risk is walked as a two-level mapping whose leaves carry
        # 'impact' and 'probability' labels; keep the worst computed risk.
        for category in risks:
            for rtype in risks[category]:
                entry = risks[category][rtype]
                crisk = risk_highest(risk_calculator(entry['impact'], entry['probability']), crisk)
        print("{},{},{}".format(hit.details.metadata.service.encode('ascii', 'ignore'),
                                hit.details.metadata.owner.encode('ascii', 'ignore'), crisk))
# Run the report only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment