@highb
Created November 27, 2024 01:01
postfixer: postfix log analyzer/sqlite db maker
import argparse
import multiprocessing
import re
import hashlib
import sqlite3
import os
from collections import defaultdict
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor

# Log patterns
LOG_PATTERN = re.compile(
    r"(?P<timestamp>[\d\-T:\.]+[\+\-]\d{2}:\d{2})\s+"  # Timestamp
    r"(?P<host>[^\s]+)\s+"                             # Hostname
    r"(?P<process>[^\[]+\[[^\]]+\]):\s+"               # Process and PID
    r"(?P<message_id>[A-F0-9]+):\s+"                   # Message ID
    r"(?P<details>.+)"                                 # Log details
)
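# Example of a log line this pattern is meant to match (hypothetical, for illustration only):
# 2024-11-26T17:01:02.123456-08:00 mailhost postfix/smtp[12345]: 4ABC12DEF34: to=<user@example.com>, status=sent (250 2.0.0 OK)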
LOG_KV_PATTERN = re.compile(
    r"(?P<key>[a-zA-Z_]+)=(?P<value>[^,]+)"  # Key=Value pairs like to=<user@domain> from=<user@domain>
)
SAID_PATTERN = re.compile(
    r'said: (\d\d\d) (\d+\.\d+\.\d+)'
)
OK_PATTERN = re.compile(
    r"250 \d\.\d\.\d O[Kk]"
)

# Constants
DB_FILE = 'postfix_logs.db'
class PostfixLogAnalyzer:
    def __init__(self, args):
        self.args = args
        self.logs = []
        self.stats = defaultdict(self.default_stat_value)
        if self.args.use_db:
            self.setup_database()

    def default_stat_value(self):
        return 0

    def setup_database(self):
        if self.args.fresh_db:
            try:
                os.remove(DB_FILE)
            except FileNotFoundError:
                pass
        conn = sqlite3.connect(DB_FILE)
        cursor = conn.cursor()
        cursor.execute('''CREATE TABLE IF NOT EXISTS logs (
                            hash TEXT PRIMARY KEY,
                            timestamp TEXT,
                            host TEXT,
                            process TEXT,
                            message_id TEXT,
                            details TEXT,
                            category TEXT)''')
        # Create messages table for aggregating logs by message_id
        cursor.execute('''CREATE TABLE IF NOT EXISTS messages (
                            message_id TEXT PRIMARY KEY,
                            timestamps TEXT,
                            log_hashes TEXT,
                            hosts TEXT,
                            processes TEXT,
                            clients TEXT,
                            senders TEXT,
                            senders_domains TEXT,
                            recipients TEXT,
                            recipients_domains TEXT,
                            statuses TEXT,
                            sizes TEXT,
                            delays TEXT,
                            status_codes TEXT,
                            status_codes_enhanced TEXT,
                            details TEXT,
                            categories TEXT)''')
        conn.commit()
        conn.close()

    def hash_log_line(self, log_line):
        return hashlib.sha256(log_line.encode('utf-8')).hexdigest()

    def parse_log_line(self, line):
        match = LOG_PATTERN.match(line)
        structured = {}
        if match:
            structured = match.groupdict()
            kv_matches = LOG_KV_PATTERN.findall(structured['details'])
            for kv_match in kv_matches:
                key, value = kv_match
                structured[key] = value
            return structured
        else:
            return None

    def is_rate_limited_message(self, log_line):
        rate_limit_strings = (
            "temporarily deferred due to unexpected volume or user complaints",
            "temporarily rate limited due to IP reputation"
        )
        return any(string in log_line for string in rate_limit_strings)

    def is_bounced_message(self, log_line):
        return "bounce" in log_line

    def is_queue_active_message(self, log_line):
        return "queue active" in log_line

    def is_lost_connection_message(self, log_line):
        return "lost connection" in log_line

    def is_exceeded_max_connections_message(self, log_line):
        return "exceeded the maximum number of connections" in log_line

    def is_timed_out_message(self, log_line):
        return "timed out" in log_line

    def is_blacklisted_message(self, log_line):
        return "blacklisted" in log_line

    def is_ok_message(self, log_line):
        return re.search(OK_PATTERN, log_line)

    def is_sent_message(self, log_line):
        return "status=sent" in log_line

    def is_expired_message(self, log_line):
        return "status=expired" in log_line

    def is_inbox_full_message(self, log_line):
        return "inbox is out of storage space" in log_line

    def is_user_rate_limit_message(self, log_line):
        return "contact is receiving mail at a rate that" in log_line

    def is_sender_non_delivery_message(self, log_line):
        return "sender non-delivery" in log_line

    def is_removed_message(self, log_line):
        return "removed" in log_line

    def is_message_id_message(self, log_line):
        return "message-id" in log_line

    def is_client_message(self, log_line):
        return "client" in log_line
    def categorize_log_entry(self, log_entry):
        details = log_entry['details']
        if self.is_rate_limited_message(details):
            if "from_host" in log_entry:
                if 'rate_limit' not in self.stats:
                    self.stats['rate_limit'] = defaultdict(int)
                self.stats['rate_limit'][log_entry['from_host']] += 1
            else:
                if 'rate_limit' not in self.stats:
                    self.stats['rate_limit'] = defaultdict(int)
                self.stats['rate_limit']['unknown'] += 1
                if self.args.debug: print("Unknown host")
                if self.args.debug: print(details)
            self.stats['rate_limit_total'] += 1
            return 'rate_limit'
        elif self.is_queue_active_message(details):
            self.stats['queue_active'] += 1
            return 'queue_active'
        elif self.is_lost_connection_message(details):
            self.stats['lost_connection'] += 1
            return 'lost_connection'
        elif self.is_exceeded_max_connections_message(details):
            self.stats['exceeded_max_connections'] += 1
            return 'exceeded_max_connections'
        elif self.is_timed_out_message(details):
            self.stats['timed_out'] += 1
            return 'timed_out'
        elif self.is_blacklisted_message(details):
            self.stats['blacklisted'] += 1
            return 'blacklisted'
        elif self.is_ok_message(details):
            self.stats['ok'] += 1
            return 'ok'
        elif self.is_sent_message(details):
            self.stats['sent'] += 1
            return 'sent'
        elif self.is_expired_message(details):
            self.stats['expired'] += 1
            return 'expired'
        elif self.is_inbox_full_message(details):
            self.stats['inbox_full'] += 1
            return 'inbox_full'
        elif self.is_user_rate_limit_message(details):
            self.stats['user_rate_limit'] += 1
            return 'user_rate_limit'
        elif self.is_sender_non_delivery_message(details):
            self.stats['sender_non_delivery'] += 1
            return 'sender_non_delivery'
        elif self.is_bounced_message(details):
            self.stats['bounced'] += 1
            return 'bounced'
        elif self.is_removed_message(details):
            self.stats['removed'] += 1
            return 'removed'
        elif self.is_message_id_message(details):
            self.stats['message_id'] += 1
            return 'message_id'
        elif self.is_client_message(details):
            self.stats['client'] += 1
            return 'client'
        else:
            if self.args.debug: print("Unknown message category")
            self.stats['unknown'] += 1
            if self.args.debug: print(log_entry)
            if self.args.debug: print()
            return 'unknown'
    def store_logs_in_db(self):
        if self.args.debug:
            print("Storing logs in database...")
        conn = sqlite3.connect(DB_FILE)
        cursor = conn.cursor()
        message_data = {}
        # Adding progress bar for storing logs in database
        for log_entry, log_hash in tqdm(self.logs, desc="Storing log entries in logs table"):
            category = self.categorize_log_entry(log_entry)
            cursor.execute('''INSERT OR IGNORE INTO logs (hash, timestamp, host, process, message_id, details, category)
                              VALUES (?, ?, ?, ?, ?, ?, ?)''',
                           (log_hash, log_entry['timestamp'], log_entry['host'], log_entry['process'], log_entry['message_id'], log_entry['details'], category))
            # Aggregate log entries by message_id for messages table
            message_id = log_entry['message_id']
            if message_id not in message_data:
                message_data[message_id] = {
                    'timestamps': set(),
                    'log_hashes': set(),
                    'hosts': set(),
                    'processes': set(),
                    'clients': set(),
                    'senders': set(),
                    'senders_domains': set(),
                    'recipients': set(),
                    'recipients_domains': set(),
                    'statuses': set(),
                    'sizes': set(),
                    'delays': set(),
                    'status_codes': set(),
                    'status_codes_enhanced': set(),
                    'details': set(),
                    'categories': set()
                }
            message_data[message_id]['timestamps'].add(log_entry['timestamp'])
            message_data[message_id]['log_hashes'].add(log_hash)
            message_data[message_id]['hosts'].add(log_entry['host'])
            message_data[message_id]['processes'].add(log_entry['process'])
            message_data[message_id]['details'].add(log_entry['details'])
            message_data[message_id]['categories'].add(category)
            # Extract relevant information from details
            if 'client' in log_entry:
                message_data[message_id]['clients'].add(log_entry['client'])
            if 'from' in log_entry:
                message_data[message_id]['senders'].add(log_entry['from'])
                _, *domain = log_entry['from'].split('@')
                if len(domain) == 1:
                    message_data[message_id]['senders_domains'].add(domain[0].strip(">"))
            if 'to' in log_entry:
                message_data[message_id]['recipients'].add(log_entry['to'])
                _, *domain = log_entry['to'].split('@')
                if len(domain) == 1:
                    message_data[message_id]['recipients_domains'].add(domain[0].strip(">"))
            if 'status' in log_entry:
                message_data[message_id]['statuses'].add(log_entry['status'])
            if 'size' in log_entry:
                message_data[message_id]['sizes'].add(log_entry['size'])
            if 'delay' in log_entry:
                message_data[message_id]['delays'].add(log_entry['delay'])
            status_code_match = re.search(SAID_PATTERN, log_entry['details'])
            if 'dsn' in log_entry:
                message_data[message_id]['status_codes_enhanced'].add(log_entry['dsn'])
            elif status_code_match:
                if status_code_match.group(1):
                    message_data[message_id]['status_codes'].add(status_code_match.group(1))
                if status_code_match and status_code_match.group(2):
                    message_data[message_id]['status_codes_enhanced'].add(status_code_match.group(2))
        # Store aggregated messages data in messages table with progress bar
        for message_id, data in tqdm(message_data.items(), desc="Storing aggregated messages data in messages table"):
            cursor.execute('''INSERT OR REPLACE INTO messages (message_id, timestamps, log_hashes, hosts, processes, clients, senders, senders_domains, recipients, recipients_domains, statuses, sizes, delays, status_codes, status_codes_enhanced, details, categories)
                              VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
                           (message_id,
                            ', '.join(sorted(data['timestamps'])),
                            ', '.join(sorted(data['log_hashes'])),
                            ', '.join(sorted(data['hosts'])),
                            ', '.join(sorted(data['processes'])),
                            ', '.join(sorted(data['clients'])),
                            ', '.join(sorted(data['senders'])),
                            ', '.join(sorted(data['senders_domains'])),
                            ', '.join(sorted(data['recipients'])),
                            ', '.join(sorted(data['recipients_domains'])),
                            ', '.join(sorted(data['statuses'])),
                            ', '.join(sorted(data['sizes'])),
                            ', '.join(sorted(data['delays'])),
                            ', '.join(sorted(data['status_codes'])),
                            ', '.join(sorted(data['status_codes_enhanced'])),
                            '\n'.join(sorted(data['details'])),
                            ', '.join(sorted(data['categories']))))
        conn.commit()
        conn.close()
    def read_log_file(self, filename):
        logs = []
        if self.args.debug:
            print(f"Reading log file: {filename}")
        with open(filename, 'r') as file:
            total_lines = sum(1 for _ in open(filename, 'r'))
            with tqdm(total=total_lines, desc=f"Processing {filename}") as pbar:
                for line in file:
                    if self.args.debug:
                        print(f"Processing line: {line.strip()}")
                    log_hash = self.hash_log_line(line)
                    log_entry = self.parse_log_line(line)
                    if log_entry:
                        logs.append((log_entry, log_hash))
                    pbar.update(1)
        return logs

    def read_logs(self, files):
        with ProcessPoolExecutor(max_workers=self.args.num_cpus) as executor:
            results = executor.map(self.read_log_file, files)
            for result in results:
                self.logs.extend(result)

    def process_logs(self):
        for log_entry, _ in self.logs:
            if self.args.debug:
                print(f"Processing log entry: {log_entry}")
            self.categorize_log_entry(log_entry)

    def generate_report(self):
        for category, count in self.stats.items():
            if isinstance(count, dict):
                continue  # Skip rate_limit host details
            print(f"Total {category.replace('_', ' ').title()}: {count}")

    def run(self):
        if self.args.debug:
            print("Starting log analysis")
        self.read_logs(self.args.files)
        if self.args.use_db:
            self.store_logs_in_db()
        self.process_logs()
        self.generate_report()
        if self.args.debug:
            print("Log analysis completed")
def main():
    parser = argparse.ArgumentParser(description="Pflogsumm - Postfix Log Analyzer in Python.")
    parser.add_argument('--files', nargs='*', default=[],
                        help='List of log files to process. If empty, reads from stdin.')
    parser.add_argument('--use-db', action='store_true',
                        help='Store parsed logs in a SQLite database to avoid duplicate parsing.')
    parser.add_argument('--fresh-db', action='store_true',
                        help='Force creation of a fresh SQLite database.')
    parser.add_argument('--debug', action='store_true',
                        help='Provide detailed debugging information.')
    parser.add_argument('--num-cpus', type=int, default=multiprocessing.cpu_count(),
                        help='Number of CPU cores to use for parallel processing. Defaults to max available.')
    args = parser.parse_args()
    analyzer = PostfixLogAnalyzer(args)
    analyzer.run()


if __name__ == "__main__":
    main()
highb commented Nov 27, 2024

Only external dep is tqdm, for pretty progress bars. Install with pip install tqdm.

highb commented Nov 27, 2024

python3 postfixer.py --files logs/mail.* --use-db --fresh-db
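
Once a run with --use-db finishes, the categorized entries can be pulled back out of the SQLite file. A minimal sketch (table and column names come from the CREATE TABLE statements in the script; the filename from DB_FILE):

import sqlite3

conn = sqlite3.connect('postfix_logs.db')
# Count log lines per category, as stored in the logs table by --use-db
for category, count in conn.execute(
        "SELECT category, COUNT(*) FROM logs GROUP BY category ORDER BY 2 DESC"):
    print(f"{category}: {count}")
conn.close()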
