@anikseu
Created October 24, 2025 12:56
Parse an Apache Access Log File to Find Key Insights and Traffic Patterns
#!/usr/bin/env python3
"""
apache_insights.py — Quick, practical insights from Apache/Nginx combined logs.
Features
- Parses (gz or plain) Combined Log Format lines like:
66.249.72.225 - - [24/Oct/2025:00:01:22 +0000] "GET /path HTTP/1.1" 200 123 "-" "UA"
- Summaries:
* Requests/time (minute & hour), spike detection
* Status code distribution & 3xx/4xx/5xx leaders
* Top IPs (overall, error-heavy, bots vs humans)
* Bot vs human breakdown (keyword heuristic), IPv6 share
* Top URLs overall, top 404 URLs, top 301/302 URLs (possible redirect churn)
* Trailing-slash inconsistencies: pairs that appear with and without '/'
* Query-string heavy URLs & parameters
* Referrers (internal vs external), suspicious/no-referrer share
* User-Agent breakdown & "Googlebot-like" traffic (with optional reverse DNS verification)
- Outputs: pretty console report + optional CSVs (--csv-dir).
- Input: file(s) or STDIN (supports .gz). Streams line-by-line (low memory).
Examples
python apache_insights.py access.log
zcat access.log.gz | python apache_insights.py -
python apache_insights.py access1.log access2.log.gz --csv-dir ./out --top 50 --verify-googlebot
Note
- Reverse DNS verification is best-effort. It can be slow and not 100% authoritative.
- For production bot verification, compare reverse DNS to known domains and forward-confirm.
"""
import argparse
import collections
import csv
import gzip
import io
import ipaddress
import os
import re
import socket
import sys
from datetime import datetime, timezone
from urllib.parse import urlparse, parse_qs
LOG_RE = re.compile(
    r'(?P<ip>\S+)\s+'
    r'\S+\s+\S+\s+'
    r'\[(?P<ts>[^\]]+)\]\s+'
    r'"(?P<method>\S+)\s+(?P<url>\S+)\s+(?P<protocol>[^"]+)"\s+'
    r'(?P<status>\d{3})\s+(?P<size>\S+)\s+'
    r'"(?P<referrer>[^"]*)"\s+"(?P<ua>[^"]*)"'
)
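# Illustrative only: for the sample line in the module docstring, LOG_RE.search()
# should yield named groups roughly like (shown for orientation, not captured from a run):
#   ip="66.249.72.225", ts="24/Oct/2025:00:01:22 +0000", method="GET", url="/path",
#   protocol="HTTP/1.1", status="200", size="123", referrer="-", ua="UA"
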
# Apache time format: 24/Oct/2025:00:01:22 +0000
def parse_apache_time(ts: str) -> datetime:
    try:
        return datetime.strptime(ts, "%d/%b/%Y:%H:%M:%S %z")
    except Exception:
        # Fallback without timezone (rare); assume UTC.
        try:
            return datetime.strptime(ts.split(" ")[0], "%d/%b/%Y:%H:%M:%S").replace(tzinfo=timezone.utc)
        except Exception:
            return None

BOT_KEYWORDS = [
    # Major search engines
    "googlebot", "bingbot", "yandex", "baiduspider", "duckduckbot",
    # SEO crawlers
    "ahrefsbot", "semrushbot", "mj12bot", "dotbot", "seznambot", "linkdex",
    "rogerbot",  # Moz's crawler; a bare "moz" would also match every "Mozilla" UA
    "screaming frog", "crawler",
    # Social
    "facebookexternalhit", "slackbot", "twitterbot", "linkedinbot", "whatsapp", "telegrambot", "discordbot",
    # Generic
    "spider", "bot", "crawl", "curl", "wget", "python-requests", "headless", "httpclient", "cfnetwork",
]

def is_bot(ua: str) -> bool:
    if not ua:
        return False
    ual = ua.lower()
    return any(k in ual for k in BOT_KEYWORDS)
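# Illustrative only (assumed user-agent strings, not taken from a real log):
#   is_bot("Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)")  -> True
#   is_bot("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")              -> False
# The generic keywords ("bot", "crawl", "spider") make this heuristic deliberately broad.
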
def open_maybe_gz(path: str):
    if path == "-":
        return sys.stdin
    if path.endswith(".gz"):
        return io.TextIOWrapper(gzip.open(path, "rb"), encoding="utf-8", errors="replace")
    return open(path, "r", encoding="utf-8", errors="replace")

def main():
    ap = argparse.ArgumentParser(description="Generate actionable insights from Apache/Nginx combined logs.")
    ap.add_argument("paths", nargs="*", default=["-"], help="Log file(s) to read (.gz ok) or '-' for STDIN.")
    ap.add_argument("--top", type=int, default=20, help="How many 'top N' items to display & export.")
    ap.add_argument("--csv-dir", type=str, default=None, help="Directory to write CSVs (optional).")
    ap.add_argument("--verify-googlebot", action="store_true",
                    help="Reverse DNS check IPs with Googlebot UA (best-effort, slow).")
    ap.add_argument("--only-since", type=str, default=None,
                    help="Only include entries on/after this timestamp (e.g., '2025-10-24T00:00:00Z').")
    ap.add_argument("--only-until", type=str, default=None,
                    help="Only include entries before this timestamp (e.g., '2025-10-25T00:00:00Z').")
    args = ap.parse_args()

    since = until = None
    if args.only_since:
        since = datetime.fromisoformat(args.only_since.replace("Z", "+00:00"))
        if since.tzinfo is None:
            # Assume UTC if no offset is given, so comparisons with tz-aware log times work.
            since = since.replace(tzinfo=timezone.utc)
    if args.only_until:
        until = datetime.fromisoformat(args.only_until.replace("Z", "+00:00"))
        if until.tzinfo is None:
            until = until.replace(tzinfo=timezone.utc)

    # Counters
    total = 0
    status_counter = collections.Counter()
    method_counter = collections.Counter()
    ip_counter = collections.Counter()
    ip_err_counter = collections.Counter()  # counts of 4xx/5xx by IP
    ua_counter = collections.Counter()
    bot_counter = collections.Counter()
    human_counter = collections.Counter()
    ref_counter = collections.Counter()
    ext_ref_counter = collections.Counter()
    url_counter = collections.Counter()
    url_404_counter = collections.Counter()
    url_3xx_counter = collections.Counter()
    url_qs_counter = collections.Counter()  # URLs seen with a query string
    param_counter = collections.Counter()
    no_ref = 0
    ipv6 = 0

    # Time buckets
    per_minute = collections.Counter()
    per_hour = collections.Counter()

    # Slash inconsistencies
    no_slash = collections.Counter()
    with_slash = collections.Counter()

    # For optional reverse DNS verification of Googlebot claims
    googlebot_ips = set()

    for path in args.paths:
        try:
            f = open_maybe_gz(path)
        except Exception as e:
            print(f"Failed to open {path}: {e}", file=sys.stderr)
            continue
        with f:
            for line in f:
                m = LOG_RE.search(line)
                if not m:
                    continue
                d = m.groupdict()
                ts = parse_apache_time(d["ts"])
                if ts is None:
                    continue
                if since and ts < since:
                    continue
                if until and ts >= until:
                    continue

                total += 1
                ip = d["ip"]
                method = d["method"]
                url = d["url"]
                protocol = d["protocol"]
                status = int(d["status"])
                size = d["size"]
                ref = d["referrer"] or "-"
                ua = d["ua"] or ""

                # time buckets
                ts_min = ts.strftime("%Y-%m-%d %H:%M")
                ts_hour = ts.strftime("%Y-%m-%d %H:00")
                per_minute[ts_min] += 1
                per_hour[ts_hour] += 1

                # counters
                status_counter[status] += 1
                method_counter[method] += 1
                ip_counter[ip] += 1
                ua_counter[ua] += 1

                # IPv6 share
                try:
                    if isinstance(ipaddress.ip_address(ip), ipaddress.IPv6Address):
                        ipv6 += 1
                except Exception:
                    pass

                # errors by IP
                if 400 <= status <= 599:
                    ip_err_counter[ip] += 1

                # bot vs human
                if is_bot(ua):
                    bot_counter[ip] += 1
                else:
                    human_counter[ip] += 1

                # referrers
                if ref in ("-", ""):
                    no_ref += 1
                else:
                    ref_counter[ref] += 1
                    try:
                        parsed = urlparse(ref)
                        if parsed.netloc and not parsed.netloc.endswith("localhost"):
                            ext_ref_counter[parsed.netloc] += 1
                    except Exception:
                        pass

                # URL analysis
                url_counter[url] += 1
                if "?" in url:
                    url_qs_counter[url] += 1
                    try:
                        qs = urlparse(url).query
                        for k in parse_qs(qs, keep_blank_values=True):
                            param_counter[k] += 1
                    except Exception:
                        pass
                if status in (301, 302, 307, 308):
                    url_3xx_counter[url] += 1
                if status == 404:
                    url_404_counter[url] += 1

                # trailing-slash counterparts (query string stripped)
                path_only = url.split("?", 1)[0]
                if path_only != "/":
                    if path_only.endswith("/"):
                        with_slash[path_only[:-1]] += 1
                    else:
                        no_slash[path_only] += 1

                # Googlebot claim
                if "googlebot" in ua.lower():
                    googlebot_ips.add(ip)

    if total == 0:
        print("No log lines parsed. Ensure your file is in Combined Log Format.", file=sys.stderr)
        sys.exit(1)

    def pct(x):
        return f"{(100.0 * x / total):.2f}%"

    # Spike detection: find the top minute & hour buckets and compare them to the median bucket.
    def top_n(counter, n):
        return counter.most_common(n)

    def median(lst):
        if not lst:
            return 0
        s = sorted(lst)
        mid = len(s) // 2
        if len(s) % 2 == 0:
            return (s[mid - 1] + s[mid]) / 2
        return s[mid]
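    # Illustrative only (hand-computed, not from a real run):
    #   median([7])           -> 7
    #   median([1, 2, 3, 10]) -> 2.5   (even length: mean of the two middle values)
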
    minute_counts = list(per_minute.values())
    hour_counts = list(per_hour.values())
    min_med = median(minute_counts)
    hr_med = median(hour_counts)
    top_minute = top_n(per_minute, 3)
    top_hour = top_n(per_hour, 3)

    # Trailing-slash inconsistency pairs, keyed by the base path (no trailing '/')
    inconsistent = []
    all_bases = set(no_slash.keys()) | set(with_slash.keys())
    for base in all_bases:
        n_nos = no_slash.get(base, 0)
        n_wsl = with_slash.get(base, 0)
        if n_nos > 0 and n_wsl > 0:
            inconsistent.append((base, n_nos, n_wsl, n_nos + n_wsl))
    inconsistent.sort(key=lambda x: x[3], reverse=True)

    # Compose report
    print("=" * 78)
    print("APACHE LOG INSIGHTS REPORT")
    print("=" * 78)
    print(f"Total requests: {total}")
    print(f"Unique IPs: {len(ip_counter)}")
    print(f"Unique User-Agents: {len(ua_counter)}")
    print(f"IPv6 share: {ipv6} ({pct(ipv6)})")
    print()

    # Status codes
    print("Status Codes:")
    for code, cnt in sorted(status_counter.items()):
        print(f" {code}: {cnt} ({pct(cnt)})")
    print()

    # Methods
    print("HTTP Methods:")
    for method, cnt in method_counter.most_common():
        print(f" {method}: {cnt} ({pct(cnt)})")
    print()

    # Time spikes
    print("Traffic Spikes:")
    if top_minute:
        print(f" Minute median: {min_med:.2f} req/min")
        for ts_min, c in top_minute:
            ratio = c / (min_med or 1)
            print(f" * Peak minute {ts_min}: {c} reqs (~{ratio:.1f}x median)")
    if top_hour:
        print(f" Hour median: {hr_med:.2f} req/hour")
        for ts_hr, c in top_hour:
            ratio = c / (hr_med or 1)
            print(f" * Peak hour {ts_hr}: {c} reqs (~{ratio:.1f}x median)")
    print()

    # Bot vs human
    bot_hits = sum(bot_counter.values())
    human_hits = sum(human_counter.values())
    print("Bot vs Human (heuristic UA keywords):")
    print(f" Bots  : {bot_hits} ({pct(bot_hits)})")
    print(f" Humans: {human_hits} ({pct(human_hits)})")
    print(" Top botty IPs:")
    for ip, cnt in bot_counter.most_common(10):
        print(f" {ip}: {cnt}")
    print()

    # Top IPs overall & top erroring IPs
    print("Top IPs (by hits):")
    for ip, cnt in ip_counter.most_common(args.top):
        err = ip_err_counter.get(ip, 0)
        print(f" {ip}: {cnt} hits, {err} errors")
    print()

    print("Top IPs by errors (4xx/5xx):")
    for ip, cnt in ip_err_counter.most_common(min(args.top, 20)):
        total_ip = ip_counter.get(ip, 0)
        print(f" {ip}: {cnt} errors out of {total_ip} hits")
    print()

    # URL analysis
    print(f"Top URLs (top {args.top}):")
    for url, cnt in url_counter.most_common(args.top):
        print(f" {url} — {cnt}")
    print()

    print(f"Top 404 URLs (top {args.top}):")
    for url, cnt in url_404_counter.most_common(args.top):
        print(f" {url} — {cnt}")
    print()

    print(f"Top 3xx URLs (top {args.top}):")
    for url, cnt in url_3xx_counter.most_common(args.top):
        print(f" {url} — {cnt}")
    print()

    # Query strings & params
    qs_total = sum(url_qs_counter.values())
    print(f"Requests with query strings: {qs_total} ({pct(qs_total)})")
    print(f"Top query-string URLs (top {args.top}):")
    for url, cnt in url_qs_counter.most_common(args.top):
        print(f" {url} — {cnt}")
    print("Top parameters:")
    for p, cnt in param_counter.most_common(min(args.top, 30)):
        print(f" {p} — {cnt}")
    print()

    # Referrers
    print("Referrers:")
    print(f" No referrer: {no_ref} ({pct(no_ref)})")
    print(f" Top referrers (top {args.top}):")
    for r, cnt in ref_counter.most_common(args.top):
        print(f" {r} — {cnt}")
    if ext_ref_counter:
        print(" Top external referrer hosts:")
        for host, cnt in ext_ref_counter.most_common(args.top):
            print(f" {host} — {cnt}")
    print()

    # Trailing slash inconsistencies
    print("Trailing-slash inconsistencies (seen with and without '/') — top 20 by combined hits:")
    for base, n_nos, n_wsl, tot in inconsistent[:20]:
        print(f" {base} no-slash:{n_nos} with-slash:{n_wsl} total:{tot}")
    print()

    # Heuristic red flags
    red_flags = []
    total_3xx = sum(c for s, c in status_counter.items() if 300 <= s <= 399)
    total_4xx = sum(c for s, c in status_counter.items() if 400 <= s <= 499)
    total_5xx = sum(c for s, c in status_counter.items() if 500 <= s <= 599)
    if total_3xx / total > 0.15:
        red_flags.append(f"High 3xx rate: {total_3xx} ({pct(total_3xx)}). Check for canonical/redirect loops or inconsistent URLs.")
    if total_4xx / total > 0.05:
        red_flags.append(f"High 4xx rate: {total_4xx} ({pct(total_4xx)}). Many missing pages or blocked assets.")
    if total_5xx / total > 0.01:
        red_flags.append(f"High 5xx rate: {total_5xx} ({pct(total_5xx)}). Server errors need investigation.")
    if inconsistent:
        red_flags.append("Found URLs served both with and without a trailing slash. Enforce one canonical format.")
    if bot_hits / total > 0.5:
        red_flags.append("Traffic is dominated by bots. Consider robots.txt tuning, crawl-budget controls, or WAF rate limits.")
    if no_ref / total > 0.8 and human_hits / total > 0.2:
        red_flags.append("Very high no-referrer rate. If this is unexpected, check JS/app-based navigation or stripped headers.")

    print("Potential Issues & Next Actions:")
    if red_flags:
        for rf in red_flags:
            print(f" • {rf}")
    else:
        print(" None detected above thresholds.")
    print()

    # Optional: verify Googlebot claims via reverse DNS, then forward-confirm.
    if args.verify_googlebot and googlebot_ips:
        print("Googlebot verification (reverse DNS -> forward confirm):")
        for ip in sorted(googlebot_ips):
            try:
                hostname = socket.gethostbyaddr(ip)[0]
            except Exception as e:
                print(f" {ip}: reverse DNS failed ({e})")
                continue
            # Forward-confirm: the PTR hostname should resolve back to the same IP.
            # For production use, also require the hostname to end in a known Google
            # domain (e.g. googlebot.com or google.com); see the module docstring note.
            try:
                ais = socket.getaddrinfo(hostname, None)
                fwd_ips = sorted({a[4][0] for a in ais})
                ok = ip in fwd_ips
            except Exception:
                fwd_ips = []
                ok = False
            print(f" {ip} -> {hostname} -> {fwd_ips} {'OK' if ok else 'MISMATCH'}")
        print()

    # CSV exports
    if args.csv_dir:
        os.makedirs(args.csv_dir, exist_ok=True)

        def write_counter_csv(name, counter):
            path = os.path.join(args.csv_dir, f"{name}.csv")
            with open(path, "w", newline="", encoding="utf-8") as fh:
                w = csv.writer(fh)
                w.writerow(["key", "count"])
                for k, v in counter.most_common():
                    w.writerow([k, v])

        write_counter_csv("status_codes", status_counter)
        write_counter_csv("methods", method_counter)
        write_counter_csv("ips", ip_counter)
        write_counter_csv("ip_errors", ip_err_counter)
        write_counter_csv("user_agents", ua_counter)
        write_counter_csv("bots_by_ip", bot_counter)
        write_counter_csv("humans_by_ip", human_counter)
        write_counter_csv("referrers", ref_counter)
        write_counter_csv("external_referrer_hosts", ext_ref_counter)
        write_counter_csv("urls", url_counter)
        write_counter_csv("urls_404", url_404_counter)
        write_counter_csv("urls_3xx", url_3xx_counter)
        write_counter_csv("urls_with_query", url_qs_counter)
        write_counter_csv("query_params", param_counter)
        write_counter_csv("hits_per_minute", per_minute)
        write_counter_csv("hits_per_hour", per_hour)

        # Trailing-slash inconsistencies get their own multi-column CSV.
        path = os.path.join(args.csv_dir, "trailing_slash_inconsistencies.csv")
        with open(path, "w", newline="", encoding="utf-8") as fh:
            w = csv.writer(fh)
            w.writerow(["base_path", "no_slash_hits", "with_slash_hits", "total"])
            for base, a, b, t in inconsistent:
                w.writerow([base, a, b, t])

        print(f"CSV exports written to: {args.csv_dir}")


if __name__ == "__main__":
    main()
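
# Example run (file and directory names here are assumed): executing
#   python apache_insights.py access.log --csv-dir ./out --top 25
# prints the console report and, because --csv-dir is set, writes one CSV per counter
# into ./out, e.g. status_codes.csv, ips.csv, urls.csv, urls_404.csv, hits_per_hour.csv
# and trailing_slash_inconsistencies.csv.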