@haydenflinner
Created October 15, 2022 15:55
A cmd-line application that automatically translates text logs to parsed CSVs. Open your logs in visidata!
#!/usr/bin/env python
# Original source below; see that repo for other algorithms and links to papers.
# https://github.com/logpai/logparser
"""
Example usage of this cmdline program:
python drain.py mylogfile.log | vd -f csv
Hints:
Press , (with the cursor in the EventTemplate column) to select all rows matching the current one.
Then press " to open a new temporary sheet with only those rows,
or gd to delete them.
Press q to quit.
See the visidata docs for more, e.g. aggregation, histograms, and more advanced querying.
Description : This file implements the Drain algorithm for log parsing
Author : LogPAI team
License : MIT
"""
import re
import os
import numpy as np
import pandas as pd
import hashlib
from datetime import datetime
import sys
def log(s):
# Suppress progress output when stdout is piped (e.g. into vd) but stderr still
# points at the terminal: writing to it would garble vd's display.
if not sys.stdout.isatty() and sys.stderr.isatty():
return
print(s, file=sys.stderr)
class Logcluster:
def __init__(self, logTemplate='', logIDL=None):
self.logTemplate = logTemplate
if logIDL is None:
logIDL = []
self.logIDL = logIDL
class Node:
def __init__(self, childD=None, depth=0, digitOrtoken=None):
if childD is None:
childD = dict()
self.childD = childD
self.depth = depth
self.digitOrtoken = digitOrtoken
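# Rough sketch of the parse tree these two classes build (as used by treeSearch and
# addSeqToPrefixTree below): the root's children are keyed by token count, the next
# `depth` layers are keyed by leading tokens ('<*>' for tokens containing digits or
# when a layer overflows maxChild), and each leaf holds a plain list of Logcluster
# objects that fastMatch scans for the most similar template.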
class LogParser:
def __init__(self, log_format, depth=4, similarity_threshold=0.4,
maxChild=100, regexes=None, keep_para=True):
"""
Attributes
----------
rex : regular expressions used in preprocessing (step 1)
inpath : path of the input log file (set in parse())
depth : depth of all leaf nodes in the parse tree
st : similarity threshold
maxChild : max number of children of an internal node
"""
self.depth = depth - 2
self.st = similarity_threshold
self.maxChild = maxChild
self.df_log = None
self.log_format = log_format
self.rex = regexes if regexes is not None else []
self.keep_para = keep_para
def hasNumbers(self, s):
return any(char.isdigit() for char in s)
def treeSearch(self, rn, seq):
retLogClust = None
seqLen = len(seq)
if seqLen not in rn.childD:
return retLogClust
parentn = rn.childD[seqLen]
currentDepth = 1
for token in seq:
if currentDepth >= self.depth or currentDepth > seqLen:
break
if token in parentn.childD:
parentn = parentn.childD[token]
elif '<*>' in parentn.childD:
parentn = parentn.childD['<*>']
else:
return retLogClust
currentDepth += 1
logClustL = parentn.childD
retLogClust = self.fastMatch(logClustL, seq)
return retLogClust
def addSeqToPrefixTree(self, rn, logClust):
seqLen = len(logClust.logTemplate)
if seqLen not in rn.childD:
firstLayerNode = Node(depth=1, digitOrtoken=seqLen)
rn.childD[seqLen] = firstLayerNode
else:
firstLayerNode = rn.childD[seqLen]
parentn = firstLayerNode
currentDepth = 1
for token in logClust.logTemplate:
#Add current log cluster to the leaf node
if currentDepth >= self.depth or currentDepth > seqLen:
if len(parentn.childD) == 0:
parentn.childD = [logClust]
else:
parentn.childD.append(logClust)
break
#If token not matched in this layer of existing tree.
if token not in parentn.childD:
if not self.hasNumbers(token):
if '<*>' in parentn.childD:
if len(parentn.childD) < self.maxChild:
newNode = Node(depth=currentDepth + 1, digitOrtoken=token)
parentn.childD[token] = newNode
parentn = newNode
else:
parentn = parentn.childD['<*>']
else:
if len(parentn.childD)+1 < self.maxChild:
newNode = Node(depth=currentDepth+1, digitOrtoken=token)
parentn.childD[token] = newNode
parentn = newNode
elif len(parentn.childD)+1 == self.maxChild:
newNode = Node(depth=currentDepth+1, digitOrtoken='<*>')
parentn.childD['<*>'] = newNode
parentn = newNode
else:
parentn = parentn.childD['<*>']
else:
if '<*>' not in parentn.childD:
newNode = Node(depth=currentDepth+1, digitOrtoken='<*>')
parentn.childD['<*>'] = newNode
parentn = newNode
else:
parentn = parentn.childD['<*>']
#If the token is matched
else:
parentn = parentn.childD[token]
currentDepth += 1
#seq1 is template
def seqDist(self, seq1, seq2):
assert len(seq1) == len(seq2)
simTokens = 0
numOfPar = 0
for token1, token2 in zip(seq1, seq2):
if token1 == '<*>':
numOfPar += 1
continue
if token1 == token2:
simTokens += 1
retVal = float(simTokens) / len(seq1)
return retVal, numOfPar
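# Worked example (illustrative): with template seq1 = ['Send', '<*>', 'bytes'] and
# message seq2 = ['Send', '120', 'bytes'], the '<*>' slot is skipped (numOfPar = 1)
# and 2 of 3 tokens match exactly, so seqDist returns (2/3, 1).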
def fastMatch(self, logClustL, seq):
retLogClust = None
maxSim = -1
maxNumOfPara = -1
maxClust = None
for logClust in logClustL:
curSim, curNumOfPara = self.seqDist(logClust.logTemplate, seq)
if curSim>maxSim or (curSim==maxSim and curNumOfPara>maxNumOfPara):
maxSim = curSim
maxNumOfPara = curNumOfPara
maxClust = logClust
if maxSim >= self.st:
retLogClust = maxClust
return retLogClust
def getTemplate(self, seq1, seq2):
assert len(seq1) == len(seq2)
retVal = []
i = 0
for word in seq1:
if word == seq2[i]:
retVal.append(word)
else:
retVal.append('<*>')
i += 1
return retVal
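# Worked example (illustrative): getTemplate(['Send', '120', 'bytes'], ['Send', '99', 'bytes'])
# returns ['Send', '<*>', 'bytes'], i.e. positions where the new message disagrees with
# the existing template are generalized to '<*>'.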
def outputResult(self, logClustL):
log_templates = [0] * self.df_log.shape[0]
log_templateids = [0] * self.df_log.shape[0]
for logClust in logClustL:
template_str = ' '.join(logClust.logTemplate)
template_id = hashlib.md5(template_str.encode('utf-8')).hexdigest()[0:8]
for logID in logClust.logIDL:
logID -= 1
log_templates[logID] = template_str
log_templateids[logID] = template_id
self.df_log['EventId'] = log_templateids
self.df_log['EventTemplate'] = log_templates
if self.keep_para:
self.df_log["ParameterList"] = self.df_log.apply(self.get_parameter_list, axis=1)
occ_dict = dict(self.df_log['EventTemplate'].value_counts())
df_event = pd.DataFrame()
df_event['EventTemplate'] = self.df_log['EventTemplate'].unique()
df_event['EventId'] = df_event['EventTemplate'].map(lambda x: hashlib.md5(x.encode('utf-8')).hexdigest()[0:8])
df_event['Occurrences'] = df_event['EventTemplate'].map(occ_dict)
# df_event.columns = ["EventTemplate", "EventTemplate", "Occurrences"]
return self.df_log, df_event
def logTree(self, node, dep):
pStr = ''
for i in range(dep):
pStr += '\t'
if node.depth == 0:
pStr += 'Root'
elif node.depth == 1:
pStr += '<' + str(node.digitOrtoken) + '>'
else:
pStr += node.digitOrtoken
log(pStr)
if node.depth == self.depth:
return 1
for child in node.childD:
self.logTree(node.childD[child], dep+1)
def parse(self, inpath):
log('Parsing file: ' + os.path.join(inpath))
self.inpath = inpath
start_time = datetime.now()
rootNode = Node()
logCluL = []
self.load_data()
count = 0
for idx, line in self.df_log.iterrows():
logID = line['LineId']
logmessageL = self.preprocess(line['content']).strip().split()
# logmessageL = filter(lambda x: x != '', re.split('[\s=:,]', self.preprocess(line['content'])))
matchCluster = self.treeSearch(rootNode, logmessageL)
#Match no existing log cluster
if matchCluster is None:
newCluster = Logcluster(logTemplate=logmessageL, logIDL=[logID])
logCluL.append(newCluster)
self.addSeqToPrefixTree(rootNode, newCluster)
#Add the new log message to the existing cluster
else:
newTemplate = self.getTemplate(logmessageL, matchCluster.logTemplate)
matchCluster.logIDL.append(logID)
if ' '.join(newTemplate) != ' '.join(matchCluster.logTemplate):
matchCluster.logTemplate = newTemplate
count += 1
if count % 1000 == 0 or count == len(self.df_log):
log('Processed {0:.1f}% of log lines.'.format(count * 100.0 / len(self.df_log)))
returning = self.outputResult(logCluL)
log('Parsing done. [Time taken: {!s}]'.format(datetime.now() - start_time))
return returning
def load_data(self):
headers, regex = self.generate_logformat_regex(self.log_format)
self.df_log = self.log_to_dataframe(self.inpath, regex, headers)
def preprocess(self, line):
for currentRex in self.rex:
line = re.sub(currentRex, '<*>', line)
return line
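# Worked example (illustrative): with rex = [r'(\d+\.){3}\d+(:\d+)?'] (an IP:port
# pattern, as in the HDFS settings below), preprocess('Connected to 10.0.0.1:9000')
# returns 'Connected to <*>'.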
def log_to_dataframe(self, log_file, regex, headers):
""" Function to transform log file to dataframe
"""
log_messages = []
linecount = 0
try:
content_idx = next(i for i, colname in enumerate(headers) if colname == 'content')
except StopIteration:
raise ValueError("Need at least one group named <content>, typically the last entry in the line. Received: {!r}".format(regex))
with open(log_file, 'r') as fin:
for line in fin:
try:
match = regex.search(line.strip())
if not match:
if not log_messages:
log("Unable to parse first line with splitting-regex! Continuing...")
log_messages.append([None for _ in headers])
log_messages[-1][content_idx] = line.strip()
linecount += 1
continue
# This is a line which doesn't match the regex. It probably
# belongs with the previous line if the regex is well-constructed.
# Let's cram it into 'content' tag.
log_messages[-1][content_idx] += line.strip()
continue
message = [match.group(header) for header in headers]
log_messages.append(message)
linecount += 1
except Exception as e:
# The upstream code swallowed exceptions here with 'pass'; re-raise instead so parsing errors are visible.
raise
logdf = pd.DataFrame(log_messages, columns=headers)
logdf.insert(0, 'LineId', None)
logdf['LineId'] = [i + 1 for i in range(linecount)]
return logdf
def generate_logformat_regex(self, logformat):
""" Function to generate regular expression to split log messages
"""
headers = []
splitters = re.split(r'(<[^<>]+>)', logformat)
regex = ''
for k in range(len(splitters)):
# Even keys will be splitters, odd keys <xyz> labels
if k % 2 == 0:
# Replace ' ' with any amount of whitespace
splitter = re.sub(' +', r'\\s+', splitters[k])
regex += splitter
else:
# Get the name of the <xyz> label -> xyz.
# Create a named capturing group for it; more readable than writing the capturing regex directly.
# (.+? might arguably be better than .*?, but this works, so leaving it.)
header = splitters[k].strip('<').strip('>')
regex += '(?P<%s>.*?)' % header
headers.append(header)
# If this regex doesn't match the line, we can't parse it.
regex = re.compile('^' + regex + '$')
log("Constructed splitting-regex from config: {!r}".format(regex))
return headers, regex
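# Worked example (illustrative): generate_logformat_regex('<Date> <Time> <content>')
# returns headers ['Date', 'Time', 'content'] and the compiled regex
# ^(?P<Date>.*?)\s+(?P<Time>.*?)\s+(?P<content>.*?)$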
def get_parameter_list(self, row):
template_regex = re.sub(r"<.{1,5}>", "<*>", row["EventTemplate"])
if "<*>" not in template_regex: return []
template_regex = re.sub(r'([^A-Za-z0-9])', r'\\\1', template_regex)
template_regex = re.sub(r'\\ +', r'\\s+', template_regex)
template_regex = "^" + template_regex.replace("\<\*\>", "(.*?)") + "$"
parameter_list = re.findall(template_regex, row["content"])
parameter_list = parameter_list[0] if parameter_list else ()
parameter_list = list(parameter_list) if isinstance(parameter_list, tuple) else [parameter_list]
return parameter_list
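# Worked example (illustrative): for a row with EventTemplate 'Send <*> bytes' and
# content 'Send 120 bytes', the template is turned into the regex ^Send\s+(.*?)\s+bytes$
# and the result is ['120'].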
### Above we inlined drain.py. Here is main.py inlined:
example_settings = {
'HDFS': {
'log_file': 'HDFS/HDFS_2k.log',
'log_format': '<Date> <Time> <Pid> <Level> <Component>: <content>',
'regex': [r'blk_-?\d+', r'(\d+\.){3}\d+(:\d+)?'],
'st': 0.5,
'depth': 4
},
'Hadoop': {
'log_file': 'Hadoop/Hadoop_2k.log',
'log_format': '<Date> <Time> <Level> \[<Process>\] <Component>: <content>',
'regex': [r'(\d+\.){3}\d+'],
'st': 0.5,
'depth': 4
},
'Spark': {
'log_file': 'Spark/Spark_2k.log',
'log_format': '<Date> <Time> <Level> <Component>: <content>',
'regex': [r'(\d+\.){3}\d+', r'\b[KGTM]?B\b', r'([\w-]+\.){2,}[\w-]+'],
'st': 0.5,
'depth': 4
},
'Zookeeper': {
'log_file': 'Zookeeper/Zookeeper_2k.log',
'log_format': '<Date> <Time> - <Level> \[<Node>:<Component>@<Id>\] - <content>',
'regex': [r'(/|)(\d+\.){3}\d+(:\d+)?'],
'st': 0.5,
'depth': 4
},
'BGL': {
'log_file': 'BGL/BGL_2k.log',
'log_format': '<Label> <Timestamp> <Date> <Node> <Time> <NodeRepeat> <Type> <Component> <Level> <content>',
'regex': [r'core\.\d+'],
'st': 0.5,
'depth': 4
},
'HPC': {
'log_file': 'HPC/HPC_2k.log',
'log_format': '<LogId> <Node> <Component> <State> <Time> <Flag> <content>',
'regex': [r'=\d+'],
'st': 0.5,
'depth': 4
},
'Thunderbird': {
'log_file': 'Thunderbird/Thunderbird_2k.log',
'log_format': '<Label> <Timestamp> <Date> <User> <Month> <Day> <Time> <Location> <Component>(\[<PID>\])?: <content>',
'regex': [r'(\d+\.){3}\d+'],
'st': 0.5,
'depth': 4
},
'Windows': {
'log_file': 'Windows/Windows_2k.log',
'log_format': '<Date> <Time>, <Level> <Component> <content>',
'regex': [r'0x.*?\s'],
'st': 0.7,
'depth': 5
},
'Linux': {
'log_file': 'Linux/Linux_2k.log',
'log_format': '<Month> <Date> <Time> <Level> <Component>(\[<PID>\])?: <content>',
'regex': [r'(\d+\.){3}\d+', r'\d{2}:\d{2}:\d{2}'],
'st': 0.39,
'depth': 6
},
'Andriod': {
'log_file': 'Andriod/Andriod_2k.log',
'log_format': '<Date> <Time> <Pid> <Tid> <Level> <Component>: <content>',
'regex': [r'(/[\w-]+)+', r'([\w-]+\.){2,}[\w-]+', r'\b(\-?\+?\d+)\b|\b0[Xx][a-fA-F\d]+\b|\b[a-fA-F\d]{4,}\b'],
'st': 0.2,
'depth': 6
},
'HealthApp': {
'log_file': 'HealthApp/HealthApp_2k.log',
'log_format': '<Time>\|<Component>\|<Pid>\|<content>',
'regex': [],
'st': 0.2,
'depth': 4
},
'Apache': {
'log_file': 'Apache/Apache_2k.log',
'log_format': '\[<Time>\] \[<Level>\] <content>',
'regex': [r'(\d+\.){3}\d+'],
'st': 0.5,
'depth': 4
},
'Proxifier': {
'log_file': 'Proxifier/Proxifier_2k.log',
'log_format': '\[<Time>\] <Program> - <content>',
'regex': [r'<\d+\ssec', r'([\w-]+\.)+[\w-]+(:\d+)?', r'\d{2}:\d{2}(:\d{2})*', r'[KGTM]B'],
'st': 0.6,
'depth': 3
},
'OpenSSH': {
'log_file': 'OpenSSH/OpenSSH_2k.log',
'log_format': '<Date> <Day> <Time> <Component> sshd\[<Pid>\]: <content>',
'regex': [r'(\d+\.){3}\d+', r'([\w-]+\.){2,}[\w-]+'],
'st': 0.6,
'depth': 5
},
'OpenStack': {
'log_file': 'OpenStack/OpenStack_2k.log',
'log_format': '<Logrecord> <Date> <Time> <Pid> <Level> <Component> \[<ADDR>\] <content>',
'regex': [r'((\d+\.){3}\d+,?)+', r'/.+?\s', r'\d+'],
'st': 0.5,
'depth': 5
},
'Mac': {
'log_file': 'Mac/Mac_2k.log',
'log_format': '<Month> <Date> <Time> <User> <Component>\[<PID>\]( \(<Address>\))?: <content>',
'regex': [r'([\w-]+\.){2,}[\w-]+'],
'st': 0.7,
'depth': 6
},
}
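# The per-dataset settings above come from the upstream logparser examples and are
# kept for reference only; the CLI below does not read them. A hypothetical sketch of
# reusing one with the LogParser class above (note the key names differ from the
# constructor arguments):
#   s = example_settings['Apache']
#   p = LogParser(log_format=s['log_format'], similarity_threshold=s['st'],
#                 depth=s['depth'], regexes=s['regex'])
#   df_log, df_event = p.parse(s['log_file'])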
import argparse
from argparse import RawTextHelpFormatter
parser = argparse.ArgumentParser(description='Process a log to CSV using the Drain algorithm.', formatter_class=RawTextHelpFormatter)
parser.add_argument('--similarity-threshold', '--st', type=float, nargs='?', default=0.4,
help='Threshold in [0, 1]; a lower value makes it more likely that dissimilar lines are grouped together.')
REGEX_DEFAULT = [r'(\{.*\})', r'([\w\-\./,^:|\\]+)']
parser.add_argument('-r','--regex',action='append', nargs='?',
help='Regexes to capture values. Defaults to: ' + str(REGEX_DEFAULT),
default=REGEX_DEFAULT)
parser.add_argument('--log-format', type=str, nargs='?', default=r"<content>",
help="""
A simple string to explain how to parse the most basic components of your log line, if they follow a common output format.
Examples:
<time> <content>
<time> <level> <content>
<date> <day> <time> <component> sshd\[<pid>\]: <content>]
Note that a space will be treated as ANY whitespace (\s),
and your string should be otherwise regex-escaped.
""")
parser.add_argument('--depth', type=int, nargs='?', default=100)
parser.add_argument('logpath', type=str)
args = parser.parse_args()
setting = {
# 'log_format': '<time> <level> <content>',
'log_format': args.log_format,
# 'regex': [r'([\w-]+\.){2,}[\w-]+', r'([\d\.]+)', r'("\w+")'],
# 'regexes': [r'(\{.*\})', r'([\w\-\./,^]+)'],
'regexes': args.regex,
# 'similarity_threshold': 0.7,
'similarity_threshold': args.similarity_threshold,
# 'depth': 6,
'depth': args.depth,
}
log_parser = LogParser(**setting)
df1, df2 = log_parser.parse(args.logpath)
# print(df1)
# print(df2)
# print(df2.sort_values('Occurrences').tail())
# print(df2[df2.EventTemplate.str.contains('Subscription')])
df1.to_csv(sys.stdout)
# df_result.T.to_csv('Drain_bechmark_result.csv')