Created
October 12, 2023 03:41
-
-
Save sajalshres/540656cd5cd38226877723f458b65fac to your computer and use it in GitHub Desktop.
Parse logs and extract unique stack traces
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import os | |
import hashlib | |
import argparse | |
from typing import List | |
def get_argsparser() -> argparse.ArgumentParser:
    """Build the command-line parser for this script.

    Returns:
        argparse.ArgumentParser: Parser exposing a single required
            ``-d``/``--dir`` string option, stored on the namespace as ``dir``.
    """
    dir_option = {
        "type": str,
        "dest": "dir",
        "required": True,
        "help": "Path to the directory containing log files.",
    }
    cli = argparse.ArgumentParser(
        description="Parse logs and extract unique null pointer exceptions."
    )
    cli.add_argument("-d", "--dir", **dir_option)
    return cli
def starts_with_datetime(line: str) -> bool:
    """Tell whether *line* begins with a ``YYYY-MM-DD HH:MM:SS.mmm`` timestamp.

    Only the shape of the prefix is checked (digit counts and separators);
    the digits are not validated as a real calendar date or time.

    Args:
        line (str): Text to inspect.

    Returns:
        bool: True when the timestamp pattern matches at the very start of
            the line, False otherwise.
    """
    timestamp_prefix = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}")
    return timestamp_prefix.match(line) is not None
def get_logs(dir: str) -> List[str]:
    """Extract NullPointerException entries from every ``.log`` file under *dir*.

    An entry starts at the first line containing
    ``java.lang.NullPointerException`` and collects every following line until
    the next line that begins with a timestamp (see ``starts_with_datetime``)
    or the end of the file. Later lines of the same entry that mention the
    exception again (for example a ``Caused by:`` line) stay part of that
    entry, and a timestamped line that itself contains the exception closes
    the entry in progress before starting a new one, so no collected text is
    discarded.

    Args:
        dir (str): Directory path to search recursively for .log files.

    Returns:
        List[str]: Extracted entries, each the concatenation of its lines
            (line endings preserved), in sorted directory/file traversal
            order.
    """
    logs = []
    for root, subdirs, files in os.walk(dir):
        # Sorting in place steers os.walk's descent, so the result order does
        # not depend on the order the filesystem happens to list entries in.
        subdirs.sort()
        for file in sorted(files):
            if not file.endswith(".log"):
                continue
            # Decode explicitly and replace undecodable bytes: one stray byte
            # in a log file should not abort the whole scan.
            with open(
                os.path.join(root, file), "r", encoding="utf-8", errors="replace"
            ) as log_file:
                error_log = []
                for line in log_file:
                    # A timestamped line begins a new log record, which ends
                    # whatever entry is currently being collected.
                    if error_log and starts_with_datetime(line):
                        logs.append("".join(error_log))
                        error_log = []
                    # Either continue the open entry or open one on a line
                    # that mentions the exception.
                    if error_log or "java.lang.NullPointerException" in line:
                        error_log.append(line)
                # Append any remaining entry after the file ends.
                if error_log:
                    logs.append("".join(error_log))
    return logs
def get_unique_logs(logs: List[str]) -> List[str]:
    """Drop duplicate log entries, keeping the first occurrence of each.

    Entries are compared by the MD5 hex digest of their encoded text, so only
    the digests (not the full entries) are held in the lookup set. MD5 is used
    purely as a fingerprint here, not for security.

    Args:
        logs (List[str]): List of log entries.

    Returns:
        List[str]: Unique log entries in their original order of first
            appearance.
    """
    seen_hashes = set()
    unique = []
    for log in logs:
        # Hash each entry exactly once and reuse the digest for both the
        # membership test and the insertion.
        digest = hashlib.md5(log.encode()).hexdigest()
        if digest in seen_hashes:
            continue
        seen_hashes.add(digest)
        unique.append(log)
    return unique
def main() -> None:
    """Parse arguments, extract NullPointerException entries and print the unique ones.

    Exits through ``parser.error`` (usage message, non-zero status) when the
    ``--dir`` argument is not an existing directory.
    """
    parser = get_argsparser()
    args = parser.parse_args()
    # os.walk() ignores scan errors by default, so a wrong path would look
    # exactly like "no exceptions found"; report it explicitly instead.
    if not os.path.isdir(args.dir):
        parser.error("not a directory: {}".format(args.dir))
    logs = get_logs(args.dir)
    unique = get_unique_logs(logs)
    for item in unique:
        print(item)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment