postfixer: postfix log analyzer/sqlite db maker
import argparse
import multiprocessing
import re
import hashlib
import sqlite3
import os
from collections import defaultdict
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor
# Log patterns
LOG_PATTERN = re.compile(
    r"(?P<timestamp>[\d\-T:\.]+[\+\-]\d{2}:\d{2})\s+"  # Timestamp
    r"(?P<host>[^\s]+)\s+"                             # Hostname
    r"(?P<process>[^\[]+\[[^\]]+\]):\s+"               # Process and PID
    r"(?P<message_id>[A-F0-9]+):\s+"                   # Message ID
    r"(?P<details>.+)"                                 # Log details
)
LOG_KV_PATTERN = re.compile(
    r"(?P<key>[a-zA-Z_]+)=(?P<value>[^,]+)"  # Key=value pairs like to=<user@domain>, from=<user@domain>
)
SAID_PATTERN = re.compile(
    r'said: (\d\d\d) (\d+\.\d+\.\d+)'
)
OK_PATTERN = re.compile(
    r"250 \d\.\d\.\d O[Kk]"
)
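
# For reference, the kind of line these patterns are written against looks
# roughly like this (hypothetical example, wrapped here for readability):
#
#   2024-11-26T01:02:03.123456+00:00 mail1 postfix/smtp[12345]: 4ABC12DE34:
#       to=<user@example.com>, relay=mx.example.com[198.51.100.7]:25,
#       delay=0.42, dsn=2.0.0, status=sent (250 2.0.0 OK)
#
# LOG_PATTERN captures timestamp, host, process, message_id and details;
# LOG_KV_PATTERN then pulls the key=value pairs out of the details field.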
# SQLite database file used when --use-db is passed
DB_FILE = 'postfix_logs.db'


class PostfixLogAnalyzer:
    def __init__(self, args):
        self.args = args
        self.logs = []
        self.stats = defaultdict(self.default_stat_value)
        if self.args.use_db:
            self.setup_database()

    def default_stat_value(self):
        return 0
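
    # Note: default_stat_value is a named method rather than a lambda; this,
    # presumably, keeps self.stats picklable when the analyzer's bound methods
    # are handed to ProcessPoolExecutor workers in read_logs().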

    def setup_database(self):
        if self.args.fresh_db:
            try:
                os.remove(DB_FILE)
            except FileNotFoundError:
                pass
        conn = sqlite3.connect(DB_FILE)
        cursor = conn.cursor()
        cursor.execute('''CREATE TABLE IF NOT EXISTS logs (
                            hash TEXT PRIMARY KEY,
                            timestamp TEXT,
                            host TEXT,
                            process TEXT,
                            message_id TEXT,
                            details TEXT,
                            category TEXT)''')
        # Create messages table for aggregating logs by message_id
        cursor.execute('''CREATE TABLE IF NOT EXISTS messages (
                            message_id TEXT PRIMARY KEY,
                            timestamps TEXT,
                            log_hashes TEXT,
                            hosts TEXT,
                            processes TEXT,
                            clients TEXT,
                            senders TEXT,
                            senders_domains TEXT,
                            recipients TEXT,
                            recipients_domains TEXT,
                            statuses TEXT,
                            sizes TEXT,
                            delays TEXT,
                            status_codes TEXT,
                            status_codes_enhanced TEXT,
                            details TEXT,
                            categories TEXT)''')
        conn.commit()
        conn.close()

    def hash_log_line(self, log_line):
        return hashlib.sha256(log_line.encode('utf-8')).hexdigest()

    def parse_log_line(self, line):
        match = LOG_PATTERN.match(line)
        structured = {}
        if match:
            structured = match.groupdict()
            kv_matches = LOG_KV_PATTERN.findall(structured['details'])
            for kv_match in kv_matches:
                key, value = kv_match
                structured[key] = value
            return structured
        else:
            return None

    def is_rate_limited_message(self, log_line):
        rate_limit_strings = (
            "temporarily deferred due to unexpected volume or user complaints",
            "temporarily rate limited due to IP reputation"
        )
        return any(string in log_line for string in rate_limit_strings)

    def is_bounced_message(self, log_line):
        return "bounce" in log_line

    def is_queue_active_message(self, log_line):
        return "queue active" in log_line

    def is_lost_connection_message(self, log_line):
        return "lost connection" in log_line

    def is_exceeded_max_connections_message(self, log_line):
        return "exceeded the maximum number of connections" in log_line

    def is_timed_out_message(self, log_line):
        return "timed out" in log_line

    def is_blacklisted_message(self, log_line):
        return "blacklisted" in log_line

    def is_ok_message(self, log_line):
        return re.search(OK_PATTERN, log_line)

    def is_sent_message(self, log_line):
        return "status=sent" in log_line

    def is_expired_message(self, log_line):
        return "status=expired" in log_line

    def is_inbox_full_message(self, log_line):
        return "inbox is out of storage space" in log_line

    def is_user_rate_limit_message(self, log_line):
        return "contact is receiving mail at a rate that" in log_line

    def is_sender_non_delivery_message(self, log_line):
        return "sender non-delivery" in log_line

    def is_removed_message(self, log_line):
        return "removed" in log_line

    def is_message_id_message(self, log_line):
        return "message-id" in log_line

    def is_client_message(self, log_line):
        return "client" in log_line

    def categorize_log_entry(self, log_entry):
        details = log_entry['details']
        if self.is_rate_limited_message(details):
            if "from_host" in log_entry:
                if 'rate_limit' not in self.stats:
                    self.stats['rate_limit'] = defaultdict(int)
                self.stats['rate_limit'][log_entry['from_host']] += 1
            else:
                if 'rate_limit' not in self.stats:
                    self.stats['rate_limit'] = defaultdict(int)
                self.stats['rate_limit']['unknown'] += 1
                if self.args.debug:
                    print("Unknown host")
                    print(details)
            self.stats['rate_limit_total'] += 1
            return 'rate_limit'
        elif self.is_queue_active_message(details):
            self.stats['queue_active'] += 1
            return 'queue_active'
        elif self.is_lost_connection_message(details):
            self.stats['lost_connection'] += 1
            return 'lost_connection'
        elif self.is_exceeded_max_connections_message(details):
            self.stats['exceeded_max_connections'] += 1
            return 'exceeded_max_connections'
        elif self.is_timed_out_message(details):
            self.stats['timed_out'] += 1
            return 'timed_out'
        elif self.is_blacklisted_message(details):
            self.stats['blacklisted'] += 1
            return 'blacklisted'
        elif self.is_ok_message(details):
            self.stats['ok'] += 1
            return 'ok'
        elif self.is_sent_message(details):
            self.stats['sent'] += 1
            return 'sent'
        elif self.is_expired_message(details):
            self.stats['expired'] += 1
            return 'expired'
        elif self.is_inbox_full_message(details):
            self.stats['inbox_full'] += 1
            return 'inbox_full'
        elif self.is_user_rate_limit_message(details):
            self.stats['user_rate_limit'] += 1
            return 'user_rate_limit'
        elif self.is_sender_non_delivery_message(details):
            self.stats['sender_non_delivery'] += 1
            return 'sender_non_delivery'
        elif self.is_bounced_message(details):
            self.stats['bounced'] += 1
            return 'bounced'
        elif self.is_removed_message(details):
            self.stats['removed'] += 1
            return 'removed'
        elif self.is_message_id_message(details):
            self.stats['message_id'] += 1
            return 'message_id'
        elif self.is_client_message(details):
            self.stats['client'] += 1
            return 'client'
        else:
            if self.args.debug:
                print("Unknown message category")
            self.stats['unknown'] += 1
            if self.args.debug:
                print(log_entry)
                print()
            return 'unknown'

    def store_logs_in_db(self):
        if self.args.debug:
            print("Storing logs in database...")
        conn = sqlite3.connect(DB_FILE)
        cursor = conn.cursor()
        message_data = {}
        # Progress bar for storing logs in the database
        for log_entry, log_hash in tqdm(self.logs, desc="Storing log entries in logs table"):
            category = self.categorize_log_entry(log_entry)
            cursor.execute('''INSERT OR IGNORE INTO logs (hash, timestamp, host, process, message_id, details, category)
                              VALUES (?, ?, ?, ?, ?, ?, ?)''',
                           (log_hash, log_entry['timestamp'], log_entry['host'], log_entry['process'],
                            log_entry['message_id'], log_entry['details'], category))
            # Aggregate log entries by message_id for messages table
            message_id = log_entry['message_id']
            if message_id not in message_data:
                message_data[message_id] = {
                    'timestamps': set(),
                    'log_hashes': set(),
                    'hosts': set(),
                    'processes': set(),
                    'clients': set(),
                    'senders': set(),
                    'senders_domains': set(),
                    'recipients': set(),
                    'recipients_domains': set(),
                    'statuses': set(),
                    'sizes': set(),
                    'delays': set(),
                    'status_codes': set(),
                    'status_codes_enhanced': set(),
                    'details': set(),
                    'categories': set()
                }
            message_data[message_id]['timestamps'].add(log_entry['timestamp'])
            message_data[message_id]['log_hashes'].add(log_hash)
            message_data[message_id]['hosts'].add(log_entry['host'])
            message_data[message_id]['processes'].add(log_entry['process'])
            message_data[message_id]['details'].add(log_entry['details'])
            message_data[message_id]['categories'].add(category)
            # Extract relevant information from details
            if 'client' in log_entry:
                message_data[message_id]['clients'].add(log_entry['client'])
            if 'from' in log_entry:
                message_data[message_id]['senders'].add(log_entry['from'])
                _, *domain = log_entry['from'].split('@')
                if len(domain) == 1:
                    message_data[message_id]['senders_domains'].add(domain[0].strip(">"))
            if 'to' in log_entry:
                message_data[message_id]['recipients'].add(log_entry['to'])
                _, *domain = log_entry['to'].split('@')
                if len(domain) == 1:
                    message_data[message_id]['recipients_domains'].add(domain[0].strip(">"))
            if 'status' in log_entry:
                message_data[message_id]['statuses'].add(log_entry['status'])
            if 'size' in log_entry:
                message_data[message_id]['sizes'].add(log_entry['size'])
            if 'delay' in log_entry:
                message_data[message_id]['delays'].add(log_entry['delay'])
            status_code_match = re.search(SAID_PATTERN, log_entry['details'])
            if 'dsn' in log_entry:
                message_data[message_id]['status_codes_enhanced'].add(log_entry['dsn'])
            elif status_code_match:
                if status_code_match.group(1):
                    message_data[message_id]['status_codes'].add(status_code_match.group(1))
                if status_code_match and status_code_match.group(2):
                    message_data[message_id]['status_codes_enhanced'].add(status_code_match.group(2))
        # Store aggregated messages data in messages table with progress bar
        for message_id, data in tqdm(message_data.items(), desc="Storing aggregated messages data in messages table"):
            cursor.execute('''INSERT OR REPLACE INTO messages (message_id, timestamps, log_hashes, hosts, processes, clients, senders, senders_domains, recipients, recipients_domains, statuses, sizes, delays, status_codes, status_codes_enhanced, details, categories)
                              VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
                           (message_id,
                            ', '.join(sorted(data['timestamps'])),
                            ', '.join(sorted(data['log_hashes'])),
                            ', '.join(sorted(data['hosts'])),
                            ', '.join(sorted(data['processes'])),
                            ', '.join(sorted(data['clients'])),
                            ', '.join(sorted(data['senders'])),
                            ', '.join(sorted(data['senders_domains'])),
                            ', '.join(sorted(data['recipients'])),
                            ', '.join(sorted(data['recipients_domains'])),
                            ', '.join(sorted(data['statuses'])),
                            ', '.join(sorted(data['sizes'])),
                            ', '.join(sorted(data['delays'])),
                            ', '.join(sorted(data['status_codes'])),
                            ', '.join(sorted(data['status_codes_enhanced'])),
                            '\n'.join(sorted(data['details'])),
                            ', '.join(sorted(data['categories']))))
        conn.commit()
        conn.close()

    def read_log_file(self, filename):
        logs = []
        if self.args.debug:
            print(f"Reading log file: {filename}")
        with open(filename, 'r') as file:
            total_lines = sum(1 for _ in open(filename, 'r'))
            with tqdm(total=total_lines, desc=f"Processing {filename}") as pbar:
                for line in file:
                    if self.args.debug:
                        print(f"Processing line: {line.strip()}")
                    log_hash = self.hash_log_line(line)
                    log_entry = self.parse_log_line(line)
                    if log_entry:
                        logs.append((log_entry, log_hash))
                    pbar.update(1)
        return logs

    def read_logs(self, files):
        with ProcessPoolExecutor(max_workers=self.args.num_cpus) as executor:
            results = executor.map(self.read_log_file, files)
            for result in results:
                self.logs.extend(result)

    def process_logs(self):
        for log_entry, _ in self.logs:
            if self.args.debug:
                print(f"Processing log entry: {log_entry}")
            self.categorize_log_entry(log_entry)

    def generate_report(self):
        for category, count in self.stats.items():
            if isinstance(count, dict):
                continue  # Skip rate_limit host details
            print(f"Total {category.replace('_', ' ').title()}: {count}")

    def run(self):
        if self.args.debug:
            print("Starting log analysis")
        self.read_logs(self.args.files)
        if self.args.use_db:
            self.store_logs_in_db()
        self.process_logs()
        self.generate_report()
        if self.args.debug:
            print("Log analysis completed")


def main():
    parser = argparse.ArgumentParser(description="postfixer - Postfix log analyzer and SQLite DB maker.")
    parser.add_argument('--files', nargs='*', default=[],
                        help='List of log files to process.')
    parser.add_argument('--use-db', action='store_true',
                        help='Store parsed logs in a SQLite database to avoid duplicate parsing.')
    parser.add_argument('--fresh-db', action='store_true',
                        help='Force creation of a fresh SQLite database.')
    parser.add_argument('--debug', action='store_true',
                        help='Provide detailed debugging information.')
    parser.add_argument('--num-cpus', type=int, default=multiprocessing.cpu_count(),
                        help='Number of CPU cores to use for parallel processing. Defaults to max available.')
    args = parser.parse_args()
    analyzer = PostfixLogAnalyzer(args)
    analyzer.run()


if __name__ == "__main__":
    main()
python3 postfixer.py --files logs/mail.* --use-db --fresh-db
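
Once the database has been built, any SQLite client can query it. A minimal sketch against the default postfix_logs.db file and the logs/messages tables the script creates, counting log lines per category and peeking at a few aggregated messages:

import sqlite3

conn = sqlite3.connect('postfix_logs.db')
cursor = conn.cursor()

# Log lines per category, most common first
for category, count in cursor.execute(
        "SELECT category, COUNT(*) FROM logs GROUP BY category ORDER BY COUNT(*) DESC"):
    print(f"{category}: {count}")

# A few aggregated messages and the delivery statuses recorded for them
for message_id, statuses in cursor.execute(
        "SELECT message_id, statuses FROM messages WHERE statuses != '' LIMIT 10"):
    print(f"{message_id}: {statuses}")

conn.close()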
The only external dependency is tqdm, for the pretty progress bars; install it with pip install tqdm.