Skip to content

Instantly share code, notes, and snippets.

@Slabity
Created November 30, 2018 22:00
Show Gist options
  • Select an option

  • Save Slabity/80c6156f6b8ac70871231fd2db4575fe to your computer and use it in GitHub Desktop.

Select an option

Save Slabity/80c6156f6b8ac70871231fd2db4575fe to your computer and use it in GitHub Desktop.
#! /usr/bin/env python
import datetime
import difflib
import gzip
# Mapping between config file names and the path templates of their archived
# end-of-day copies. Each template must be formatted with a datestamp in the
# form of `YYYYmmdd` (get_archive_path fills `{0}` via strftime("%Y%m%d")).
ARCHIVE_CONF_FMTS = [
("flow.conf", "/archive/{0}/eod/flow-eod_{0}.conf.gz"),
("riskmgr.conf", "/archive/{0}/eod/riskmgr-eod_{0}.conf.gz"),
("risksettings.conf", "/archive/{0}/eod/risksettings-eod_{0}.conf.gz"),
]
# Tells whether the given date falls on a weekday (Monday through Friday)
def is_weekday(date):
    # date.weekday() numbers Monday as 0 and Sunday as 6
    return date.weekday() < 5
# Gets a file's path in the archive for a specific date.
# `fname` must be one of the names listed in ARCHIVE_CONF_FMTS; `date` is
# rendered as YYYYmmdd and substituted into the matching path template.
# Raises ValueError if `fname` has no entry in ARCHIVE_CONF_FMTS.
def get_archive_path(fname, date):
    for known_name, template in ARCHIVE_CONF_FMTS:
        if known_name == fname:
            return template.format(date.strftime("%Y%m%d"))
    # An explicit error instead of the bare StopIteration that next() on an
    # exhausted generator would leak to the caller
    raise ValueError("No archive location known for {0!r}".format(fname))
# Read a config from the archive, returns the list of lines.
# The archive path is resolved with get_archive_path(fname, date) and the
# file is read through gzip.open in its default mode.
def read_archived_config(fname, date):
    fpath = get_archive_path(fname, date)
    # The context manager closes the handle even when reading raises
    with gzip.open(fpath) as fhandle:
        return fhandle.readlines()
# Given a date, returns the date of the weekday that follows it
def next_weekday(date):
    # Friday (4) and Saturday (5) have to jump over the weekend; every other
    # day, Sunday included, simply advances by one day.
    days_ahead = {4: 3, 5: 2}.get(date.weekday(), 1)
    return date + datetime.timedelta(days=days_ahead)
# Given a date, returns the date of the weekday that precedes it
def prev_weekday(date):
    # Monday (0) and Sunday (6) have to jump back over the weekend; every
    # other day, Saturday included, simply steps back by one day.
    days_back = {0: 3, 6: 2}.get(date.weekday(), 1)
    return date - datetime.timedelta(days=days_back)
# Opens a config file for reading and returns the handle, or None if the
# open fails with IOError. Paths ending in ".gz" go through gzip.open,
# everything else through the builtin open.
def open_config_file(fpath):
    opener = gzip.open if fpath.endswith(".gz") else open
    try:
        return opener(fpath, 'r')
    except IOError:
        return None
# Returns the list of flow lines (lines starting with 'flow') from a config
# file, in file order. Returns an empty list if the file cannot be opened.
def read_flows(fpath):
    fconf = open_config_file(fpath)
    if fconf is None:
        return []
    # The context manager closes the handle even when reading raises
    with fconf:
        lines = fconf.readlines()
    flows = []
    for line in lines:
        # A gzip handle opened with mode 'r' yields bytes on Python 3, which
        # would make startswith('flow') raise TypeError; normalise to str.
        # NOTE(review): assumes the configs are UTF-8/ASCII text - confirm
        if not isinstance(line, str):
            line = line.decode("utf-8")
        if line.startswith('flow'):
            flows.append(line)
    return flows
# Returns the path of config `fname` for the given date: the archived
# end-of-day copy for any date other than today, the live config for today
def get_config_path(fname, date):
    if date != datetime.date.today():
        stamp = date.strftime("%Y%m%d")
        archived_name = "{0}-eod_{1}.conf.gz".format(fname, stamp)
        return "/archive/{0}/eod/{1}".format(stamp, archived_name)
    return "/opt/hpr/config/{0}.conf".format(fname)
# Returns (date, flows) for the first weekday on or after the given date on
# which exactly one of the named configs contains flows, searching forward
def open_next_config(fnames, date):
    # Weekend dates are first moved forward to a weekday
    day = date if is_weekday(date) else next_weekday(date)
    # Collect the flows of every named config for that day
    paths = [get_config_path(fname, day) for fname in fnames]
    candidates = [read_flows(path) for path in paths]
    non_empty = [flows for flows in candidates if flows]
    if len(non_empty) == 1:
        return (day, non_empty[0])
    # Either no file had flows that day, or more than one did: try the
    # following weekday (recursively, with no explicit stopping point)
    return open_next_config(fnames, next_weekday(day))
# Returns (date, flows) for the first weekday on or before the given date on
# which exactly one of the named configs contains flows, searching backward
def open_prev_config(fnames, date):
    # Weekend dates are first moved back to a weekday
    day = date if is_weekday(date) else prev_weekday(date)
    # Collect the flows of every named config for that day
    paths = [get_config_path(fname, day) for fname in fnames]
    candidates = [read_flows(path) for path in paths]
    non_empty = [flows for flows in candidates if flows]
    if len(non_empty) == 1:
        return (day, non_empty[0])
    # Either no file had flows that day, or more than one did: try the
    # preceding weekday (recursively, with no explicit stopping point)
    return open_prev_config(fnames, prev_weekday(day))
# Opens the configs for the given date and for 30 days before it.
# Returns ((date, flows), (earlier_date, earlier_flows)).
def open_both_configs(fnames, date):
    # When the date is today the script is not being run manually, so the
    # current config is searched backwards; a manually supplied date is
    # searched forwards instead.
    if date == datetime.date.today():
        find_current = open_prev_config
    else:
        find_current = open_next_config
    current = find_current(fnames, date)
    # The comparison config is always searched backwards
    earlier = open_prev_config(fnames, date - datetime.timedelta(days=30))
    return (current, earlier)
# Sanitize and format a flow command for presenting to client.
# Returns (session id, tab-separated summary line). Fields are picked by
# their position counted from the END of the line, so extra leading tokens
# (such as a difflib '+'/'-' marker) do not shift them. Raises IndexError
# when the line has fewer than 18 whitespace-separated tokens.
def format_flow(flow):
    tokens = flow.split()
    # Negative indices count back from the last token of the line
    session_id = tokens[-18]
    account = tokens[-17]
    mic = tokens[-13]
    address = tokens[-7]
    port = tokens[-6]
    username = tokens[-5]
    last_token = tokens[-1]  # presented under the "CompID" label
    columns = (
        "{0}:{1}".format(address, port),
        account,
        mic,
        "Session: {0}".format(session_id),
        "Username: {0}".format(username),
        "CompID: {0}".format(last_token),
    )
    return (session_id, "{0}\t{1}\t{2}\t{3}\t{4}\t{5}".format(*columns))
# Entry point: reports which flows were added and removed between the
# requested date and the config from about a month earlier.
# `argv` is the command-line argument list (program name first, optional
# YYYYMMDD date second). Prints the report to stdout; exits with status -1
# when the config files cannot be found or opened.
def main(argv):
    # sys is otherwise only imported under the __main__ guard, so import it
    # here to keep main() usable when this file is imported as a module
    import sys

    date = sanitize_args(argv)
    conf_names = ["flow", "riskmgr"]
    try:
        ((cur_date, cur_flows), (old_date, old_flows)) = open_both_configs(conf_names, date)
    except RuntimeError:
        # NOTE(review): presumably raised when the recursive date search
        # runs past the recursion limit without finding a config - confirm
        print("Could not find files in archive")
        sys.exit(-1)
    except IOError as io:
        print("Could not open files: {0}".format(io))
        sys.exit(-1)

    print("Comparing flows between dates {0} and {1}".format(cur_date, old_date))
    print("\n")
    print("Number of flows on {0}: {1}".format(cur_date, len(cur_flows)))
    print("Number of flows on {0}: {1}".format(old_date, len(old_flows)))
    print("\n")

    # Diff old -> current: Differ marks lines unique to the second sequence
    # with '+' and lines unique to the first with '-'. Passing the old flows
    # first makes '+' mean "added since" and '-' mean "removed since".
    diff = list(difflib.Differ().compare(old_flows, cur_flows))
    possibly_added = [format_flow(line) for line in diff if line.startswith('+')]
    possibly_removed = [format_flow(line) for line in diff if line.startswith('-')]

    # A session id present on both sides is a modified flow, not an addition
    # or a removal; sets keep the membership tests cheap
    possibly_added_ids = set(sess_id for (sess_id, _) in possibly_added)
    possibly_removed_ids = set(sess_id for (sess_id, _) in possibly_removed)
    actually_added = [flow for (sess_id, flow) in possibly_added if sess_id not in possibly_removed_ids]
    actually_removed = [flow for (sess_id, flow) in possibly_removed if sess_id not in possibly_added_ids]

    print("Flows removed: ({0})".format(len(actually_removed)))
    print("-" * 48)
    for line in actually_removed:
        print(line)
    # Separate by newline
    print('\n')
    print("Flows added: ({0})".format(len(actually_added)))
    print("-" * 48)
    for line in actually_added:
        print(line)
# Sanitize the CLI arguments and return the date to report on.
# With no date argument, today's date is used, but only on a Tuesday;
# otherwise usage is printed and the script exits. With a date argument,
# it must parse as YYYYMMDD; a malformed value prints usage and exits
# instead of surfacing a ValueError traceback.
def sanitize_args(argv):
    # If no optional date is provided, use today
    if len(argv) < 2:
        date = datetime.date.today()
        # But make sure it's a Tuesday (weekday() == 1)
        if date.weekday() != 1:
            print("Cannot run script automatically on non-Tuesdays")
            print("Supply a date to run manually")
            print("")
            print_usage_and_exit(argv[0])
        return date
    try:
        return datetime.datetime.strptime(argv[1], '%Y%m%d').date()
    except ValueError:
        # strptime raises ValueError when the text does not match the format
        print("Invalid date: {0}".format(argv[1]))
        print("")
        print_usage_and_exit(argv[0])
# Prints the usage line for program name `pname`, then exits with status -1
def print_usage_and_exit(pname):
    # Imported locally: at module level sys is only imported under the
    # __main__ guard, so it is not in scope when this file is imported
    import sys

    print("Usage: {0} [YYYYMMDD]".format(pname))
    sys.exit(-1)
# Script entry point: hand the raw command-line arguments to main()
if __name__ == "__main__":
    # Module-scope import; functions in this file that use sys depend on it
    import sys

    main(sys.argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment