Last active
May 20, 2020 11:39
-
-
Save studiawan/33845032e0a15e5d2a100626033f37ef to your computer and use it in GitHub Desktop.
Log file (auth.log) parser with PyParsing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import csv | |
from pyparsing import Word, alphas, Suppress, Combine, string, nums, Optional, Regex | |
class AuthLogParser(object):
    """Parse a syslog-style auth.log file and export its entries to CSV.

    Every line is split into four fields -- timestamp, hostname, application
    and message -- with a pyparsing grammar that is built once per instance.
    """

    # Column order shared by the CSV header and every data row.
    _FIELDS = ('timestamp', 'hostname', 'application', 'message')

    def __init__(self, log_file):
        """Store the log path and build the line grammar.

        Args:
            log_file: Path of the log file to parse. The CSV output is
                written to ``log_file + '.csv'``.
        """
        self.log_file = log_file
        self.authlog_grammar = self.__get_authlog_grammar()

    @staticmethod
    def __get_authlog_grammar():
        """Build the pyparsing grammar that matches one log line.

        Returns:
            A pyparsing expression with the named results ``timestamp``,
            ``hostname``, ``application`` and ``message``.
        """
        ints = Word(nums)

        # timestamp: three-letter month (one uppercase letter followed by
        # lowercase letters), a numeric day and an hh:mm:ss clock value
        month = Word(string.ascii_uppercase, string.ascii_lowercase, exact=3)
        day = ints
        hour = Combine(ints + ":" + ints + ":" + ints)
        timestamp = month + day + hour

        # hostname, service name, message
        hostname_or_ip = Word(alphas + nums + "_" + "-" + ".")
        # service name, an optional bracketed number (presumably the
        # process id -- TODO confirm), then a colon that is dropped
        appname = (
            Word(alphas + "/" + "-" + "_" + ".")
            + Optional(Suppress("[") + ints + Suppress("]"))
            + Suppress(":")
        )
        # the message is whatever remains on the line
        message = Regex(".*")

        # auth log grammar
        authlog_grammar = (
            timestamp.setResultsName('timestamp')
            + hostname_or_ip.setResultsName('hostname')
            + appname.setResultsName('application')
            + message.setResultsName('message')
        )
        return authlog_grammar

    def __get_fields(self, log_line):
        """Parse a single log line into a plain dictionary.

        Args:
            log_line: One raw line read from the log file.

        Returns:
            A dict with the keys ``timestamp``, ``hostname``,
            ``application`` and ``message``. The tokens of the timestamp
            and of the application are joined with single spaces.

        Note:
            Errors raised by ``parseString`` for a line that does not match
            the grammar (pyparsing's ParseException) are not caught here.
        """
        parsed = self.authlog_grammar.parseString(log_line)
        return {
            'timestamp': ' '.join(parsed.timestamp.asList()),
            'hostname': parsed.hostname,
            'application': ' '.join(parsed.application.asList()),
            'message': parsed.message,
        }

    def parse_authlog(self):
        """Convert the log file to ``<log_file>.csv`` and echo each entry.

        A header row is written first, followed by one row per log line;
        every parsed entry is also printed to standard output.

        Raises:
            SystemExit: With exit status 1 (after printing
                ``'File not found.'``) when a file cannot be opened because
                it does not exist.
        """
        csv_path = self.log_file + '.csv'
        try:
            # The log is opened first so that a missing log does not leave
            # an empty CSV file behind. Both handles are context-managed, so
            # the CSV is flushed and closed even if a line fails to parse.
            # newline='' is what the csv module requires to avoid doubled
            # line endings on Windows.
            with open(self.log_file, 'r') as log, \
                    open(csv_path, 'wt', newline='') as out:
                writer = csv.writer(out)
                writer.writerow(list(self._FIELDS))
                for line in log:
                    result = self.__get_fields(line)
                    # write to csv file and print
                    writer.writerow([result[name] for name in self._FIELDS])
                    print(result)
        except FileNotFoundError:
            print('File not found.')
            sys.exit(1)
if __name__ == '__main__':
    # Exactly one command-line argument is expected: the log file to parse.
    if len(sys.argv) != 2:
        print('Please type a correct log file name.')
        sys.exit(1)

    # Parse the given log and write the CSV file alongside it.
    AuthLogParser(sys.argv[1]).parse_authlog()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment