Show the top N (default: 100) allowed queries in your network. Run with `--help` for more information. Run this script on the device where Pi-hole is installed.
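For example, to export the top 50 domains and skip the large per-query file, you might run something like `sudo python3 pihole_top_allowed.py -n 50 --summary-only` (the script filename here is a placeholder for whatever name you saved the file under; `sudo` is typically needed because `/etc/pihole/pihole-FTL.db` is usually only readable by root).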
#!/usr/bin/env python3
"""
Pi-hole FTL Database Query Exporter

Exports the top N (default: 100) allowed/permitted queries with all
available information to CSV.

Creates two output files:
1. Summary CSV: just the top domains with their query counts (N rows)
2. Detailed CSV: all individual queries for these top domains (can be large)

Use --summary-only to create only the summary file and skip the detailed export.
"""
import sqlite3
import csv
import sys
import os
from datetime import datetime
import argparse

# Query type mappings from the documentation
QUERY_TYPES = {
    1: "A", 2: "AAAA", 3: "ANY", 4: "SRV", 5: "SOA",
    6: "PTR", 7: "TXT", 8: "NAPTR", 9: "MX", 10: "DS",
    11: "RRSIG", 12: "DNSKEY", 13: "NS", 14: "OTHER",
    15: "SVCB", 16: "HTTPS"
}

# Status mappings
STATUS_TYPES = {
    0: "Unknown", 1: "Blocked (gravity)", 2: "Allowed (forwarded)",
    3: "Allowed (cache)", 4: "Blocked (regex)", 5: "Blocked (exact)",
    6: "Blocked (upstream known)", 7: "Blocked (upstream null)",
    8: "Blocked (upstream NXDOMAIN)", 9: "Blocked (gravity CNAME)",
    10: "Blocked (regex CNAME)", 11: "Blocked (exact CNAME)",
    12: "Allowed (retried)", 13: "Allowed (retried ignored)",
    14: "Allowed (already forwarded)", 15: "Blocked (busy)",
    16: "Blocked (special)", 17: "Allowed (stale cache)",
    18: "Blocked (upstream EDE15)"
}

# Reply type mappings
REPLY_TYPES = {
    0: "unknown", 1: "NODATA", 2: "NXDOMAIN", 3: "CNAME",
    4: "IP", 5: "DOMAIN", 6: "RRNAME", 7: "SERVFAIL",
    8: "REFUSED", 9: "NOTIMP", 10: "OTHER", 11: "DNSSEC",
    12: "NONE", 13: "BLOB"
}

# DNSSEC status mappings
DNSSEC_STATUS = {
    0: "unknown", 1: "SECURE", 2: "INSECURE",
    3: "BOGUS", 4: "ABANDONED", 5: "TRUNCATED"
}

# Status codes that count as "allowed" (see STATUS_TYPES above)
ALLOWED_STATUSES = [2, 3, 12, 13, 14, 17]


def get_query_type_name(type_id):
    """Convert a query type ID to a name, handling the TYPE offset"""
    if type_id in QUERY_TYPES:
        return QUERY_TYPES[type_id]
    elif type_id > 100:
        return f"TYPE{type_id - 100}"
    return f"UNKNOWN({type_id})"


def safe_get_column(row, column_name, default=None):
    """Safely get a column value from a row"""
    try:
        return row[column_name]
    except (KeyError, IndexError):
        return default


def main():
    parser = argparse.ArgumentParser(description='Export top allowed Pi-hole queries to CSV')
    parser.add_argument('-d', '--database', default='/etc/pihole/pihole-FTL.db',
                        help='Path to Pi-hole FTL database (default: /etc/pihole/pihole-FTL.db)')
    parser.add_argument('-o', '--output', default='pihole_top_allowed_queries.csv',
                        help='Output CSV filename (default: pihole_top_allowed_queries.csv)')
    parser.add_argument('-s', '--summary', default='pihole_top_domains_summary.csv',
                        help='Summary CSV filename (default: pihole_top_domains_summary.csv)')
    parser.add_argument('-n', '--number', type=int, default=100,
                        help='Number of top domains to export (default: 100)')
    parser.add_argument('--info', action='store_true',
                        help='Show database structure information and exit')
    parser.add_argument('--summary-only', action='store_true',
                        help='Only create the summary file, skip detailed queries')
    args = parser.parse_args()

    # Check if the database file exists and is readable
    if not os.path.exists(args.database):
        print(f"Error: Database file '{args.database}' not found!", file=sys.stderr)
        print("\nPossible solutions:", file=sys.stderr)
        print("1. Check if the path is correct", file=sys.stderr)
        print("2. Common Pi-hole database locations:", file=sys.stderr)
        print("   - /etc/pihole/pihole-FTL.db", file=sys.stderr)
        print("   - /var/lib/pihole/pihole-FTL.db", file=sys.stderr)
        print("   - /opt/pihole/pihole-FTL.db", file=sys.stderr)
        print("3. Use 'sudo find / -name pihole-FTL.db 2>/dev/null' to locate it", file=sys.stderr)

        # Try to find the database automatically
        possible_locations = [
            '/etc/pihole/pihole-FTL.db',
            '/var/lib/pihole/pihole-FTL.db',
            '/opt/pihole/pihole-FTL.db',
            '/home/pi/pihole-FTL.db',
            './pihole-FTL.db'
        ]
        for location in possible_locations:
            if os.path.exists(location):
                print(f"\nFound database at: {location}", file=sys.stderr)
                print(f"Try: python {sys.argv[0]} -d {location}", file=sys.stderr)
                break
        sys.exit(1)

    if not os.access(args.database, os.R_OK):
        print(f"Error: No read permission for '{args.database}'!", file=sys.stderr)
        print("\nTry running with sudo:", file=sys.stderr)
        print(f"  sudo python {sys.argv[0]} -d {args.database}", file=sys.stderr)
        sys.exit(1)

    try:
        # Connect to the database
        conn = sqlite3.connect(args.database)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()

        # If the --info flag is set, show the database structure and exit
        if args.info:
            print(f"Database structure for: {args.database}\n")
            # Get all tables and views
            cursor.execute("SELECT name, type FROM sqlite_master WHERE type IN ('table', 'view') ORDER BY type DESC, name")
            items = cursor.fetchall()
            print("Tables and Views:")
            for item in items:
                print(f"  {item['name']} ({item['type']})")
                # Get column info
                try:
                    cursor.execute(f"PRAGMA table_info({item['name']})")
                    columns = cursor.fetchall()
                    if columns:
                        print("    Columns:")
                        for col in columns:
                            print(f"      - {col['name']} ({col['type']})")
                except Exception as e:
                    print(f"    Error getting columns: {e}")
                print()
            conn.close()
            sys.exit(0)

        # Check what's available in the database
        cursor.execute("SELECT name, type FROM sqlite_master WHERE name IN ('queries', 'query_storage') ORDER BY type DESC")
        available = cursor.fetchall()
        if not available:
            print(f"Error: '{args.database}' doesn't appear to be a Pi-hole FTL database!", file=sys.stderr)
            print("Neither the 'queries' view nor the 'query_storage' table was found.", file=sys.stderr)
            conn.close()
            sys.exit(1)

        # Prefer the 'queries' view if it is available
        queries_table = None
        for item in available:
            if item['name'] == 'queries':
                queries_table = 'queries'
                print(f"Using 'queries' {item['type']}")
                break
        if not queries_table:
            queries_table = 'query_storage'
            print("Using 'query_storage' table")

        # Test the table structure
        cursor.execute(f"SELECT * FROM {queries_table} LIMIT 1")
        sample_row = cursor.fetchone()
        if sample_row:
            columns = sample_row.keys()
            print(f"Found {len(columns)} columns: {', '.join(columns)}")

        # Build the WHERE clause for allowed statuses
        status_clause = ' OR '.join([f'status = {s}' for s in ALLOWED_STATUSES])

        # Get the top N allowed domains by count
        print(f"\nFinding top {args.number} allowed domains...")

        # First try a simple count query. Track whether we had to fall back
        # to query_storage with a join, because query_storage stores the
        # domain as an ID and the per-domain detail query below must then
        # resolve it through domain_by_id as well.
        needs_domain_join = False
        try:
            top_domains_query = f"""
                SELECT domain, COUNT(*) as query_count
                FROM {queries_table}
                WHERE {status_clause}
                GROUP BY domain
                ORDER BY query_count DESC
                LIMIT {args.number}
            """
            cursor.execute(top_domains_query)
            top_domains = cursor.fetchall()
        except sqlite3.OperationalError as e:
            print(f"Error with initial query: {e}", file=sys.stderr)
            print("Attempting alternative query structure...", file=sys.stderr)
            # Try with query_storage and joins
            needs_domain_join = True
            top_domains_query = f"""
                SELECT
                    CASE
                        WHEN d.domain IS NOT NULL THEN d.domain
                        ELSE CAST(q.domain AS TEXT)
                    END as domain,
                    COUNT(*) as query_count
                FROM query_storage q
                LEFT JOIN domain_by_id d ON q.domain = d.id
                WHERE {status_clause}
                GROUP BY domain
                ORDER BY query_count DESC
                LIMIT {args.number}
            """
            cursor.execute(top_domains_query)
            top_domains = cursor.fetchall()

        if not top_domains:
            print("No allowed queries found in the database.")
            conn.close()
            return

        print(f"Found {len(top_domains)} domains. Exporting to CSV files...")

        # First, create the summary CSV with just domains and counts
        print(f"\nCreating summary file: {args.summary}")
        with open(args.summary, 'w', newline='', encoding='utf-8') as summaryfile:
            summary_writer = csv.DictWriter(summaryfile, fieldnames=['rank', 'domain', 'query_count'])
            summary_writer.writeheader()
            for i, domain_row in enumerate(top_domains, 1):
                if domain_row['domain']:
                    summary_writer.writerow({
                        'rank': i,
                        'domain': domain_row['domain'],
                        'query_count': domain_row['query_count']
                    })
        print(f"Summary file created with {len(top_domains)} domains")

        # Check if the user wants the summary only
        if args.summary_only:
            print(f"\nSuccessfully exported summary to: {args.summary}")
            print("\nTop 10 allowed domains:")
            for i, row in enumerate(top_domains[:10], 1):
                if row['domain']:
                    print(f"  {i}. {row['domain']} ({row['query_count']} queries)")
            conn.close()
            return

        # Now create the detailed CSV file with all queries
        print(f"\nCreating detailed file: {args.output}")
        with open(args.output, 'w', newline='', encoding='utf-8') as csvfile:
            # Define fieldnames
            fieldnames = [
                'domain', 'total_queries', 'timestamp', 'timestamp_readable',
                'query_type', 'query_type_id', 'status', 'status_id',
                'client', 'forward', 'reply_type', 'reply_type_id',
                'reply_time', 'dnssec_status', 'dnssec_status_id',
                'additional_info', 'regex_id'
            ]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

            # Process each domain
            for domain_row in top_domains:
                domain = domain_row['domain']
                total_count = domain_row['query_count']
                # Skip if the domain is None or empty
                if not domain:
                    continue

                # Get all queries for this domain. If we fell back to the
                # join above, match the resolved name via domain_by_id;
                # otherwise match the domain column directly.
                if needs_domain_join:
                    detail_query = f"""
                        SELECT q.*
                        FROM query_storage q
                        LEFT JOIN domain_by_id d ON q.domain = d.id
                        WHERE d.domain = ? AND ({status_clause})
                        ORDER BY q.timestamp DESC
                    """
                else:
                    detail_query = f"""
                        SELECT *
                        FROM {queries_table}
                        WHERE domain = ? AND ({status_clause})
                        ORDER BY timestamp DESC
                    """
                try:
                    cursor.execute(detail_query, (domain,))
                    queries = cursor.fetchall()
                    for query in queries:
                        try:
                            # Safely extract values
                            timestamp = safe_get_column(query, 'timestamp', 0)
                            timestamp_readable = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S') if timestamp else 'Unknown'
                            query_type_id = safe_get_column(query, 'type', 0)
                            query_type_name = get_query_type_name(query_type_id)
                            status_id = safe_get_column(query, 'status', 0)
                            status_name = STATUS_TYPES.get(status_id, f"Unknown({status_id})")
                            reply_type_id = safe_get_column(query, 'reply_type')
                            reply_type_name = REPLY_TYPES.get(reply_type_id, f"Unknown({reply_type_id})") if reply_type_id is not None else None
                            dnssec_id = safe_get_column(query, 'dnssec')
                            dnssec_name = DNSSEC_STATUS.get(dnssec_id, f"Unknown({dnssec_id})") if dnssec_id is not None else None

                            # Write the row
                            writer.writerow({
                                'domain': domain,
                                'total_queries': total_count,
                                'timestamp': timestamp,
                                'timestamp_readable': timestamp_readable,
                                'query_type': query_type_name,
                                'query_type_id': query_type_id,
                                'status': status_name,
                                'status_id': status_id,
                                'client': safe_get_column(query, 'client'),
                                'forward': safe_get_column(query, 'forward'),
                                'reply_type': reply_type_name,
                                'reply_type_id': reply_type_id,
                                'reply_time': safe_get_column(query, 'reply_time'),
                                'dnssec_status': dnssec_name,
                                'dnssec_status_id': dnssec_id,
                                'additional_info': safe_get_column(query, 'additional_info'),
                                'regex_id': safe_get_column(query, 'regex_id')
                            })
                        except Exception as e:
                            print(f"Warning: Error processing query record: {e}", file=sys.stderr)
                            continue
                except Exception as e:
                    print(f"Warning: Error fetching details for domain '{domain}': {e}", file=sys.stderr)
                    continue
        print(f"Detailed file created with all queries for top {len(top_domains)} domains")

        # Get file sizes
        summary_size = os.path.getsize(args.summary) / 1024  # KB
        detail_size = os.path.getsize(args.output) / 1024  # KB
        print("\nSuccessfully exported data to:")
        print(f"  - Summary: {args.summary} ({summary_size:.1f} KB - {len(top_domains)} domains)")
        print(f"  - Details: {args.output} ({detail_size:.1f} KB - all queries)")

        # Print summary statistics
        print("\nTop 10 allowed domains:")
        for i, row in enumerate(top_domains[:10], 1):
            if row['domain']:
                print(f"  {i}. {row['domain']} ({row['query_count']} queries)")
    except sqlite3.Error as e:
        print(f"Database error: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        sys.exit(1)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        sys.exit(1)
    finally:
        if 'conn' in locals():
            conn.close()


if __name__ == "__main__":
    main()
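If you want to post-process the results, the summary CSV has the columns `rank`, `domain`, and `query_count`, matching the fieldnames the script writes. Below is a minimal sketch that reads the default summary file and flags domains above a 1% share of the counted queries; both the threshold and the hardcoded filename are illustrative only.

```python
import csv

# Read the summary file the exporter writes by default.
with open('pihole_top_domains_summary.csv', newline='', encoding='utf-8') as f:
    rows = list(csv.DictReader(f))

# Flag domains that account for more than 1% of all counted queries.
# The 1% cutoff is an arbitrary example, not part of the exporter.
total = sum(int(r['query_count']) for r in rows)
for r in rows:
    count = int(r['query_count'])
    if total and count / total > 0.01:
        print(f"{r['rank']:>4}  {r['domain']}  ({count} queries, {count / total:.1%})")
```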
  