Last active
August 22, 2020 00:58
-
-
Save truekonrads/02fa28c131456b14ed9c1c74912ae62e to your computer and use it in GitHub Desktop.
Parse logrhythm unarchived log files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# To add a new cell, type '# %%'
# To add a new markdown cell, type '# %% [markdown]'
# %%
from datetime import datetime
from multiprocessing import Pool
from lxml import etree
from lxml.etree import XMLSyntaxError
import logging
import sys
import re
from xml.sax.saxutils import escape
import glob
from pathlib import Path, PurePath
import os

# Log everything to stdout; main() may raise the level back up via --debug.
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
LOGGER = logging.getLogger('lr_parser')

# Prefer the C-accelerated ujson if present; fall back to the stdlib json,
# which is drop-in compatible for the dumps() call used below.
try:
    import ujson as json
except ImportError:
    LOGGER.warning("Unable to import ujson, this will be slow")
    import json
# %%
def iterloglines(fp, marker):
    """Yield multi-line log records from *fp*.

    A new record begins on every line that starts with *marker*; any
    lines before the first marker are folded into the first record.

    Args:
        fp: an open text file (or any iterable of lines).
        marker: the prefix that starts a new record.

    Yields:
        str: one complete record (with its embedded newlines) at a time.
    """
    accum = ""
    # Iterate the file object directly instead of readlines(): this
    # streams the (potentially huge) log instead of loading it whole.
    for i, line in enumerate(fp):
        if line.startswith(marker) and i > 0:
            yield accum
            accum = line
        else:
            accum += line
    # Fixed: the original unconditionally yielded here, producing one
    # bogus empty record for an empty input file.
    if accum:
        yield accum
# %% | |
# Section header with no value on the line, e.g. "Subject:".
# Raw strings: "\s" in a plain literal is an invalid escape sequence.
PREFIX_RE = re.compile(r"^([A-Za-z ]+):\s*$")
# A field line: "Key:<tab(s)>value" or "Key=value", optionally tab-indented.
FIELD_RE = re.compile(r"^\t*([A-Za-z ]+)(?::\t{1,}|=)(.+)")


def description_to_fields(description):
    """Split a Windows event *description* into a flat field dict.

    Field names can repeat under different section headers, e.g.::

        Subject:
            Security ID:  S-1-5-21-...-6666
            Account Name: joebloggs
        Account For Which Logon Failed:
            Security ID:  NULL SID
            Account Name: administrator

    so each field is prefixed with its most recent section header
    ("Subject Security ID", "Account For Which Logon Failed Security ID",
    ...). Free text before the first field and after the last field is
    preserved under the "Description" key; if no field lines are found
    at all, "Description" holds the whole input unchanged.

    Returns:
        dict: field name -> value, plus the "Description" key.
    """
    event_dict = {}
    prefix = ''
    lines = description.split("\n")
    first_field_match = None
    last_field_match = None
    for i, l in enumerate(lines):
        m = PREFIX_RE.match(l)
        if m:  # hit a section header like "Subject:"
            if first_field_match is None:
                first_field_match = i
            prefix = m.group(1)
            continue
        if prefix and l == '':  # a blank line ends the current section
            prefix = ''
            continue
        m = FIELD_RE.match(l)
        if m:
            if first_field_match is None:
                first_field_match = i
            last_field_match = i
            (k, v) = m.groups()
            new_key = prefix + " " + k if prefix else k
            if new_key in event_dict:
                # Fixed: the original message had three {} placeholders but
                # only two arguments, raising IndexError on duplicate keys;
                # also Logger.warn() is deprecated in favour of warning().
                LOGGER.warning(
                    "Key {} already in dict with value: {}".format(
                        new_key, event_dict[new_key]))
            event_dict[new_key] = v
    event_dict['Description'] = ""
    # "is not None" matters: a field match on line 0 is valid, but the
    # original truthiness test treated index 0 as "no match" and kept
    # the raw description instead of the parsed leftovers.
    if first_field_match is not None and last_field_match is not None:
        if first_field_match > 0:
            event_dict['Description'] += "\n".join(lines[:first_field_match])
        if last_field_match + 1 < len(lines):
            event_dict['Description'] += "\n".join(
                lines[last_field_match + 1:]).strip()
    else:
        event_dict['Description'] = description
    return event_dict
# %% | |
def process_windows_record(e):
    """Flatten one parsed <Event> element into a plain record dict.

    The children of <System> become top-level keys (TimeCreated is
    stored as its SystemTime attribute under "TimeGenerated"). The
    <EventData> element is expanded either from its free text (via
    description_to_fields, with spaces in keys replaced by underscores)
    or, when it has structured <Data Name=...> children, from those
    children plus a pipe-joined "Description".

    Args:
        e: an Element whose first child is <System> and second <EventData>.

    Returns:
        dict: the flattened event record.
    """
    record = {}
    # list(element) replaces the deprecated getchildren(); the original
    # also had a dead "system.find('event:Provider', ns).tag" statement
    # here, which did nothing and raised AttributeError whenever the
    # Provider element was absent — removed.
    system, eventdata = list(e)
    for child in system:
        if child.tag == '{http://schemas.microsoft.com/win/2004/08/events/event}TimeCreated':
            record['TimeGenerated'] = child.attrib['SystemTime']
        elif child.text:
            record[child.tag.split('}')[1]] = child.text
    if eventdata.text and eventdata.text.strip():  # free-text description
        expanded_contents = description_to_fields(eventdata.text)
        record.update({k.replace(" ", "_"): v
                       for k, v in expanded_contents.items()})
    elif len(eventdata):  # structured <Data Name=...> children
        for child in eventdata:
            if 'Name' in child.attrib:
                record[child.attrib['Name']] = child.text
        record['Description'] = "|".join(
            c.text for c in eventdata if c.text)
    return record
# %% | |
def patch_up_event(event: str) -> str:
    """XML-escape the payload between <EventData> and </EventData>.

    Some records embed raw '<', '>' or '&' inside EventData, which makes
    the whole event unparseable; escaping just that span lets the XML
    parser retry successfully.

    Args:
        event: the raw event text.

    Returns:
        str: the event with its EventData payload escaped; the input is
        returned unchanged when no complete <EventData> span is found
        (the original sliced with find() == -1 and corrupted the text).
    """
    beginning = "<EventData>"
    ending = "</EventData>"
    bi = event.find(beginning)
    ei = event.find(ending)
    if bi == -1 or ei == -1:
        return event
    start = bi + len(beginning)  # no magic "11" constant
    return event[:start] + escape(event[start:ei]) + event[ei:]
# %% | |
def process_logfile(logfile, destfile=None):
    """Convert one unarchived LogRhythm log file to JSON-lines.

    Each <Event> record is parsed (with one escaping retry via
    patch_up_event for records whose EventData contains raw XML-unsafe
    characters) and written as one JSON object per line. Records that
    still fail are appended verbatim to "<destfile>.err".

    Args:
        logfile: path of the source log file.
        destfile: path of the JSON output (default: "<logfile>.json").
    """
    start_time = datetime.now()
    errors = False
    errorfile = None
    marker = """<Event xmlns='http://schemas.microsoft.com/win/2004/08/events/event'>"""
    if destfile is None:
        destfile = "{}.json".format(logfile)
    LOGGER.info("Processing file: {} with output to: {}".format(
        logfile, destfile))
    # Fixed: on an empty input the original left `i` unbound and crashed
    # on the summary log line below; `count` also logs the true record
    # count rather than the last zero-based index.
    count = 0
    try:
        # Context managers close the input/output handles the original leaked.
        with open(logfile) as securitylog, open(destfile, "w") as outputfp:
            for i, event in enumerate(iterloglines(securitylog, marker)):
                count = i + 1
                try:
                    try:
                        xmlevent = etree.fromstring(event)
                    except XMLSyntaxError as e:
                        # Retry once after escaping the EventData payload.
                        fixed_event = patch_up_event(event)
                        xmlevent = etree.fromstring(fixed_event)
                        LOGGER.debug("Unable to deal with {} on line {} due to {}".format(
                            logfile, i, e))
                    record = process_windows_record(xmlevent)
                    outputfp.write(json.dumps(record, ensure_ascii=True) + "\n")
                except Exception as e:
                    LOGGER.error("Unable to process event #{} from {}, exception: {} ".format(
                        i, logfile, e
                    ))
                    LOGGER.debug("Offending event: {}".format(event))
                    errors = True
                    if errorfile is None:
                        errorfile = open("{}.err".format(destfile), "w")
                    errorfile.write(event)
    finally:
        if errorfile is not None:
            errorfile.close()
    end_time = datetime.now()
    LOGGER.info("Processing of {} took {} seconds with {} records".format(
        logfile,
        (end_time - start_time).total_seconds(),
        count)
    )
    if errors:
        # warning(): Logger.warn() is deprecated.
        LOGGER.warning("{} finished with errors!".format(logfile))
def main(*logfiles: str, runners: int = 1, destdir=None, debug: bool = False):
    """Parse logrhythm unarchived log files

    Args:
        logfiles: glob pattern(s) of logs to process
        runners: number of concurrent processes (0 or less runs in a
            single process, no pool)
        destdir: directory for the .json output files (default: next to
            each source log)
        debug: enable debug logging
    """
    if debug:
        LOGGER.setLevel(logging.DEBUG)
        LOGGER.debug("Debugging on")
    # Expand every glob pattern, de-duplicate, and process in stable order.
    all_logs = sorted({path for pattern in logfiles for path in glob.glob(pattern)})
    if destdir:
        src_dst_log_pair = [
            (log, PurePath(destdir).joinpath(PurePath(log).name + ".json"))
            for log in all_logs]
    else:
        src_dst_log_pair = [(log, log + ".json") for log in all_logs]
    if runners > 0:
        # The context manager guarantees worker cleanup; the original
        # leaked the pool on success and referenced an unbound `p` if
        # Pool() itself was interrupted.
        with Pool(runners) as pool:
            try:
                pool.starmap(process_logfile, src_dst_log_pair)
            except KeyboardInterrupt:
                pool.terminate()
    else:
        LOGGER.info("Running in single thread mode")
        for src, dst in src_dst_log_pair:
            process_logfile(src, dst)
if __name__ == "__main__": | |
from fire import Fire | |
# Disable ANSI colours on windows | |
if os.name == 'nt': | |
os.environ['ANSI_COLORS_DISABLED'] = "1" | |
Fire(main) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment