Skip to content

Instantly share code, notes, and snippets.

@truekonrads
Last active August 22, 2020 00:58
Show Gist options
  • Save truekonrads/02fa28c131456b14ed9c1c74912ae62e to your computer and use it in GitHub Desktop.
Save truekonrads/02fa28c131456b14ed9c1c74912ae62e to your computer and use it in GitHub Desktop.
Parse logrhythm unarchived log files
#!/usr/bin/env python3
# To add a new cell, type '# %%'
# To add a new markdown cell, type '# %% [markdown]'
# %%
from datetime import datetime
from multiprocessing import Pool
from lxml import etree
from lxml.etree import XMLSyntaxError
import logging
import sys
import re
from xml.sax.saxutils import escape
import glob
from pathlib import Path, PurePath
import os
# Send all log output to stdout; the root logger is configured at DEBUG level.
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
# Module-wide logger used by every function in this script.
LOGGER = logging.getLogger('lr_parser')
# Prefer ujson when it is installed; otherwise warn and fall back to the
# standard-library json module under the same name.
try:
    import ujson as json
except ImportError:
    LOGGER.warning("Unable to import ujson, this will be slow")
    import json
# %%
def iterloglines(fp, marker):
    """Split a line-oriented text stream into chunks that start with *marker*.

    Lines are collected until the next line beginning with ``marker`` is
    seen; the collected text is then yielded and a new chunk is started with
    that marker line. The stream is consumed lazily, one line at a time, so
    the whole file is never held in memory.

    Args:
        fp: Iterable of text lines (for example an open text file). Line
            endings are preserved in the yielded chunks.
        marker: Prefix that identifies the first line of a new chunk.

    Yields:
        str: One chunk per marker line. Lines that precede the first marker
        are yielded together as a leading chunk, and an empty input still
        yields a single empty string.
    """
    parts = []
    for line in fp:
        # A marker line closes the chunk collected so far. ``parts`` is only
        # empty before the very first line, so no empty chunk is emitted here.
        if parts and line.startswith(marker):
            yield "".join(parts)
            parts = []
        parts.append(line)
    # Flush the final chunk (an empty string when the input had no lines).
    yield "".join(parts)
# %%
# A section header line such as "Subject:" (letters/spaces, colon, nothing else).
PREFIX_RE = re.compile(r"^([A-Za-z ]+):\s*$")
# A "Name:<tab>value" or "Name=value" line, optionally indented with tabs.
FIELD_RE = re.compile(r"^\t*([A-Za-z ]+)(?::\t{1,}|=)(.+)")


def description_to_fields(description):
    """Extract key/value fields from a free-text event description.

    Windows event descriptions repeat field names under different section
    headers, for example::

        An account failed to log on.
        Subject:
            Security ID:     S-1-5-21-...
            Account Name:    joebloggs
        Account For Which Logon Failed:
            Security ID:     NULL SID
            Account Name:    administrator

    To keep both "Security ID" values, every field found below a header is
    stored under "<header> <field>" (e.g. "Subject Security ID"). A blank
    line ends the current section.

    Args:
        description: The description text; lines are separated by "\\n".

    Returns:
        dict: The extracted fields plus a "Description" entry. When fields
        were found, "Description" is the text before the first header/field
        line followed by the (stripped) text after the last field line;
        otherwise it is the unmodified input.
    """
    event_dict = {}
    prefix = ''
    lines = description.split("\n")
    first_field_match = None  # index of the first header or field line
    last_field_match = None   # index of the last field line
    for i, line in enumerate(lines):
        m = PREFIX_RE.match(line)
        if m:  # we've hit a section header like "Subject:"
            if first_field_match is None:
                first_field_match = i
            prefix = m.group(1)
            continue
        if prefix and line == '':  # a blank line ends the current section
            prefix = ''
            continue
        m = FIELD_RE.match(line)
        if m:
            if first_field_match is None:
                first_field_match = i
            last_field_match = i
            key, value = m.groups()
            new_key = prefix + " " + key if prefix else key
            if new_key in event_dict:
                # Report both the value being replaced and the new one.
                LOGGER.warning(
                    "Key %s already in dict with value: %s (%s)",
                    new_key, event_dict[new_key], value)
            event_dict[new_key] = value
    event_dict['Description'] = ""
    # Compare against None explicitly: index 0 is a valid match position.
    if first_field_match is not None and last_field_match is not None:
        if first_field_match > 0:
            event_dict['Description'] += "\n".join(lines[:first_field_match])
        if last_field_match + 1 < len(lines):
            event_dict['Description'] += "\n".join(
                lines[last_field_match + 1:]).strip()
    else:
        event_dict['Description'] = description
    return event_dict
# %%
def process_windows_record(e):
    """Flatten a parsed Windows ``<Event>`` element into a plain dict.

    Args:
        e: Parsed event element with exactly two children: the ``System``
            element followed by the ``EventData`` element.

    Returns:
        dict: One entry per ``System`` child that has text (keyed by the tag
        name without its namespace), ``TimeGenerated`` taken from the
        ``SystemTime`` attribute of ``TimeCreated``, and the event data:
        either the fields parsed out of the free-text payload (spaces in the
        keys replaced by underscores), or the named ``Data`` children plus a
        ``Description`` made of all child texts joined with "|".

    Raises:
        ValueError: If the event does not have exactly two children.
    """
    time_created_tag = (
        '{http://schemas.microsoft.com/win/2004/08/events/event}TimeCreated')
    record = {}
    system, eventdata = list(e)
    for child in system:
        if child.tag == time_created_tag:
            record['TimeGenerated'] = child.attrib['SystemTime']
        elif child.text:
            # Drop the "{namespace}" part; tags without one are kept as-is.
            record[child.tag.rsplit('}', 1)[-1]] = child.text
    data_children = list(eventdata)
    if eventdata.text and eventdata.text.strip():  # free-text payload
        expanded_contents = description_to_fields(eventdata.text)
        record.update({k.replace(" ", "_"): v
                       for k, v in expanded_contents.items()})
    elif data_children:  # structured <Data Name="...">value</Data> payload
        for child in data_children:
            if 'Name' in child.attrib:
                record[child.attrib['Name']] = child.text
        record['Description'] = "|".join(
            child.text for child in data_children if child.text)
    return record
# %%
def patch_up_event(event: str) -> str:
    """XML-escape the text between ``<EventData>`` and ``</EventData>``.

    Used to repair events whose payload contains raw ``&``, ``<`` or ``>``
    characters and therefore fails to parse as XML.

    Args:
        event: Raw text of a single event.

    Returns:
        The event with the payload escaped. The input is returned unchanged
        when it has no ``<EventData>`` opening tag or no closing tag after it.

    Note:
        Every ``&`` in the payload is escaped, including those that already
        belong to an entity such as ``&amp;``.
    """
    beginning = "<EventData>"
    ending = "</EventData>"
    start = event.find(beginning)
    if start == -1:
        return event
    content_start = start + len(beginning)
    # Only look for the closing tag after the opening one.
    content_end = event.find(ending, content_start)
    if content_end == -1:
        return event
    return (event[:content_start]
            + escape(event[content_start:content_end])
            + event[content_end:])
# %%
def process_logfile(logfile, destfile=None):
    """Convert one unarchived log file into a file of JSON lines.

    The input is split into events on the ``<Event xmlns=...>`` marker. Each
    event is parsed as XML (with one repair attempt via ``patch_up_event``
    when the XML is malformed), flattened with ``process_windows_record`` and
    written to ``destfile`` as one JSON object per line. Events that still
    cannot be processed are logged and appended verbatim to
    ``<destfile>.err``, which is only created when needed.

    Args:
        logfile: Path of the log file to read.
        destfile: Path of the JSON-lines output; defaults to
            ``<logfile>.json``.
    """
    start_time = datetime.now()
    marker = """<Event xmlns='http://schemas.microsoft.com/win/2004/08/events/event'>"""
    if destfile is None:
        destfile = "{}.json".format(logfile)
    LOGGER.info("Processing file: {} with output to: {}".format(
        logfile, destfile))
    errorfile = None  # opened lazily on the first failing event
    event_count = 0
    try:
        with open(logfile) as securitylog, open(destfile, "w") as outputfp:
            for i, event in enumerate(iterloglines(securitylog, marker)):
                event_count = i + 1
                try:
                    try:
                        xmlevent = etree.fromstring(event)
                    except XMLSyntaxError as e:
                        # Retry once with the EventData payload escaped.
                        fixed_event = patch_up_event(event)
                        xmlevent = etree.fromstring(fixed_event)
                        LOGGER.debug("Unable to deal with {} on line {} due to {}".format(logfile, i, e))
                    record = process_windows_record(xmlevent)
                    outputfp.write(json.dumps(record, ensure_ascii=True) + "\n")
                except Exception as e:
                    # Best effort: keep going and save the raw event aside.
                    LOGGER.error("Unable to process event #{} from {}, exception: {} ".format(
                        i, logfile, e
                    ))
                    LOGGER.debug("Offending event: {}".format(event))
                    if errorfile is None:
                        errorfile = open("{}.err".format(destfile), "w")
                    errorfile.write(event)
    finally:
        if errorfile is not None:
            errorfile.close()
    end_time = datetime.now()
    LOGGER.info("Processing of {} took {} seconds with {} records".format(
        logfile,
        (end_time - start_time).total_seconds(),
        event_count)
    )
    if errorfile is not None:
        LOGGER.warning("{} finished with errors!".format(logfile))
def main(*logfiles: str, runners: int = 1, destdir=None, debug: bool = False):
    """Parse logrhythm unarchived log files

    Args:
        logfiles: glob pattern(s) of logs to process
        runners: number of concurrent worker processes; with 1 or less the
            files are processed one after another in the current process
        destdir: directory for the .json output files (created if missing);
            by default each output is written next to its source log
        debug: set this script's logger to DEBUG level
    """
    if debug:
        LOGGER.setLevel(logging.DEBUG)
        LOGGER.debug("Debugging on")
    # Expand every pattern, drop duplicates and process in a stable order.
    all_logs = sorted(
        {path for pattern in logfiles for path in glob.glob(pattern)})
    if not all_logs:
        LOGGER.warning("No log files matched %s", list(logfiles))
        return
    if destdir:
        dest_root = Path(destdir)
        dest_root.mkdir(parents=True, exist_ok=True)
        src_dst_log_pair = [
            (src, dest_root / (PurePath(src).name + ".json"))
            for src in all_logs]
    else:
        src_dst_log_pair = [(src, src + ".json") for src in all_logs]
    if runners > 1:
        # Leaving the ``with`` block terminates the pool, also on Ctrl-C.
        with Pool(runners) as pool:
            try:
                pool.starmap(process_logfile, src_dst_log_pair)
            except KeyboardInterrupt:
                LOGGER.warning("Interrupted, terminating workers")
    else:
        LOGGER.info("Running in single thread mode")
        for src, dst in src_dst_log_pair:
            process_logfile(src, dst)
if __name__ == "__main__":
    from fire import Fire

    # Disable ANSI colours on Windows
    running_on_windows = os.name == 'nt'
    if running_on_windows:
        os.environ['ANSI_COLORS_DISABLED'] = "1"
    # Expose main() as the command-line interface.
    Fire(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment