Show the top N (default 100) allowed queries in your network. Run with `--help` for more information. Run this script on the device where Pi-hole is installed.
#!/usr/bin/env python3
"""
Pi-hole FTL Database Query Exporter
Exports the top N (default 100) allowed/permitted queries with all available information to CSV.
Creates two output files:
1. Summary CSV: Just the top domains with their query counts (one row per domain)
2. Detailed CSV: All individual queries for these top domains (can be large)
Use --summary-only to only create the summary file and skip the detailed export.
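
Example invocation (illustrative; the script filename below is a placeholder and the
database path may differ on your system):
    sudo python3 pihole_top_allowed_queries.py -d /etc/pihole/pihole-FTL.db -n 50 --summary-only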
"""
import sqlite3
import csv
import sys
import os
from datetime import datetime
import argparse
# Query type mappings from the documentation
QUERY_TYPES = {
    1: "A", 2: "AAAA", 3: "ANY", 4: "SRV", 5: "SOA",
    6: "PTR", 7: "TXT", 8: "NAPTR", 9: "MX", 10: "DS",
    11: "RRSIG", 12: "DNSKEY", 13: "NS", 14: "OTHER",
    15: "SVCB", 16: "HTTPS"
}

# Status mappings
STATUS_TYPES = {
    0: "Unknown", 1: "Blocked (gravity)", 2: "Allowed (forwarded)",
    3: "Allowed (cache)", 4: "Blocked (regex)", 5: "Blocked (exact)",
    6: "Blocked (upstream known)", 7: "Blocked (upstream null)",
    8: "Blocked (upstream NXDOMAIN)", 9: "Blocked (gravity CNAME)",
    10: "Blocked (regex CNAME)", 11: "Blocked (exact CNAME)",
    12: "Allowed (retried)", 13: "Allowed (retried ignored)",
    14: "Allowed (already forwarded)", 15: "Blocked (busy)",
    16: "Blocked (special)", 17: "Allowed (stale cache)",
    18: "Blocked (upstream EDE15)"
}

# Reply type mappings
REPLY_TYPES = {
    0: "unknown", 1: "NODATA", 2: "NXDOMAIN", 3: "CNAME",
    4: "IP", 5: "DOMAIN", 6: "RRNAME", 7: "SERVFAIL",
    8: "REFUSED", 9: "NOTIMP", 10: "OTHER", 11: "DNSSEC",
    12: "NONE", 13: "BLOB"
}

# DNSSEC status mappings
DNSSEC_STATUS = {
    0: "unknown", 1: "SECURE", 2: "INSECURE",
    3: "BOGUS", 4: "ABANDONED", 5: "TRUNCATED"
}

# Allowed status codes
ALLOWED_STATUSES = [2, 3, 12, 13, 14, 17]
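# (i.e. every status whose STATUS_TYPES label above reads "Allowed (...)")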


def get_query_type_name(type_id):
    """Convert query type ID to name, handling TYPE offset"""
    if type_id in QUERY_TYPES:
        return QUERY_TYPES[type_id]
    elif type_id > 100:
        return f"TYPE{type_id - 100}"
    return f"UNKNOWN({type_id})"


def safe_get_column(row, column_name, default=None):
    """Safely get a column value from a row"""
    try:
        return row[column_name]
    except (KeyError, IndexError):
        return default


def main():
    parser = argparse.ArgumentParser(description='Export top allowed Pi-hole queries to CSV')
    parser.add_argument('-d', '--database', default='/etc/pihole/pihole-FTL.db',
                        help='Path to Pi-hole FTL database (default: /etc/pihole/pihole-FTL.db)')
    parser.add_argument('-o', '--output', default='pihole_top_allowed_queries.csv',
                        help='Output CSV filename (default: pihole_top_allowed_queries.csv)')
    parser.add_argument('-s', '--summary', default='pihole_top_domains_summary.csv',
                        help='Summary CSV filename (default: pihole_top_domains_summary.csv)')
    parser.add_argument('-n', '--number', type=int, default=100,
                        help='Number of top domains to export (default: 100)')
    parser.add_argument('--info', action='store_true',
                        help='Show database structure information and exit')
    parser.add_argument('--summary-only', action='store_true',
                        help='Only create the summary file, skip detailed queries')
    args = parser.parse_args()

    # Check if database file exists and is readable
    if not os.path.exists(args.database):
        print(f"Error: Database file '{args.database}' not found!", file=sys.stderr)
        print("\nPossible solutions:", file=sys.stderr)
        print("1. Check if the path is correct", file=sys.stderr)
        print("2. Common Pi-hole database locations:", file=sys.stderr)
        print(" - /etc/pihole/pihole-FTL.db", file=sys.stderr)
        print(" - /var/lib/pihole/pihole-FTL.db", file=sys.stderr)
        print(" - /opt/pihole/pihole-FTL.db", file=sys.stderr)
        print("3. Use 'sudo find / -name pihole-FTL.db 2>/dev/null' to locate it", file=sys.stderr)

        # Try to find the database automatically
        possible_locations = [
            '/etc/pihole/pihole-FTL.db',
            '/var/lib/pihole/pihole-FTL.db',
            '/opt/pihole/pihole-FTL.db',
            '/home/pi/pihole-FTL.db',
            './pihole-FTL.db'
        ]
        for location in possible_locations:
            if os.path.exists(location):
                print(f"\nFound database at: {location}", file=sys.stderr)
                print(f"Try: python {sys.argv[0]} -d {location}", file=sys.stderr)
                break
        sys.exit(1)

    if not os.access(args.database, os.R_OK):
        print(f"Error: No read permission for '{args.database}'!", file=sys.stderr)
        print("\nTry running with sudo:", file=sys.stderr)
        print(f" sudo python {sys.argv[0]} -d {args.database}", file=sys.stderr)
        sys.exit(1)

    try:
        # Connect to database
        conn = sqlite3.connect(args.database)
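        # Row objects allow column access by name (relied on by safe_get_column)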
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()

        # If --info flag is set, show database structure and exit
        if args.info:
            print(f"Database structure for: {args.database}\n")
            # Get all tables and views
            cursor.execute("SELECT name, type FROM sqlite_master WHERE type IN ('table', 'view') ORDER BY type DESC, name")
            items = cursor.fetchall()
            print("Tables and Views:")
            for item in items:
                print(f" {item['name']} ({item['type']})")
                # Get column info
                try:
                    cursor.execute(f"PRAGMA table_info({item['name']})")
                    columns = cursor.fetchall()
                    if columns:
                        print(" Columns:")
                        for col in columns:
                            print(f" - {col['name']} ({col['type']})")
                except Exception as e:
                    print(f" Error getting columns: {e}")
                print()
            conn.close()
            sys.exit(0)

        # Check what's available in the database
        cursor.execute("SELECT name, type FROM sqlite_master WHERE name IN ('queries', 'query_storage') ORDER BY type DESC")
        available = cursor.fetchall()
        if not available:
            print(f"Error: '{args.database}' doesn't appear to be a Pi-hole FTL database!", file=sys.stderr)
            print("Neither the 'queries' view nor the 'query_storage' table was found.", file=sys.stderr)
            conn.close()
            sys.exit(1)

        # Prefer the 'queries' view if available
        queries_table = None
        for item in available:
            if item['name'] == 'queries':
                queries_table = 'queries'
                print(f"Using 'queries' {item['type']}")
                break
        if not queries_table:
            queries_table = 'query_storage'
            print("Using 'query_storage' table")

        # Test the table structure
        cursor.execute(f"SELECT * FROM {queries_table} LIMIT 1")
        sample_row = cursor.fetchone()
        if sample_row:
            columns = sample_row.keys()
            print(f"Found {len(columns)} columns: {', '.join(columns)}")

        # Build the WHERE clause for allowed statuses
        status_clause = ' OR '.join([f'status = {s}' for s in ALLOWED_STATUSES])
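        # e.g. "status = 2 OR status = 3 OR status = 12 OR status = 13 OR status = 14 OR status = 17"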

        # Get the top N allowed domains by count
        print(f"\nFinding top {args.number} allowed domains...")
        # First try a simple count query
        try:
            top_domains_query = f"""
                SELECT domain, COUNT(*) as query_count
                FROM {queries_table}
                WHERE {status_clause}
                GROUP BY domain
                ORDER BY query_count DESC
                LIMIT {args.number}
            """
            cursor.execute(top_domains_query)
            top_domains = cursor.fetchall()
        except sqlite3.OperationalError as e:
            print(f"Error with initial query: {e}", file=sys.stderr)
            print("Attempting alternative query structure...", file=sys.stderr)
            # Try with query_storage and joins
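            # (query_storage stores the domain as an ID; domain_by_id maps it back to text - see the JOIN below)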
            top_domains_query = f"""
                SELECT
                    CASE
                        WHEN d.domain IS NOT NULL THEN d.domain
                        ELSE CAST(q.domain AS TEXT)
                    END as domain,
                    COUNT(*) as query_count
                FROM query_storage q
                LEFT JOIN domain_by_id d ON q.domain = d.id
                WHERE {status_clause}
                GROUP BY domain
                ORDER BY query_count DESC
                LIMIT {args.number}
            """
            cursor.execute(top_domains_query)
            top_domains = cursor.fetchall()

        if not top_domains:
            print("No allowed queries found in the database.")
            conn.close()
            return

        print(f"Found {len(top_domains)} domains. Exporting to CSV files...")

        # First, create the summary CSV with just domains and counts
        print(f"\nCreating summary file: {args.summary}")
        with open(args.summary, 'w', newline='', encoding='utf-8') as summaryfile:
            summary_writer = csv.DictWriter(summaryfile, fieldnames=['rank', 'domain', 'query_count'])
            summary_writer.writeheader()
            for i, domain_row in enumerate(top_domains, 1):
                if domain_row['domain']:
                    summary_writer.writerow({
                        'rank': i,
                        'domain': domain_row['domain'],
                        'query_count': domain_row['query_count']
                    })
        print(f"Summary file created with {len(top_domains)} domains")

        # Check if the user wants the summary only
        if args.summary_only:
            print(f"\nSuccessfully exported summary to: {args.summary}")
            print("\nTop 10 allowed domains:")
            for i, row in enumerate(top_domains[:10], 1):
                if row['domain']:
                    print(f" {i}. {row['domain']} ({row['query_count']} queries)")
            conn.close()
            return

        # Now create the detailed CSV file with all queries
        print(f"\nCreating detailed file: {args.output}")
        with open(args.output, 'w', newline='', encoding='utf-8') as csvfile:
            # Define fieldnames
            fieldnames = [
                'domain', 'total_queries', 'timestamp', 'timestamp_readable',
                'query_type', 'query_type_id', 'status', 'status_id',
                'client', 'forward', 'reply_type', 'reply_type_id',
                'reply_time', 'dnssec_status', 'dnssec_status_id',
                'additional_info', 'regex_id'
            ]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

            # Process each domain
            for domain_row in top_domains:
                domain = domain_row['domain']
                total_count = domain_row['query_count']
                # Skip if domain is None or empty
                if not domain:
                    continue
                # Get all queries for this domain
                detail_query = f"""
                    SELECT *
                    FROM {queries_table}
                    WHERE domain = ? AND ({status_clause})
                    ORDER BY timestamp DESC
                """
                try:
                    cursor.execute(detail_query, (domain,))
                    queries = cursor.fetchall()
                    for query in queries:
                        try:
                            # Safely extract values
                            timestamp = safe_get_column(query, 'timestamp', 0)
                            timestamp_readable = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S') if timestamp else 'Unknown'
                            query_type_id = safe_get_column(query, 'type', 0)
                            query_type_name = get_query_type_name(query_type_id)
                            status_id = safe_get_column(query, 'status', 0)
                            status_name = STATUS_TYPES.get(status_id, f"Unknown({status_id})")
                            reply_type_id = safe_get_column(query, 'reply_type')
                            reply_type_name = REPLY_TYPES.get(reply_type_id, f"Unknown({reply_type_id})") if reply_type_id is not None else None
                            dnssec_id = safe_get_column(query, 'dnssec')
                            dnssec_name = DNSSEC_STATUS.get(dnssec_id, f"Unknown({dnssec_id})") if dnssec_id is not None else None
                            # Write row
                            writer.writerow({
                                'domain': domain,
                                'total_queries': total_count,
                                'timestamp': timestamp,
                                'timestamp_readable': timestamp_readable,
                                'query_type': query_type_name,
                                'query_type_id': query_type_id,
                                'status': status_name,
                                'status_id': status_id,
                                'client': safe_get_column(query, 'client'),
                                'forward': safe_get_column(query, 'forward'),
                                'reply_type': reply_type_name,
                                'reply_type_id': reply_type_id,
                                'reply_time': safe_get_column(query, 'reply_time'),
                                'dnssec_status': dnssec_name,
                                'dnssec_status_id': dnssec_id,
                                'additional_info': safe_get_column(query, 'additional_info'),
                                'regex_id': safe_get_column(query, 'regex_id')
                            })
                        except Exception as e:
                            print(f"Warning: Error processing query record: {e}", file=sys.stderr)
                            continue
                except Exception as e:
                    print(f"Warning: Error fetching details for domain '{domain}': {e}", file=sys.stderr)
                    continue

        print(f"Detailed file created with all queries for top {len(top_domains)} domains")

        # Get file sizes
        summary_size = os.path.getsize(args.summary) / 1024  # KB
        detail_size = os.path.getsize(args.output) / 1024  # KB
        print("\nSuccessfully exported data to:")
        print(f" - Summary: {args.summary} ({summary_size:.1f} KB - {len(top_domains)} domains)")
        print(f" - Details: {args.output} ({detail_size:.1f} KB - all queries)")

        # Print summary statistics
        print("\nTop 10 allowed domains:")
        for i, row in enumerate(top_domains[:10], 1):
            if row['domain']:
                print(f" {i}. {row['domain']} ({row['query_count']} queries)")

    except sqlite3.Error as e:
        print(f"Database error: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        sys.exit(1)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        sys.exit(1)
    finally:
        if 'conn' in locals():
            conn.close()


if __name__ == "__main__":
    main()