import re
from datetime import datetime
from typing import List, Dict, Optional
# Function to read logs from a file
def read_log_file(file_path: str, encoding: str = "utf-8") -> List[str]:
    """Read a log file and return its lines as a list.

    Args:
        file_path: Path of the log file to read.
        encoding: Text encoding used to decode the file. Defaults to UTF-8 so
            the result does not depend on the platform's locale encoding.

    Returns:
        One string per line, as produced by ``readlines`` (line endings are
        kept on each entry).

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the content is not valid in ``encoding``.
    """
    with open(file_path, "r", encoding=encoding) as file:
        return file.readlines()
# Function to filter logs by a keyword
def filter_logs_by_keyword(logs: List[str], keyword: str) -> List[str]:
    """Return the log entries that contain ``keyword``.

    The check is a plain, case-sensitive substring test. Matching entries are
    returned in their original order.
    """
    matching_entries = []
    for entry in logs:
        if keyword in entry:
            matching_entries.append(entry)
    return matching_entries
# Function to extract IP addresses from logs
def extract_ip_addresses(logs: List[str]) -> List[str]:
    """Extract the unique IPv4 addresses that appear in the logs.

    Args:
        logs: Log entries to scan.

    Returns:
        Each distinct dotted-quad address whose four octets are all in the
        range 0-255, in the order it was first seen.
    """
    ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
    # A dict keeps insertion order, so it de-duplicates while giving a
    # deterministic result (a set would return the addresses in arbitrary order).
    seen = {}
    for log in logs:
        for candidate in ip_pattern.findall(log):
            # The regex only checks the shape; "999.1.1.1" is not an address.
            if all(int(octet) <= 255 for octet in candidate.split(".")):
                seen.setdefault(candidate, None)
    return list(seen)
# Function to extract timestamps from logs
def extract_timestamps(
    logs: List[str],
    date_format: str,
    pattern: str = r'\b\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\b',
) -> List[datetime]:
    """Extract one timestamp per log entry.

    Args:
        logs: Log entries to scan.
        date_format: ``strptime`` format used to parse the matched text.
        pattern: Regular expression that locates the timestamp text inside an
            entry. The default matches ``YYYY-MM-DD HH:MM:SS``; pass a
            different pattern when ``date_format`` describes another layout.

    Returns:
        The parsed timestamps, in log order. Entries with no match, or whose
        matched text does not parse with ``date_format``, are skipped.
    """
    timestamp_pattern = re.compile(pattern)
    timestamps = []
    for log in logs:
        match = timestamp_pattern.search(log)
        if match is None:
            # No timestamp-shaped text in this entry.
            continue
        try:
            timestamps.append(datetime.strptime(match.group(), date_format))
        except ValueError:
            # The text had the right shape but is not valid for date_format
            # (e.g. month 13); skip it, as entries without timestamps are.
            continue
    return timestamps
# Function to parse log level (INFO, WARNING, ERROR, etc.)
def parse_log_levels(logs: List[str]) -> Dict[str, List[str]]:
    """Group log entries by their level (INFO, WARNING or ERROR).

    An entry is filed under the first level name that appears in it as a
    standalone word, so an ERROR line whose message mentions "INFO" is still
    filed under ERROR.

    Args:
        logs: Log entries to categorize.

    Returns:
        A dict with the keys ``"INFO"``, ``"WARNING"`` and ``"ERROR"``, each
        mapping to the matching entries in log order. Entries containing none
        of these levels are left out.
    """
    log_levels = {"INFO": [], "WARNING": [], "ERROR": []}
    # Word boundaries keep "INFORMATION" from being read as the INFO level.
    level_pattern = re.compile(r'\b(INFO|WARNING|ERROR)\b')
    for log in logs:
        match = level_pattern.search(log)
        if match is not None:
            log_levels[match.group(1)].append(log)
    return log_levels
# Function to count occurrences of errors
def count_error_occurrences(logs: List[str], error_keyword: str = "ERROR") -> int:
    """Return how many log entries contain ``error_keyword``.

    Each entry counts at most once, however many times the keyword appears
    inside it.
    """
    matching_entries = 0
    for entry in logs:
        if error_keyword in entry:
            matching_entries += 1
    return matching_entries
# Function to parse key-value pairs from logs (e.g., IP=192.168.1.1)
def parse_key_value_pairs(log: str, delimiter: str = "=") -> Dict[str, str]:
    """Collect the ``key<delimiter>value`` tokens of one log entry into a dict.

    The entry is split on whitespace; each token containing the delimiter is
    cut at its first occurrence. When a key repeats, the last value wins.
    """
    result = {}
    for token in log.split():
        name, separator, remainder = token.partition(delimiter)
        if not separator:
            # Delimiter not present in this token.
            continue
        result[name] = remainder
    return result
# Function to check if logs have anomalies based on error thresholds
def detect_anomalies(logs: List[str], error_threshold: int = 5) -> bool:
    """Report whether the logs contain more errors than ``error_threshold``.

    The error count comes from ``count_error_occurrences`` with its default
    keyword. The comparison is strict: a count equal to the threshold is not
    an anomaly.
    """
    return count_error_occurrences(logs) > error_threshold
# Function to parse logs with specific regex patterns
def parse_logs_by_pattern(logs: List[str], pattern: str) -> List[str]:
    """Return the log entries in which the regex ``pattern`` matches anywhere.

    Entries are returned in their original order. An invalid pattern raises
    ``re.error`` when it is compiled.
    """
    regex = re.compile(pattern)
    matching_entries = []
    for entry in logs:
        if regex.search(entry) is not None:
            matching_entries.append(entry)
    return matching_entries
# Function to group logs by date
def group_logs_by_date(
    logs: List[str],
    date_format: str,
    pattern: str = r'\b\d{4}-\d{2}-\d{2}\b',
) -> Dict[str, List[str]]:
    """Group log entries by the calendar date found in each entry.

    Args:
        logs: Log entries to group.
        date_format: ``strptime`` format used to parse the matched text.
        pattern: Regular expression that locates the date text inside an
            entry. The default matches ``YYYY-MM-DD``; pass a different
            pattern when ``date_format`` describes another layout.

    Returns:
        A dict mapping ISO dates (``YYYY-MM-DD``) to the entries for that
        date, in log order. Entries with no match, or whose matched text does
        not parse with ``date_format``, are skipped.
    """
    date_pattern = re.compile(pattern)
    grouped_logs = {}
    for log in logs:
        match = date_pattern.search(log)
        if match is None:
            # No date-shaped text in this entry.
            continue
        try:
            date = datetime.strptime(match.group(), date_format).date().isoformat()
        except ValueError:
            # Right shape but not a real date for date_format (e.g. month 99).
            continue
        grouped_logs.setdefault(date, []).append(log)
    return grouped_logs
# Source: GitHub gist ssstonebraker/7144b14fadf4aa6b846c2be44ca77013
# ("Python_Log_parsing_functions"), created November 11, 2024 14:24.