Created
October 18, 2024 17:11
-
-
Save jonaslejon/df39482b363d5ca997d3bb9a13ac3fbe to your computer and use it in GitHub Desktop.
Check Encrypted SNI statistics by reading Zeek logs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import gzip | |
import orjson | |
import argparse | |
from colorama import init, Fore, Style | |
# Initialize colorama for cross-platform color support | |
init(autoreset=True) | |
# Initialize counters for statistics | |
total_sessions = 0 | |
sessions_with_server_name = 0 | |
sessions_without_server_name = 0 | |
def read_gz_json(file_path): | |
""" | |
Reads and decompresses a .gz file and parses each line as JSON using orjson. | |
""" | |
try: | |
with gzip.open(file_path, 'rb') as f: | |
# Iteratively process each line of the file | |
for line in f: | |
# Parse the JSON data line by line | |
json_data = orjson.loads(line) | |
yield json_data | |
except Exception as e: | |
print(f"{Fore.RED}[ERROR] Failed to read {file_path}: {e}{Style.RESET_ALL}") | |
return None | |
def process_logs(directory, verbose=False, output_file=None): | |
""" | |
Recursively search for .log.gz files, filter ssl files, decompress, and process the JSON data. | |
""" | |
global total_sessions, sessions_with_server_name, sessions_without_server_name | |
sessions_without_server_name_list = [] | |
for root, _, files in os.walk(directory): | |
for file in files: | |
# Only process files with 'ssl' in the filename | |
if file.endswith(".log.gz") and "ssl" in file: | |
file_path = os.path.join(root, file) | |
# Only print file processing info if verbose is enabled | |
if verbose: | |
print(f"{Fore.GREEN}[INFO] Processing file: {file_path}{Style.RESET_ALL}") | |
# Read and process the JSON data from the .log.gz file | |
for json_data in read_gz_json(file_path): | |
if json_data is None: | |
continue | |
total_sessions += 1 | |
# Check if 'server_name' is present | |
if 'server_name' in json_data and json_data['server_name']: | |
sessions_with_server_name += 1 | |
else: | |
sessions_without_server_name += 1 | |
sessions_without_server_name_list.append(json_data) | |
# Write sessions without server_name to a file if specified | |
if output_file: | |
with open(output_file, 'w') as f_out: | |
for session in sessions_without_server_name_list: | |
f_out.write(orjson.dumps(session).decode('utf-8') + '\n') | |
print(f"{Fore.BLUE}Sessions without 'server_name' written to: {output_file}{Style.RESET_ALL}") | |
def print_statistics(): | |
""" | |
Print the final statistics after all logs have been processed. | |
""" | |
if total_sessions == 0: | |
print(f"{Fore.RED}[WARNING] No sessions found.{Style.RESET_ALL}") | |
return | |
# Calculate percentages | |
percent_with_server_name = (sessions_with_server_name / total_sessions) * 100 | |
percent_without_server_name = (sessions_without_server_name / total_sessions) * 100 | |
# Print statistics with colors | |
print(f"\n{Fore.CYAN}===== Processing Statistics ====={Style.RESET_ALL}") | |
print(f"{Fore.GREEN}Total sessions processed: {total_sessions}{Style.RESET_ALL}") | |
print(f"{Fore.GREEN}Sessions with 'server_name': {sessions_with_server_name} ({percent_with_server_name:.2f}%){Style.RESET_ALL}") | |
print(f"{Fore.YELLOW}Sessions without 'server_name': {sessions_without_server_name} ({percent_without_server_name:.2f}%){Style.RESET_ALL}") | |
if __name__ == "__main__": | |
# Set up argument parsing | |
parser = argparse.ArgumentParser(description="Process Zeek SSL logs for Encrypted SNI statistics.") | |
parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output (print files being processed)") | |
parser.add_argument("-d", "--directory", type=str, default="/usr/local/zeek/logs/", help="Directory to search for SSL logs (default: /usr/local/zeek/logs/)") | |
parser.add_argument("-o", "--output", type=str, help="File to write sessions without 'server_name'") | |
args = parser.parse_args() | |
# Print the starting message | |
print(f"{Fore.BLUE}Encrypted SNI statistics{Style.RESET_ALL}") | |
print(f"{Fore.BLUE}Starting SSL log processing...{Style.RESET_ALL}") | |
# Start processing from the specified directory | |
process_logs(args.directory, verbose=args.verbose, output_file=args.output) | |
# Print statistics after processing all files | |
print_statistics() | |
print(f"{Fore.BLUE}Processing complete!{Style.RESET_ALL}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Exampe run: