Skip to content

Instantly share code, notes, and snippets.

@ssstonebraker
Created November 11, 2024 14:24
Show Gist options
  • Save ssstonebraker/7144b14fadf4aa6b846c2be44ca77013 to your computer and use it in GitHub Desktop.
Save ssstonebraker/7144b14fadf4aa6b846c2be44ca77013 to your computer and use it in GitHub Desktop.
Python_Log_parsing_functions
import re
from datetime import datetime
from typing import List, Dict, Optional

# Function to read logs from a file
def read_log_file(
    file_path: str,
    encoding: Optional[str] = None,
    errors: Optional[str] = None,
) -> List[str]:
    """Read a log file and return its lines as a list.

    Each returned entry is one line of the file, including its line
    terminator, in file order.

    Args:
        file_path: Path of the log file to read.
        encoding: Text encoding passed to ``open``. ``None`` (the default)
            keeps the platform's default encoding.
        errors: Decoding error policy passed to ``open`` (for example
            ``"replace"``), useful for logs that contain stray bytes.
            ``None`` (the default) keeps ``open``'s default policy.

    Returns:
        The lines of the file.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the content cannot be decoded under the
            selected encoding and error policy.
    """
    with open(file_path, 'r', encoding=encoding, errors=errors) as file:
        return file.readlines()

# Function to filter logs by a keyword
def filter_logs_by_keyword(logs: List[str], keyword: str) -> List[str]:
    """Return the log entries in which ``keyword`` occurs as a substring.

    The match is case-sensitive and the entries keep their original order.
    """
    matching = []
    for entry in logs:
        if keyword in entry:
            matching.append(entry)
    return matching

# Function to extract IP addresses from logs
def extract_ip_addresses(logs: List[str]) -> List[str]:
    """Extract the unique IPv4 addresses that appear in the logs.

    A candidate is four dot-separated groups of one to three digits; it is
    kept only when every group is at most 255, so strings such as
    ``999.1.1.1`` are not reported as addresses.

    Args:
        logs: Log entries to scan.

    Returns:
        Each distinct address once, in order of first appearance.
    """
    ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
    # A dict is used as an ordered set so the result order is deterministic
    # (a plain set would return the addresses in arbitrary order).
    seen: Dict[str, None] = {}
    for log in logs:
        for candidate in ip_pattern.findall(log):
            if all(int(octet) <= 255 for octet in candidate.split('.')):
                seen.setdefault(candidate, None)
    return list(seen)

# Function to extract timestamps from logs
def extract_timestamps(
    logs: List[str],
    date_format: str = "%Y-%m-%d %H:%M:%S",
) -> List[datetime]:
    """Extract one timestamp per log entry.

    Only the first substring shaped like ``YYYY-MM-DD HH:MM:SS`` in each
    entry is considered. Entries without such a substring, or whose
    substring cannot be parsed with ``date_format``, are skipped.

    Args:
        logs: Log entries to scan.
        date_format: ``strptime`` format used to parse the matched text.
            Defaults to the layout the search pattern recognises.

    Returns:
        The parsed timestamps, in the order of the entries they came from.
    """
    # Compiled once, outside the loop.
    timestamp_pattern = re.compile(r'\b\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\b')
    timestamps = []
    for log in logs:
        match = timestamp_pattern.search(log)
        if match is None:
            # No timestamp-shaped text in this entry.
            continue
        try:
            timestamp = datetime.strptime(match.group(), date_format)
        except ValueError:
            # Right shape but not a real date/time (e.g. month 13), or the
            # caller's format does not fit the matched text.
            continue
        timestamps.append(timestamp)
    return timestamps

# Function to categorize logs by level (INFO, WARNING, ERROR)
def parse_log_levels(logs: List[str]) -> Dict[str, List[str]]:
    """Categorize log entries under INFO, WARNING or ERROR.

    An entry is filed under whichever of the three level names occurs
    first in its text, so a line such as ``"ERROR could not load INFO
    page"`` is an ERROR entry rather than an INFO one. Matching is by
    case-sensitive substring. Entries that contain none of the names are
    omitted.

    Args:
        logs: Log entries to categorize.

    Returns:
        A dictionary with exactly the keys ``"INFO"``, ``"WARNING"`` and
        ``"ERROR"``, each mapping to its entries in input order.
    """
    log_levels: Dict[str, List[str]] = {"INFO": [], "WARNING": [], "ERROR": []}
    for log in logs:
        # (position, level) for every level name present in this entry.
        found = [
            (position, level)
            for position, level in ((log.find(name), name) for name in log_levels)
            if position != -1
        ]
        if not found:
            continue
        # The names start with different letters, so two of them can never
        # share a position; min() therefore picks a unique earliest level.
        _, level = min(found)
        log_levels[level].append(log)
    return log_levels

# Function to count occurrences of errors
def count_error_occurrences(logs: List[str], error_keyword: str = "ERROR") -> int:
    """Count the log entries that contain ``error_keyword``.

    Each entry contributes at most one to the total, however many times
    the keyword appears inside it.
    """
    total = 0
    for entry in logs:
        if error_keyword in entry:
            total += 1
    return total

# Function to parse key-value pairs from logs (e.g., IP=192.168.1.1)
def parse_key_value_pairs(log: str, delimiter: str = "=") -> Dict[str, str]:
    """Collect the ``key<delimiter>value`` tokens of one log entry.

    The entry is split on whitespace; every token containing the delimiter
    is cut at its first occurrence into a key and a value. When a key is
    repeated, the last value wins.
    """
    tokens = (token.partition(delimiter) for token in log.split())
    # partition() yields an empty separator when the delimiter is absent,
    # which is how tokens without a key-value shape are dropped.
    return {key: value for key, separator, value in tokens if separator}

# Function to check if logs have anomalies based on error thresholds
def detect_anomalies(logs: List[str], error_threshold: int = 5) -> bool:
    """Report whether the logs contain more errors than allowed.

    The error count comes from ``count_error_occurrences`` with its default
    keyword. The result is ``True`` only when that count is strictly
    greater than ``error_threshold``.
    """
    return count_error_occurrences(logs) > error_threshold

# Function to parse logs with specific regex patterns
def parse_logs_by_pattern(logs: List[str], pattern: str) -> List[str]:
    """Return the log entries in which the regex ``pattern`` is found.

    The pattern is searched for anywhere in each entry (not anchored to
    the start), and matching entries keep their original order.
    """
    find_in = re.compile(pattern).search
    return list(filter(find_in, logs))

# Function to group logs by date
def group_logs_by_date(
    logs: List[str],
    date_format: str = "%Y-%m-%d",
) -> Dict[str, List[str]]:
    """Group log entries by the calendar date found in each entry.

    Only the first substring shaped like ``YYYY-MM-DD`` in each entry is
    considered. Entries without such a substring, or whose substring cannot
    be parsed with ``date_format``, are left out of the result.

    Args:
        logs: Log entries to group.
        date_format: ``strptime`` format used to parse the matched text.
            Defaults to the layout the search pattern recognises.

    Returns:
        A dictionary mapping ISO dates (``YYYY-MM-DD``) to the entries
        carrying that date, in input order.
    """
    # Compiled once, outside the loop.
    date_pattern = re.compile(r'\b\d{4}-\d{2}-\d{2}\b')
    grouped_logs: Dict[str, List[str]] = {}
    for log in logs:
        match = date_pattern.search(log)
        if match is None:
            # No date-shaped text in this entry.
            continue
        try:
            date = datetime.strptime(match.group(), date_format).date().isoformat()
        except ValueError:
            # Right shape but not a real date (e.g. month 99), or the
            # caller's format does not fit the matched text.
            continue
        grouped_logs.setdefault(date, []).append(log)
    return grouped_logs
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment