Parse an Apache Access Log File to Find Key Insights and Traffic Patterns
#!/usr/bin/env python3
"""
apache_insights.py — Quick, practical insights from Apache/Nginx combined logs.

Features
- Parses (gz or plain) Combined Log Format lines like:
    66.249.72.225 - - [24/Oct/2025:00:01:22 +0000] "GET /path HTTP/1.1" 200 123 "-" "UA"
- Summaries:
  * Requests/time (minute & hour), spike detection
  * Status code distribution & 3xx/4xx/5xx leaders
  * Top IPs (overall, error-heavy, bots vs humans)
  * Bot vs human breakdown (keyword heuristic), IPv6 share
  * Top URLs overall, top 404 URLs, top 301/302 URLs (possible redirect churn)
  * Trailing-slash inconsistencies: pairs that appear with and without '/'
  * Query-string heavy URLs & parameters
  * Referrers (internal vs external), suspicious/no-referrer share
  * User-Agent breakdown & "Googlebot-like" traffic (with optional reverse DNS verification)
- Outputs: pretty console report + optional CSVs (--csv-dir).
- Input: file(s) or STDIN (supports .gz). Streams line-by-line (low memory).

Examples
    python apache_insights.py access.log
    zcat access.log.gz | python apache_insights.py -
    python apache_insights.py access1.log access2.log.gz --csv-dir ./out --top 50 --verify-googlebot

Notes
- Reverse DNS verification is best-effort. It can be slow and is not 100% authoritative.
- For production bot verification, compare reverse DNS to known domains and forward-confirm.
"""
import argparse
import collections
import csv
import gzip
import io
import ipaddress
import os
import re
import socket
import sys
from datetime import datetime, timezone
from urllib.parse import urlparse, parse_qs

LOG_RE = re.compile(
    r'(?P<ip>\S+)\s+'
    r'\S+\s+\S+\s+'
    r'\[(?P<ts>[^\]]+)\]\s+'
    r'"(?P<method>\S+)\s+(?P<url>\S+)\s+(?P<protocol>[^"]+)"\s+'
    r'(?P<status>\d{3})\s+(?P<size>\S+)\s+'
    r'"(?P<referrer>[^"]*)"\s+"(?P<ua>[^"]*)"'
)
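# Named groups, in Combined Log Format field order: ip, (ident and authuser are skipped),
# [timestamp], "method url protocol", status, size, "referrer", "user-agent".
# For the sample line in the module docstring, groupdict() gives:
#   ip='66.249.72.225', ts='24/Oct/2025:00:01:22 +0000', method='GET', url='/path',
#   protocol='HTTP/1.1', status='200', size='123', referrer='-', ua='UA'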

# Apache time format: 24/Oct/2025:00:01:22 +0000
def parse_apache_time(ts: str) -> datetime:
    try:
        return datetime.strptime(ts, "%d/%b/%Y:%H:%M:%S %z")
    except Exception:
        # fallback without tz (rare), assume UTC
        try:
            return datetime.strptime(ts.split(" ")[0], "%d/%b/%Y:%H:%M:%S").replace(tzinfo=timezone.utc)
        except Exception:
            return None
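# Example: parse_apache_time("24/Oct/2025:00:01:22 +0000")
#   -> datetime(2025, 10, 24, 0, 1, 22, tzinfo=timezone.utc)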

BOT_KEYWORDS = [
    # Major search engines
    "googlebot", "bingbot", "yandex", "baiduspider", "duckduckbot",
    # SEO crawlers
    "ahrefsbot", "semrushbot", "mj12bot", "dotbot", "seznambot", "linkdex",
    "rogerbot",  # Moz's crawler; a bare "moz" keyword would also match every "Mozilla/..." UA
    "screaming frog", "crawler",
    # Social
    "facebookexternalhit", "slackbot", "twitterbot", "linkedinbot", "whatsapp", "telegrambot", "discordbot",
    # Generic
    "spider", "bot", "crawl", "curl", "wget", "python-requests", "headless", "httpclient", "cfnetwork",
]


def is_bot(ua: str) -> bool:
    if not ua:
        return False
    ual = ua.lower()
    return any(k in ual for k in BOT_KEYWORDS)
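# Heuristic examples (substring match, not authoritative):
#   is_bot("Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)") -> True
#   is_bot("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36") -> False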

def open_maybe_gz(path: str):
    if path == "-":
        return sys.stdin
    if path.endswith(".gz"):
        return io.TextIOWrapper(gzip.open(path, "rb"), encoding="utf-8", errors="replace")
    return open(path, "r", encoding="utf-8", errors="replace")

def main():
    ap = argparse.ArgumentParser(description="Generate actionable insights from Apache/Nginx combined logs.")
    ap.add_argument("paths", nargs="*", default=["-"], help="Log file(s) to read (.gz ok) or '-' for STDIN.")
    ap.add_argument("--top", type=int, default=20, help="How many 'top N' items to display & export.")
    ap.add_argument("--csv-dir", type=str, default=None, help="Directory to write CSVs (optional).")
    ap.add_argument("--verify-googlebot", action="store_true",
                    help="Reverse DNS check IPs with Googlebot UA (best-effort, slow).")
    ap.add_argument("--only-since", type=str, default=None,
                    help="Only include entries on/after this timestamp (e.g., '2025-10-24T00:00:00Z').")
    ap.add_argument("--only-until", type=str, default=None,
                    help="Only include entries before this timestamp (e.g., '2025-10-25T00:00:00Z').")
    args = ap.parse_args()

    since = until = None
    if args.only_since:
        since = datetime.fromisoformat(args.only_since.replace("Z", "+00:00"))
    if args.only_until:
        until = datetime.fromisoformat(args.only_until.replace("Z", "+00:00"))
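    # Example: --only-since '2025-10-24T00:00:00Z' parses to
    #   datetime(2025, 10, 24, 0, 0, tzinfo=timezone.utc).
    # Include the offset (or 'Z') so the bound stays comparable to the log's tz-aware timestamps.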

    # Counters
    total = 0
    status_counter = collections.Counter()
    method_counter = collections.Counter()
    ip_counter = collections.Counter()
    ip_err_counter = collections.Counter()  # counts of 4xx/5xx by IP
    ua_counter = collections.Counter()
    bot_counter = collections.Counter()
    human_counter = collections.Counter()
    ref_counter = collections.Counter()
    ext_ref_counter = collections.Counter()
    url_counter = collections.Counter()
    url_404_counter = collections.Counter()
    url_3xx_counter = collections.Counter()
    url_qs_counter = collections.Counter()  # with query string
    param_counter = collections.Counter()
    no_ref = 0
    ipv6 = 0

    # Time buckets
    per_minute = collections.Counter()
    per_hour = collections.Counter()

    # Slash inconsistencies
    no_slash = collections.Counter()
    with_slash = collections.Counter()

    # For optional reverse DNS verification of Googlebot claims
    googlebot_ips = set()

    for path in args.paths:
        try:
            f = open_maybe_gz(path)
        except Exception as e:
            print(f"Failed to open {path}: {e}", file=sys.stderr)
            continue
        with f:
            for line in f:
                m = LOG_RE.search(line)
                if not m:
                    continue
                d = m.groupdict()
                ts = parse_apache_time(d["ts"])
                if ts is None:
                    continue
                if since and ts < since:
                    continue
                if until and ts >= until:
                    continue

                total += 1
                ip = d["ip"]
                method = d["method"]
                url = d["url"]
                protocol = d["protocol"]
                status = int(d["status"])
                size = d["size"]
                ref = d["referrer"] or "-"
                ua = d["ua"] or ""

                # time buckets
                ts_min = ts.strftime("%Y-%m-%d %H:%M")
                ts_hour = ts.strftime("%Y-%m-%d %H:00")
                per_minute[ts_min] += 1
                per_hour[ts_hour] += 1

                # counters
                status_counter[status] += 1
                method_counter[method] += 1
                ip_counter[ip] += 1
                ua_counter[ua] += 1

                # IPv6 share
                try:
                    if isinstance(ipaddress.ip_address(ip), ipaddress.IPv6Address):
                        ipv6 += 1
                except Exception:
                    pass

                # Errors by IP
                if 400 <= status <= 599:
                    ip_err_counter[ip] += 1

                # bot vs human
                if is_bot(ua):
                    bot_counter[ip] += 1
                else:
                    human_counter[ip] += 1

                # referrers
                if ref == "-" or ref == "":
                    no_ref += 1
                else:
                    ref_counter[ref] += 1
                    try:
                        parsed = urlparse(ref)
                        if parsed.netloc and not parsed.netloc.endswith("localhost"):
                            ext_ref_counter[parsed.netloc] += 1
                    except Exception:
                        pass

                # URL analysis
                url_counter[url] += 1
                if "?" in url:
                    url_qs_counter[url] += 1
                    try:
                        qs = urlparse(url).query
                        for k in parse_qs(qs, keep_blank_values=True):
                            param_counter[k] += 1
                    except Exception:
                        pass
                if status in (301, 302, 307, 308):
                    url_3xx_counter[url] += 1
                if status == 404:
                    url_404_counter[url] += 1

                # trailing slash counterparts (normalize the query string off first)
                path_only = url.split("?", 1)[0]
                if path_only != "/":
                    if path_only.endswith("/"):
                        with_slash[path_only[:-1]] += 1
                    else:
                        no_slash[path_only] += 1
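                # e.g. url "/blog/?page=2" -> path_only "/blog/" -> with_slash["/blog"] += 1;
                # url "/blog" -> no_slash["/blog"] += 1 (paths here are illustrative)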
                # googlebot claim
                if "googlebot" in ua.lower():
                    googlebot_ips.add(ip)

    if total == 0:
        print("No log lines parsed. Ensure your file is in Combined Log Format.", file=sys.stderr)
        sys.exit(1)

    def pct(x): return f"{(100.0*x/total):.2f}%"

    # Spike detection
    # Basic: find top minute & hour buckets and compare to median
    def top_n(counter, n):
        return counter.most_common(n)

    def median(lst):
        if not lst:
            return 0
        s = sorted(lst)
        mid = len(s)//2
        if len(s) % 2 == 0:
            return (s[mid-1]+s[mid]) / 2
        return s[mid]
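    # e.g. median([1, 2, 3, 4]) -> 2.5 and median([5, 1, 3]) -> 3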

    minute_counts = list(per_minute.values())
    hour_counts = list(per_hour.values())
    min_med = median(minute_counts)
    hr_med = median(hour_counts)
    top_minute = top_n(per_minute, 3)
    top_hour = top_n(per_hour, 3)

    # Trailing slash inconsistency pairs, paired by base path
    inconsistent = []
    all_bases = set(no_slash.keys()) | set(with_slash.keys())
    for base in all_bases:
        n_nos = no_slash.get(base, 0)
        n_wsl = with_slash.get(base, 0)
        if n_nos > 0 and n_wsl > 0:
            inconsistent.append((base, n_nos, n_wsl, n_nos + n_wsl))
    inconsistent.sort(key=lambda x: x[3], reverse=True)
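    # Each entry is (base_path, no_slash_hits, with_slash_hits, combined),
    # e.g. ("/about", 12, 7, 19) if /about got 12 hits and /about/ got 7 (numbers illustrative).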

    # Compose Report
    print("="*78)
    print("APACHE LOG INSIGHTS REPORT")
    print("="*78)
    print(f"Total requests: {total}")
    print(f"Unique IPs: {len(ip_counter)}")
    print(f"Unique User-Agents: {len(ua_counter)}")
    print(f"IPv6 share: {ipv6} ({pct(ipv6)})")
    print()

    # Status
    print("Status Codes:")
    for code, cnt in sorted(status_counter.items()):
        print(f" {code}: {cnt} ({pct(cnt)})")
    print()

    # Methods
    print("HTTP Methods:")
    for method, cnt in method_counter.most_common():
        print(f" {method}: {cnt} ({pct(cnt)})")
    print()

    # Time spikes
    print("Traffic Spikes:")
    if top_minute:
        print(f" Minute median: {min_med:.2f} req/min")
        for ts_min, c in top_minute:
            ratio = (c / (min_med or 1))
            print(f" * Peak minute {ts_min}: {c} reqs (~{ratio:.1f}x median)")
    if top_hour:
        print(f" Hour median: {hr_med:.2f} req/hour")
        for ts_hr, c in top_hour:
            ratio = (c / (hr_med or 1))
            print(f" * Peak hour {ts_hr}: {c} reqs (~{ratio:.1f}x median)")
    print()

    # Bot vs Human
    bot_hits = sum(bot_counter.values())
    human_hits = sum(human_counter.values())
    print("Bot vs Human (heuristic UA keywords):")
    print(f" Bots  : {bot_hits} ({pct(bot_hits)})")
    print(f" Humans: {human_hits} ({pct(human_hits)})")
    print(" Top botty IPs:")
    for ip, cnt in bot_counter.most_common(10):
        print(f"   {ip}: {cnt}")
    print()

    # Top IPs overall & top erroring IPs
    print("Top IPs (by hits):")
    for ip, cnt in ip_counter.most_common(args.top):
        err = ip_err_counter.get(ip, 0)
        print(f" {ip}: {cnt} hits, {err} errors")
    print()

    print("Top IPs by errors (4xx/5xx):")
    for ip, cnt in ip_err_counter.most_common(min(args.top, 20)):
        total_ip = ip_counter.get(ip, 0)
        print(f" {ip}: {cnt} errors out of {total_ip} hits")
    print()

    # URL analysis
    print(f"Top URLs (top {args.top}):")
    for url, cnt in url_counter.most_common(args.top):
        print(f" {url} — {cnt}")
    print()

    print(f"Top 404 URLs (top {args.top}):")
    for url, cnt in url_404_counter.most_common(args.top):
        print(f" {url} — {cnt}")
    print()

    print(f"Top 3xx URLs (top {args.top}):")
    for url, cnt in url_3xx_counter.most_common(args.top):
        print(f" {url} — {cnt}")
    print()

    # Query strings & params
    qs_total = sum(url_qs_counter.values())
    print(f"Requests with query strings: {qs_total} ({pct(qs_total)})")
    print(f"Top query-string URLs (top {args.top}):")
    for url, cnt in url_qs_counter.most_common(args.top):
        print(f" {url} — {cnt}")
    print("Top parameters:")
    for p, cnt in param_counter.most_common(min(args.top, 30)):
        print(f" {p} — {cnt}")
    print()

    # Referrers
    print("Referrers:")
    print(f" No referrer: {no_ref} ({pct(no_ref)})")
    print(f" Top referrers (top {args.top}):")
    for r, cnt in ref_counter.most_common(args.top):
        print(f"   {r} — {cnt}")
    if ext_ref_counter:
        print(" Top external referrer hosts:")
        for host, cnt in ext_ref_counter.most_common(args.top):
            print(f"   {host} — {cnt}")
    print()

    # Trailing slash inconsistencies
    print("Trailing-slash inconsistencies (seen with and without '/') — top 20 by combined hits:")
    for base, n_nos, n_wsl, tot in inconsistent[:20]:
        print(f" {base}  no-slash:{n_nos}  with-slash:{n_wsl}  total:{tot}")
    print()

    # Heuristic red flags
    red_flags = []
    total_3xx = sum(c for s, c in status_counter.items() if 300 <= s <= 399)
    total_4xx = sum(c for s, c in status_counter.items() if 400 <= s <= 499)
    total_5xx = sum(c for s, c in status_counter.items() if 500 <= s <= 599)
    if total_3xx / total > 0.15:
        red_flags.append(f"High 3xx rate: {total_3xx} ({pct(total_3xx)}). Consider canonical/redirect loops or inconsistent URLs.")
    if total_4xx / total > 0.05:
        red_flags.append(f"High 4xx rate: {total_4xx} ({pct(total_4xx)}). Many missing pages or blocked assets.")
    if total_5xx / total > 0.01:
        red_flags.append(f"High 5xx rate: {total_5xx} ({pct(total_5xx)}). Server errors need investigation.")
    if inconsistent:
        red_flags.append("Found URLs served both with and without trailing slash. Enforce one canonical format.")
    if bot_hits / total > 0.5:
        red_flags.append("Traffic dominated by bots. Consider robots.txt tuning, crawl budget controls, or WAF rate limits.")
    if no_ref / total > 0.8 and human_hits / total > 0.2:
        red_flags.append("Very high no-referrer rate. If this is unexpected, check JS/app-based navigation or stripped headers.")

    print("Potential Issues & Next Actions:")
    if red_flags:
        for rf in red_flags:
            print(f" • {rf}")
    else:
        print(" None detected above thresholds.")
    print()

    # Optional: verify Googlebot
    if args.verify_googlebot and googlebot_ips:
        print("Googlebot verification (reverse DNS -> forward confirm):")
        for ip in sorted(googlebot_ips):
            try:
                hn = socket.gethostbyaddr(ip)[0]
                if not hn:
                    print(f" {ip}: no PTR found")
                    continue
                # forward-confirm
                try:
                    ais = socket.getaddrinfo(hn, None)
                    fwd_ips = sorted({a[4][0] for a in ais})
                    ok = ip in fwd_ips
                except Exception:
                    fwd_ips = []
                    ok = False
                print(f" {ip} -> {hn} -> {fwd_ips} {'OK' if ok else 'MISMATCH'}")
            except Exception as e:
                print(f" {ip}: reverse DNS failed ({e})")
        print()

    # CSV exports
    if args.csv_dir:
        os.makedirs(args.csv_dir, exist_ok=True)

        def write_counter_csv(name, counter):
            path = os.path.join(args.csv_dir, f"{name}.csv")
            with open(path, "w", newline="", encoding="utf-8") as fh:
                w = csv.writer(fh)
                w.writerow(["key", "count"])
                for k, v in counter.most_common():
                    w.writerow([k, v])

        write_counter_csv("status_codes", status_counter)
        write_counter_csv("methods", method_counter)
        write_counter_csv("ips", ip_counter)
        write_counter_csv("ip_errors", ip_err_counter)
        write_counter_csv("user_agents", ua_counter)
        write_counter_csv("bots_by_ip", bot_counter)
        write_counter_csv("humans_by_ip", human_counter)
        write_counter_csv("referrers", ref_counter)
        write_counter_csv("external_referrer_hosts", ext_ref_counter)
        write_counter_csv("urls", url_counter)
        write_counter_csv("urls_404", url_404_counter)
        write_counter_csv("urls_3xx", url_3xx_counter)
        write_counter_csv("urls_with_query", url_qs_counter)
        write_counter_csv("query_params", param_counter)
        write_counter_csv("hits_per_minute", per_minute)
        write_counter_csv("hits_per_hour", per_hour)

        # write inconsistencies
        path = os.path.join(args.csv_dir, "trailing_slash_inconsistencies.csv")
        with open(path, "w", newline="", encoding="utf-8") as fh:
            w = csv.writer(fh)
            w.writerow(["base_path", "no_slash_hits", "with_slash_hits", "total"])
            for base, a, b, t in inconsistent:
                w.writerow([base, a, b, t])

        print(f"CSV exports written to: {args.csv_dir}")


if __name__ == "__main__":
    main()