Created
May 27, 2020 02:41
-
-
Save studiawan/60910b0a6e6ac926a93044b4ff48e533 to your computer and use it in GitHub Desktop.
Parallel log parser with PyParsing and multiprocessing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import multiprocessing | |
import csv | |
from pyparsing import Word, alphas, Suppress, Combine, string, nums, Optional, Regex | |
class ParallelLogParser(object):
    """Parse a syslog-style authentication log in parallel and export it as CSV.

    Each log line is split into four fields (timestamp, hostname, application,
    message) with a pyparsing grammar. Lines are distributed over a
    ``multiprocessing.Pool``; the instance itself is the callable handed to
    ``Pool.map`` (see ``__call__``).
    """

    def __init__(self, log_file):
        """Remember the log path and build the line grammar once.

        Args:
            log_file: Path of the log file to parse. The CSV output is written
                to the same path with ``.csv`` appended.
        """
        self.log_file = log_file
        self.authlog_grammar = self.__get_authlog_grammar()

    @staticmethod
    def __get_authlog_grammar():
        """Build the pyparsing grammar for one log line.

        Returns:
            A pyparsing expression with the named results ``timestamp``,
            ``hostname``, ``application`` and ``message``.
        """
        ints = Word(nums)

        # timestamp: three-letter month (capital + lowercase), day, hh:mm:ss
        month = Word(string.ascii_uppercase, string.ascii_lowercase, exact=3)
        day = ints
        hour = Combine(ints + ":" + ints + ":" + ints)
        timestamp = month + day + hour

        # hostname, service name, message
        hostname_or_ip = Word(alphas + nums + "_" + "-" + ".")
        # application name, optionally followed by "[<number>]", then a colon;
        # brackets and colon are dropped from the parsed tokens
        appname = Word(alphas + "/" + "-" + "_" + ".") + Optional(Suppress("[") + ints + Suppress("]")) + Suppress(":")
        # everything left on the line
        message = Regex(".*")

        # auth log grammar
        authlog_grammar = timestamp.setResultsName('timestamp') + hostname_or_ip.setResultsName('hostname') + \
            appname.setResultsName('application') + message.setResultsName('message')

        return authlog_grammar

    def __get_fields(self, log_line):
        """Parse one log line into a plain dictionary.

        Args:
            log_line: A single line of the log file.

        Returns:
            dict with the keys ``timestamp``, ``hostname``, ``application``
            and ``message``. Multi-token fields (timestamp, application) are
            joined with single spaces.

        Raises:
            Whatever ``parseString`` raises when the line does not match the
            grammar.
        """
        # parsing
        parsed = self.authlog_grammar.parseString(log_line)

        # get each field
        parsed_log = dict()
        parsed_log['timestamp'] = ' '.join(parsed.timestamp.asList())
        parsed_log['hostname'] = parsed.hostname
        parsed_log['application'] = ' '.join(parsed.application.asList())
        parsed_log['message'] = parsed.message

        return parsed_log

    def __call__(self, log_line):
        """Parse one log line; lets the instance be passed to ``Pool.map``.

        Args:
            log_line: A single line of the log file.

        Returns:
            The dictionary produced by ``__get_fields``.
        """
        parsed_log = self.__get_fields(log_line)
        return parsed_log

    def __save_csv(self, parsed_logs):
        """Write the parsed entries to ``<log_file>.csv``.

        Args:
            parsed_logs: Iterable of dictionaries with the keys ``timestamp``,
                ``hostname``, ``application`` and ``message``.
        """
        # The context manager closes the file even if a write fails, and
        # newline='' lets the csv module control line endings itself (otherwise
        # rows end in "\r\r\n" on platforms that translate "\n").
        with open(self.log_file + '.csv', 'wt', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['timestamp', 'hostname', 'application', 'message'])
            writer.writerows(
                [result['timestamp'], result['hostname'], result['application'], result['message']]
                for result in parsed_logs
            )

    def parse_authlog(self):
        """Read the log file, parse every line in parallel and save the CSV.

        Blank lines are skipped because they can never match the grammar.
        If the log file does not exist, a message is printed and the process
        exits with status 1.
        """
        # read log file
        try:
            with open(self.log_file, 'r') as f:
                # whitespace-only lines would make the grammar raise and abort
                # the whole run, so they are filtered out up front
                log_lines = [line for line in f if line.strip()]
        except FileNotFoundError:
            print('File not found.')
            sys.exit(1)

        # run parser with multiprocessing; the context manager releases the
        # worker processes even when parsing a line raises
        total_cpu = multiprocessing.cpu_count()
        with multiprocessing.Pool(processes=total_cpu) as pool:
            parsed_logs = pool.map(self, log_lines)

        self.__save_csv(parsed_logs)
if __name__ == '__main__':
    # Script entry point: exactly one command-line argument, the log file path.
    if len(sys.argv) != 2:
        print('Please type a correct log file name.')
        sys.exit(1)

    file_name = sys.argv[1]
    parser = ParallelLogParser(file_name)
    parser.parse_authlog()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment