Skip to content

Instantly share code, notes, and snippets.

@jonaslejon
Created October 18, 2024 17:11
Show Gist options
  • Save jonaslejon/df39482b363d5ca997d3bb9a13ac3fbe to your computer and use it in GitHub Desktop.
Save jonaslejon/df39482b363d5ca997d3bb9a13ac3fbe to your computer and use it in GitHub Desktop.
Check Encrypted SNI statistics by reading Zeek logs
import os
import gzip
import orjson
import argparse
from colorama import init, Fore, Style
# Initialize colorama for cross-platform color support
init(autoreset=True)
# Initialize counters for statistics
total_sessions = 0
sessions_with_server_name = 0
sessions_without_server_name = 0
def read_gz_json(file_path):
"""
Reads and decompresses a .gz file and parses each line as JSON using orjson.
"""
try:
with gzip.open(file_path, 'rb') as f:
# Iteratively process each line of the file
for line in f:
# Parse the JSON data line by line
json_data = orjson.loads(line)
yield json_data
except Exception as e:
print(f"{Fore.RED}[ERROR] Failed to read {file_path}: {e}{Style.RESET_ALL}")
return None
def process_logs(directory, verbose=False, output_file=None):
"""
Recursively search for .log.gz files, filter ssl files, decompress, and process the JSON data.
"""
global total_sessions, sessions_with_server_name, sessions_without_server_name
sessions_without_server_name_list = []
for root, _, files in os.walk(directory):
for file in files:
# Only process files with 'ssl' in the filename
if file.endswith(".log.gz") and "ssl" in file:
file_path = os.path.join(root, file)
# Only print file processing info if verbose is enabled
if verbose:
print(f"{Fore.GREEN}[INFO] Processing file: {file_path}{Style.RESET_ALL}")
# Read and process the JSON data from the .log.gz file
for json_data in read_gz_json(file_path):
if json_data is None:
continue
total_sessions += 1
# Check if 'server_name' is present
if 'server_name' in json_data and json_data['server_name']:
sessions_with_server_name += 1
else:
sessions_without_server_name += 1
sessions_without_server_name_list.append(json_data)
# Write sessions without server_name to a file if specified
if output_file:
with open(output_file, 'w') as f_out:
for session in sessions_without_server_name_list:
f_out.write(orjson.dumps(session).decode('utf-8') + '\n')
print(f"{Fore.BLUE}Sessions without 'server_name' written to: {output_file}{Style.RESET_ALL}")
def print_statistics():
"""
Print the final statistics after all logs have been processed.
"""
if total_sessions == 0:
print(f"{Fore.RED}[WARNING] No sessions found.{Style.RESET_ALL}")
return
# Calculate percentages
percent_with_server_name = (sessions_with_server_name / total_sessions) * 100
percent_without_server_name = (sessions_without_server_name / total_sessions) * 100
# Print statistics with colors
print(f"\n{Fore.CYAN}===== Processing Statistics ====={Style.RESET_ALL}")
print(f"{Fore.GREEN}Total sessions processed: {total_sessions}{Style.RESET_ALL}")
print(f"{Fore.GREEN}Sessions with 'server_name': {sessions_with_server_name} ({percent_with_server_name:.2f}%){Style.RESET_ALL}")
print(f"{Fore.YELLOW}Sessions without 'server_name': {sessions_without_server_name} ({percent_without_server_name:.2f}%){Style.RESET_ALL}")
if __name__ == "__main__":
# Set up argument parsing
parser = argparse.ArgumentParser(description="Process Zeek SSL logs for Encrypted SNI statistics.")
parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output (print files being processed)")
parser.add_argument("-d", "--directory", type=str, default="/usr/local/zeek/logs/", help="Directory to search for SSL logs (default: /usr/local/zeek/logs/)")
parser.add_argument("-o", "--output", type=str, help="File to write sessions without 'server_name'")
args = parser.parse_args()
# Print the starting message
print(f"{Fore.BLUE}Encrypted SNI statistics{Style.RESET_ALL}")
print(f"{Fore.BLUE}Starting SSL log processing...{Style.RESET_ALL}")
# Start processing from the specified directory
process_logs(args.directory, verbose=args.verbose, output_file=args.output)
# Print statistics after processing all files
print_statistics()
print(f"{Fore.BLUE}Processing complete!{Style.RESET_ALL}")
@jonaslejon
Copy link
Author

Exampe run:

# python3 esni-stats.py
Encrypted SNI statistics
Starting SSL log processing...
[ERROR] Failed to read /usr/local/zeek/logs/2024-09-25/ssl.20:58:07-20:58:08.log.gz: unexpected character: line 1 column 1 (char 0)
[ERROR] Failed to read /usr/local/zeek/logs/2024-09-25/ssl.20:56:53-20:58:06.log.gz: unexpected character: line 1 column 1 (char 0)
[ERROR] Failed to read /usr/local/zeek/logs/2024-09-30/ssl.13:00:00-13:09:16.log.gz: unexpected character: line 1 column 1 (char 0)

===== Processing Statistics =====
Total sessions processed: 449512
Sessions with 'server_name': 423648 (94.25%)
Sessions without 'server_name': 25864 (5.75%)
Processing complete!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment