Created
May 27, 2020 02:42
-
-
Save studiawan/c9995b5620f57ed9617fea6fd67ef16a to your computer and use it in GitHub Desktop.
Parallel log parser with PyParsing and multiprocessing. Reads the log file in chunks.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import multiprocessing | |
import csv | |
from pyparsing import Word, alphas, Suppress, Combine, string, nums, Optional, Regex | |
from itertools import zip_longest | |
class ParallelLogParser(object):
    """Parse an auth log in parallel and write its fields to a CSV file.

    Every line is split into timestamp, hostname, application and message by
    a PyParsing grammar. The log is read in chunks of lines; each chunk is
    parsed by a ``multiprocessing.Pool`` and the rows are written to
    ``<log_file>.csv``.
    """

    def __init__(self, log_file):
        """Remember the log path and build the line grammar.

        :param log_file: path of the log to parse; the CSV output is written
            to ``log_file + '.csv'``.
        """
        self.log_file = log_file
        self.authlog_grammar = self.__get_authlog_grammar()

    @staticmethod
    def __get_authlog_grammar():
        """Build the PyParsing grammar for a single log line.

        A matching line looks like::

            May 27 02:42:01 myhost sshd[1234]: Accepted password for ...

        i.e. a 3-letter month, day, ``hh:mm:ss``, hostname/IP, program name
        with an optional ``[pid]`` followed by ``:``, then the rest of the line.
        """
        # Use the stdlib ``string`` module explicitly rather than relying on
        # pyparsing happening to re-export it.
        import string

        ints = Word(nums)

        # timestamp
        month = Word(string.ascii_uppercase, string.ascii_lowercase, exact=3)
        day = ints
        hour = Combine(ints + ":" + ints + ":" + ints)
        timestamp = month + day + hour

        # hostname, service name, message
        hostname_or_ip = Word(alphas + nums + "_" + "-" + ".")
        appname = (Word(alphas + "/" + "-" + "_" + ".")
                   + Optional(Suppress("[") + ints + Suppress("]"))
                   + Suppress(":"))
        message = Regex(".*")

        # auth log grammar
        authlog_grammar = (timestamp.setResultsName('timestamp')
                           + hostname_or_ip.setResultsName('hostname')
                           + appname.setResultsName('application')
                           + message.setResultsName('message'))
        return authlog_grammar

    def __get_fields(self, log_line):
        """Parse one line and return its fields as a dict of strings.

        Raises pyparsing's ``ParseException`` when the line does not match.
        """
        parsed = self.authlog_grammar.parseString(log_line)
        return {
            'timestamp': ' '.join(parsed.timestamp.asList()),
            'hostname': parsed.hostname,
            # program name plus pid when present, e.g. "sshd 1234"
            # (the brackets are suppressed by the grammar)
            'application': ' '.join(parsed.application.asList()),
            'message': parsed.message,
        }

    def __call__(self, log_line):
        """Parse one line; this object is the worker function for ``Pool.map``.

        Returns the field dict. Returns ``None`` in two cases: when
        *log_line* is ``None`` (padding added by the chunker), and when the
        grammar cannot parse the line. In the second case a single blank or
        malformed line no longer aborts the whole run.
        """
        if log_line is None:
            return None
        # Imported here so the class body itself has no import-time
        # dependency beyond what the module already imports.
        from pyparsing import ParseException
        try:
            return self.__get_fields(log_line)
        except ParseException:
            return None

    def __grouper(self, n, iterable, padvalue=None):
        """Yield tuples of *n* items; the last tuple is padded with *padvalue*."""
        return zip_longest(*[iter(iterable)] * n, fillvalue=padvalue)

    def parse_authlog_chunk(self, chunk_size=1000, processes=None):
        """Parse ``self.log_file`` in parallel and write ``<log_file>.csv``.

        :param chunk_size: number of lines read and dispatched to the pool at
            a time, which bounds memory use on large logs.
        :param processes: number of worker processes; defaults to
            ``multiprocessing.cpu_count()``.

        Lines the grammar cannot parse are skipped. If any were skipped, their
        count is printed to stderr.
        """
        if processes is None:
            processes = multiprocessing.cpu_count()
        skipped = 0
        # - newline='' is required by the csv module so that it alone
        #   controls line endings.
        # - errors='replace' stops one undecodable byte from aborting the
        #   parse.
        # - The with-statement closes both files and the pool even on error.
        #   All three are entered before the body runs, so nothing is written
        #   to the CSV buffer before the workers are started.
        with open(self.log_file, 'r', errors='replace') as f_log, \
                open(self.log_file + '.csv', 'wt', newline='') as f_csv, \
                multiprocessing.Pool(processes) as pool:
            writer = csv.writer(f_csv)
            writer.writerow(['timestamp', 'hostname', 'application', 'message'])
            for chunk in self.__grouper(chunk_size, f_log):
                results = pool.map(self, chunk)
                # pool.map preserves order, so results line up with chunk.
                for line, result in zip(chunk, results):
                    if result is not None:
                        writer.writerow([result['timestamp'], result['hostname'],
                                         result['application'], result['message']])
                    elif line is not None:
                        skipped += 1
            pool.close()
            pool.join()
        if skipped:
            print('%d line(s) could not be parsed and were skipped' % skipped,
                  file=sys.stderr)
def main(argv=None):
    """Command-line entry point: parse the log file named by the sole argument.

    The parsed fields are written to ``<log_file>.csv`` by
    ``ParallelLogParser.parse_authlog_chunk``.

    :param argv: argument vector (defaults to ``sys.argv``); ``argv[1]`` is
        the log file path.
    :returns: process exit status, 0 on success and 1 on a usage error.
    """
    import os

    argv = sys.argv if argv is None else argv
    if len(argv) != 2 or not os.path.isfile(argv[1]):
        # Diagnostics go to stderr so they never mix with regular output.
        prog = argv[0] if argv else 'parser.py'
        print('Please type a correct log file name.', file=sys.stderr)
        print('Usage: %s <log_file>' % prog, file=sys.stderr)
        return 1
    ParallelLogParser(argv[1]).parse_authlog_chunk()
    return 0


if __name__ == '__main__':
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment