#!/usr/bin/env python3
"""
Advanced Encoding Analyzer Pro
------------------------------
Enterprise-grade encoding analysis tool with comprehensive JSON handling.
Supports recursive decoding, binary data inspection, and advanced format detection.
"""
import argparse
import base64
import binascii
import bz2
import chardet
import datetime
import gzip
import hashlib
import itertools
import io
import json
import lzma
import os
import re
import struct
import sys
import traceback
import urllib.parse
import zlib
from collections import Counter, OrderedDict
from typing import Dict, List, Tuple, Union, Optional, Any, Iterator, Set, Callable
from functools import lru_cache
try:
import jwt # Optional JWT support
JWT_SUPPORT = True
except ImportError:
JWT_SUPPORT = False
try:
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from rich.progress import Progress, TaskID
from rich.prompt import Prompt, Confirm
from rich.tree import Tree
from rich.syntax import Syntax
from rich.markdown import Markdown
from rich import box
from rich.traceback import install as install_rich_traceback
install_rich_traceback(show_locals=True)
RICH_AVAILABLE = True
except ImportError:
RICH_AVAILABLE = False
# Fallback console class
class FallbackConsole:
def print(self, *args, **kwargs):
print(*args)
# Initialize console
if RICH_AVAILABLE:
console = Console()
else:
console = FallbackConsole()
# Constants
VERSION = "3.0.0"
MAX_BINARY_PREVIEW = 100
MAX_DISPLAY_LENGTH = 120
DEFAULT_MAX_DEPTH = 5
# Define encoding patterns with more comprehensive regex
ENCODING_PATTERNS = {
"base64_standard": re.compile(r'^[A-Za-z0-9+/]+={0,2}$'),
"base64_url_safe": re.compile(r'^[A-Za-z0-9_-]+={0,2}$'),
"base64_possible": re.compile(r'^[A-Za-z0-9+/_-]+$'), # Less strict for padding issues
"hex": re.compile(r'^[A-Fa-f0-9]+$'),
"url_encoded": re.compile(r'(%[0-9A-Fa-f]{2})+'),
"jwt": re.compile(r'^[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+$'),
"json": re.compile(r'^\s*({[\s\S]*}|\[[\s\S]*\])\s*$'),
"possible_json": re.compile(r'({"|\[{)'),
"integer": re.compile(r'^-?\d+$'),
"float": re.compile(r'^-?\d+\.\d+$'),
"uuid": re.compile(r'^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$', re.IGNORECASE),
"timestamp_unix": re.compile(r'^1\d{9}$'), # Unix timestamps from 2001 to 2286
"timestamp_millis": re.compile(r'^1\d{12}$'), # Unix timestamps in milliseconds
}
# Binary file signatures (magic numbers)
BINARY_SIGNATURES = {
b"\x1F\x8B\x08": "gzip",
b"\x42\x5A\x68": "bzip2",
b"\xFD\x37\x7A\x58\x5A\x00": "xz",
b"\x50\x4B\x03\x04": "zip",
b"\x50\x4B\x05\x06": "zip (empty)",
b"\x50\x4B\x07\x08": "zip (spanned)",
b"\x78\x9C": "zlib",
b"\xFF\xD8\xFF": "jpeg",
b"\x89\x50\x4E\x47\x0D\x0A\x1A\x0A": "png",
b"\x47\x49\x46\x38": "gif",
b"\x25\x50\x44\x46": "pdf",
b"\x00\x01\x00\x00\x00": "ttf",
b"\x4F\x54\x54\x4F": "otf",
b"\x00\x61\x73\x6D": "wasm",
}
# Known JSON structure templates to look for
JSON_TEMPLATES = [
{"iss", "sub", "aud", "exp", "iat", "nbf"}, # JWT claim
{"access_token", "token_type", "expires_in"}, # OAuth
{"id", "name", "email"}, # User data
{"error", "error_description"}, # Error response
{"status", "message", "data"}, # API response
{"type", "value"}, # Generic key-value
]
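# Matching is a subset check on the object's keys (see _analyze_json_structure):
# e.g. {"access_token": "...", "token_type": "Bearer", "expires_in": 3600, "scope": "read"}
# matches the OAuth template above because all three template keys are present.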
class JSONEnhancedDecoder(json.JSONDecoder):
"""Enhanced JSON decoder that supports various non-standard formats."""
def __init__(self, *args, **kwargs):
json.JSONDecoder.__init__(self, *args, **kwargs)
# scan_once is assigned per instance by JSONDecoder.__init__ (there is no
# class-level attribute), so keep a reference to the original before overriding it
self._original_scan_once = self.scan_once
self.scan_once = self._scan_once
def _scan_once(self, string, idx):
try:
return self._original_scan_once(string, idx)
except json.JSONDecodeError:
# Try to be more lenient with trailing commas
# e.g. {"a": 1, "b": 2,}
if idx < len(string) and string[idx] == ',':
# Try to skip the comma and continue
next_char_idx = idx + 1
while next_char_idx < len(string) and string[next_char_idx].isspace():
next_char_idx += 1
if next_char_idx < len(string) and string[next_char_idx] in ']}':
# We have something like "...," followed by closing bracket
# Skip the comma and let the parent method handle the closing bracket
return self._original_scan_once(string, next_char_idx)
# If we can't handle it specially, re-raise the original error
raise
class EncodingAnalyzer:
"""Advanced encoding analyzer with brutal JSON parsing capabilities."""
def __init__(self, debug_mode=False):
self.results = []
self.debug_mode = debug_mode
self.visited_strings = set() # Prevent infinite recursion on same content
self.stringified_json_cache = {} # Cache for JSON stringification results
def debug(self, msg):
"""Print debug information if debug mode is enabled."""
if self.debug_mode:
if RICH_AVAILABLE:
console.print(f"[dim][DEBUG] {msg}[/dim]")
else:
print(f"[DEBUG] {msg}")
def is_binary_data(self, data: bytes) -> Tuple[bool, Optional[str]]:
"""
Check if data appears to be binary by examining content and signatures.
Returns (is_binary, format_if_identified)
"""
# Check if contains too many non-printable bytes
printable_ratio = sum(32 <= b <= 126 or b in (9, 10, 13) for b in data) / len(data) if data else 0
# Check common file signatures
for signature, format_name in BINARY_SIGNATURES.items():
if data.startswith(signature):
return True, format_name
# If less than 80% printable characters and not a known text format, consider it binary
if printable_ratio < 0.8:
# Try to detect encoding with chardet
try:
detected = chardet.detect(data)
if detected['confidence'] < 0.7:
return True, None
except Exception:
return True, None
return False, None
@lru_cache(maxsize=128)
def is_base64(self, s: str) -> bool:
"""Check if a string is valid Base64 with multiple heuristics."""
# Length checks
if len(s) < 4: # Too short to be meaningful Base64
return False
# Must be on 4-byte alignment with proper padding
padding_error = len(s) % 4
if padding_error:
# Try adding padding
s = s + '=' * (4 - padding_error)
# Check character set
if not ENCODING_PATTERNS["base64_possible"].match(s):
return False
# Check if the ratio of different characters matches expected for Base64
char_counts = Counter(s)
# Typical Base64 has good distribution; check if any char is too frequent
most_common_ratio = char_counts.most_common(1)[0][1] / len(s) if s else 0
if most_common_ratio > 0.5: # If any char is >50% of string, likely not Base64
return False
try:
# Try to decode (strict mode)
decoded = base64.b64decode(s, validate=True)
# Additional quality check - valid Base64 often decodes to meaningful data
is_bin, _ = self.is_binary_data(decoded)
# If it's very short binary data, be more skeptical
if is_bin and len(decoded) < 3:
return False
return True
except binascii.Error:
try:
# Try URL-safe variant
decoded = base64.urlsafe_b64decode(s)
# Same quality checks
is_bin, _ = self.is_binary_data(decoded)
if is_bin and len(decoded) < 3:
return False
return True
except binascii.Error:
return False
@lru_cache(maxsize=128)
def is_base64_url_safe(self, s: str) -> bool:
"""Check if a string is URL-safe Base64."""
# Base checks similar to is_base64
if len(s) < 4:
return False
# Must contain URL-safe specific characters
if '-' not in s and '_' not in s:
# Doesn't use URL-safe alphabet, unlikely to be URL-safe Base64
# Unless it's all alphanumeric which could still be URL-safe
if not all(c.isalnum() for c in s if c != '='):
return False
try:
# Handle padding issues
padding_error = len(s) % 4
if padding_error:
s = s + '=' * (4 - padding_error)
# Try to decode
decoded = base64.urlsafe_b64decode(s)
# Additional quality check
is_bin, _ = self.is_binary_data(decoded)
if is_bin and len(decoded) < 3:
return False
return True
except binascii.Error:
return False
@lru_cache(maxsize=128)
def is_hex(self, s: str) -> bool:
"""Check if a string is hex encoded with robust validation."""
# Basic pattern check
if not ENCODING_PATTERNS["hex"].match(s):
return False
# Must be even length for valid hex (each byte is 2 hex chars)
if len(s) % 2 != 0:
return False
# Very short hex strings are often false positives
if len(s) < 4:
return False
# Check if the entropy is reasonable for hex
if len(s) > 6: # Only check longer strings
char_counts = Counter(s.lower())
# All 16 hex chars should be reasonably distributed
unique_chars = len(char_counts)
if unique_chars < 6: # Too few unique characters
# Unless it's a specific hex pattern like 00FF00FF
if len(set(s[i:i+2] for i in range(0, len(s), 2))) <= 2:
# Repetitive pattern, might be valid hex
pass
else:
return False
try:
# Decode and validate the result
decoded = binascii.unhexlify(s)
# For very short results, apply additional heuristics
if len(decoded) < 3:
# Check if the decoded value makes sense
# If it's binary or unprintable, often a false positive
printable_count = sum(32 <= b <= 126 for b in decoded)
if printable_count == 0:
# All unprintable - check if it might be a common binary value
if all(b == 0 for b in decoded) or all(b == 255 for b in decoded):
# Common binary pattern (all zeros or all ones)
return True
if len(decoded) <= 1:
return False # Single byte unprintable - likely false positive
return True
except binascii.Error:
return False
@lru_cache(maxsize=128)
def is_url_encoded(self, s: str) -> bool:
"""Check if a string is URL encoded with enhanced validation."""
# Quick check for % character which is essential for URL encoding
if '%' not in s:
return False
# Check for valid percent-encoded sequences
if not re.search(r'%[0-9A-Fa-f]{2}', s):
return False
# Make sure the % is actually part of encoded data
# by checking if enough of the string is encoded
encoded_chars = len(re.findall(r'%[0-9A-Fa-f]{2}', s))
encoded_ratio = encoded_chars * 3 / len(s)
# If very low ratio of encoded characters, be skeptical
if encoded_ratio < 0.1 and encoded_chars < 2:
# Too few encoded characters, may be false positive
# Unless it's a common pattern like URL with spaces encoded
if re.search(r'%20|%2B|\+', s):
# Contains encoded space or plus, common in URLs
pass
else:
return False
try:
# Try to decode
decoded = urllib.parse.unquote(s)
# Additional validation: must actually change the string
if decoded == s:
return False
return True
except Exception:
return False
@lru_cache(maxsize=128)
def is_jwt(self, s: str) -> bool:
"""Check if a string is a JWT token with thorough validation."""
# Check basic pattern
if not ENCODING_PATTERNS["jwt"].match(s):
return False
# Split and analyze parts
parts = s.split('.')
if len(parts) != 3:
return False
# Each part should be valid Base64
for i, part in enumerate(parts[:2]): # Header and payload
padding_len = 4 - (len(part) % 4) if len(part) % 4 else 0
part_padded = part + ('=' * padding_len)
try:
# Decode the part
decoded = base64.urlsafe_b64decode(part_padded)
# Verify JSON format
try:
json_data = json.loads(decoded)
# Additional JWT-specific validation
if i == 0: # Header
if not isinstance(json_data, dict) or 'alg' not in json_data:
# JWT header must contain algorithm
return False
elif i == 1: # Payload
if not isinstance(json_data, dict):
return False
# Check for common JWT claims
jwt_claims = ['iss', 'sub', 'aud', 'exp', 'iat', 'nbf']
if not any(claim in json_data for claim in jwt_claims):
# No standard JWT claims found, be skeptical
# But allow if it has enough data to be plausible
if len(json_data) < 2:
return False
except json.JSONDecodeError:
return False
except binascii.Error:
return False
# If we have the PyJWT module, try to decode the token
if JWT_SUPPORT:
try:
# Only verify signature format, not validity
header = jwt.get_unverified_header(s)
if not header or 'alg' not in header:
return False
except Exception:
# PyJWT failed to parse, likely not a valid JWT
return False
return True
def is_gzip_compressed(self, data: bytes) -> bool:
"""Check if bytes are gzip compressed."""
# Check for gzip magic bytes
if len(data) < 10: # Minimum gzip header size
return False
# Check for gzip magic bytes (0x1F, 0x8B)
if data[0] == 0x1F and data[1] == 0x8B:
try:
# Try to decompress the first few bytes as a sanity check
with gzip.GzipFile(fileobj=io.BytesIO(data)) as f:
f.read(1) # Just read a byte to verify
return True
except Exception:
pass
return False
def is_zlib_compressed(self, data: bytes) -> bool:
"""Check if bytes are zlib compressed."""
# Check for zlib magic byte (0x78)
if len(data) < 6: # Too short for meaningful zlib data
return False
# zlib streams start with 0x78 and second byte is usually 0x01, 0x9C, or 0xDA
if data[0] == 0x78 and data[1] in (0x01, 0x9C, 0xDA, 0x5E):
try:
# Try decompressing with zlib
zlib.decompress(data)
return True
except zlib.error:
pass
return False
def is_bzip2_compressed(self, data: bytes) -> bool:
"""Check if bytes are bzip2 compressed."""
# Check for bzip2 magic bytes (BZh)
if len(data) < 10:
return False
# BZip2 signature
if data.startswith(b'BZh') and ord('1') <= data[3] <= ord('9'):
try:
# Try decompressing with bzip2
bz2.decompress(data)
return True
except Exception:
pass
return False
def is_lzma_compressed(self, data: bytes) -> bool:
"""Check if bytes are LZMA/XZ compressed."""
# Check for LZMA/XZ signature
if len(data) < 6:
return False
# XZ signature
if data.startswith(b'\xFD\x37\x7A\x58\x5A\x00'):
try:
# Try decompressing with lzma
lzma.decompress(data)
return True
except Exception:
pass
return False
def is_json_string(self, s: str) -> bool:
"""
Check if a string is valid JSON with advanced heuristics.
Args:
s: String to check
Returns:
bool: True if the string is valid JSON
"""
# Quick check for JSON-like structure
s = s.strip()
if not (s.startswith('{') and s.endswith('}')) and not (s.startswith('[') and s.endswith(']')):
return False
try:
# Try to parse the string as JSON
json.loads(s)
return True
except json.JSONDecodeError as e:
# If close to valid JSON, try aggressive recovery mode
if s.count('{') > 10 or s.count('[') > 10: # Looks complex enough to try harder
try:
# Try to fix common JSON issues
fixed_json = self._fix_common_json_errors(s)
if fixed_json != s: # Only if we actually fixed something
json.loads(fixed_json)
return True
except Exception:
pass
return False
def _fix_common_json_errors(self, s: str) -> str:
"""Try to fix common JSON syntax errors."""
# Quote bare (unquoted) object keys, e.g. {key: 1} -> {"key": 1}
s = re.sub(r'([{,]\s*)(\w+)(\s*:)', r'\1"\2"\3', s)
# Replace single quotes with double quotes (only if not inside strings)
in_string = False
in_double_quote = False
result = []
for char in s:
if char == '"' and not in_string:
in_double_quote = not in_double_quote
elif char == "'" and not in_double_quote:
result.append('"')
in_string = not in_string
continue
result.append(char)
s = ''.join(result)
# Remove trailing commas in arrays and objects
s = re.sub(r',(\s*[\]}])', r'\1', s)
# Normalize spacing before boolean and null literals
s = re.sub(r':\s*true\b', r': true', s)
s = re.sub(r':\s*false\b', r': false', s)
s = re.sub(r':\s*null\b', r': null', s)
return s
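# Illustrative example of the fixes above (hypothetical input):
#   {name: 'Ada', "tags": ['x', 'y',],}  ->  {"name": "Ada", "tags": ["x", "y"]}
# (bare keys quoted, single quotes converted, trailing commas removed)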
def _find_possible_json(self, s: str) -> Optional[str]:
"""Try to find valid JSON fragments within a larger string."""
# Python's re module does not support recursive patterns, so balanced
# objects/arrays are found by scanning bracket depth manually below
candidates = []
# Extract all potential JSON object candidates
depth = 0
start = None
for i, char in enumerate(s):
if char == '{':
if depth == 0:
start = i
depth += 1
elif char == '}':
depth -= 1
if depth == 0 and start is not None:
candidates.append(s[start:i+1])
start = None
# Extract all potential JSON array candidates
depth = 0
start = None
for i, char in enumerate(s):
if char == '[':
if depth == 0:
start = i
depth += 1
elif char == ']':
depth -= 1
if depth == 0 and start is not None:
candidates.append(s[start:i+1])
start = None
# Sort candidates by length (prefer longer matches)
candidates.sort(key=len, reverse=True)
# Try to parse each candidate
for candidate in candidates:
if len(candidate) >= 2: # Minimum valid JSON length
try:
json.loads(candidate)
return candidate
except json.JSONDecodeError:
# Try with enhanced recovery
try:
fixed = self._fix_common_json_errors(candidate)
json.loads(fixed)
return fixed
except json.JSONDecodeError:
continue
return None
def decode_base64(self, s: str) -> Tuple[bool, str, bytes]:
"""Decode Base64 string with smart padding handling."""
# Ensure proper padding
padding_error = len(s) % 4
if padding_error:
s_padded = s + '=' * (4 - padding_error)
else:
s_padded = s
try:
# Try standard Base64 first
decoded = base64.b64decode(s_padded)
return True, "base64_standard", decoded
except binascii.Error:
try:
# Try URL-safe Base64
decoded = base64.urlsafe_b64decode(s_padded)
return True, "base64_url_safe", decoded
except binascii.Error:
# Try one more time with no padding
# Some systems incorrectly strip all padding
try:
s_no_pad = s.rstrip('=')
padding_len = (4 - len(s_no_pad) % 4) % 4
s_repad = s_no_pad + '=' * padding_len
decoded = base64.b64decode(s_repad)
return True, "base64_standard_repadded", decoded
except binascii.Error:
try:
decoded = base64.urlsafe_b64decode(s_repad)
return True, "base64_url_safe_repadded", decoded
except binascii.Error:
pass
return False, "failed", b''
def decode_hex(self, s: str) -> Tuple[bool, bytes]:
"""Decode hex encoded string."""
try:
# Handle both uppercase and lowercase hex
decoded = binascii.unhexlify(s)
return True, decoded
except binascii.Error:
return False, b''
def decode_url_encoded(self, s: str) -> Tuple[bool, str]:
"""Decode URL encoded string."""
try:
# Standard URL decoding
decoded = urllib.parse.unquote(s)
# If no change, try more aggressive decoding
if decoded == s and '%' in s:
# Try with plus as space
decoded = urllib.parse.unquote_plus(s)
# If still no change and has pluses, try replacing + with space
if decoded == s and '+' in s:
decoded = s.replace('+', ' ')
return True, decoded
except Exception:
return False, ""
def decode_jwt(self, s: str) -> Tuple[bool, Dict]:
"""Decode JWT token to its components with enhanced error handling."""
parts = s.split('.')
try:
# Process header
header_pad = parts[0] + '=' * (4 - len(parts[0]) % 4 if len(parts[0]) % 4 else 0)
header_bytes = base64.urlsafe_b64decode(header_pad)
# Process payload
payload_pad = parts[1] + '=' * (4 - len(parts[1]) % 4 if len(parts[1]) % 4 else 0)
payload_bytes = base64.urlsafe_b64decode(payload_pad)
# Try parsing as JSON with enhanced parsing
try:
header = json.loads(header_bytes.decode('utf-8'))
except json.JSONDecodeError:
# Try with more lenient JSON parsing
try:
header_text = header_bytes.decode('utf-8')
fixed_header = self._fix_common_json_errors(header_text)
header = json.loads(fixed_header)
except Exception:
# If still fails, store raw decoded value
header = {"_raw_value": header_bytes.decode('utf-8', errors='replace')}
try:
payload = json.loads(payload_bytes.decode('utf-8'))
except json.JSONDecodeError:
# Try with more lenient JSON parsing
try:
payload_text = payload_bytes.decode('utf-8')
fixed_payload = self._fix_common_json_errors(payload_text)
payload = json.loads(fixed_payload)
except Exception:
# If still fails, store raw decoded value
payload = {"_raw_value": payload_bytes.decode('utf-8', errors='replace')}
# Calculate expiration time if available
expiration_info = None
if isinstance(payload, dict) and 'exp' in payload:
try:
exp_timestamp = int(payload['exp'])
expiration_date = datetime.datetime.fromtimestamp(exp_timestamp)
now = datetime.datetime.now()
if expiration_date > now:
status = "valid"
remaining = (expiration_date - now).total_seconds()
if remaining < 3600:
time_left = f"{int(remaining / 60)} minutes"
else:
time_left = f"{int(remaining / 3600)} hours"
else:
status = "expired"
time_ago = (now - expiration_date).total_seconds()
if time_ago < 3600:
time_left = f"{int(time_ago / 60)} minutes ago"
else:
time_left = f"{int(time_ago / 3600)} hours ago"
expiration_info = {
"status": status,
"expires_at": expiration_date.isoformat(),
"time_remaining": time_left
}
except (ValueError, TypeError, OverflowError):
pass
result = {
"header": header,
"payload": payload,
"signature": parts[2]
}
if expiration_info:
result["expiration"] = expiration_info
return True, result
except (binascii.Error, UnicodeDecodeError):
return False, {}
def decode_compressed(self, data: bytes) -> Tuple[bool, str, bytes]:
"""Try to decompress data using multiple compression algorithms."""
# Check data length
if len(data) < 4: # Too short to be compressed data
return False, "too_short", data
# Try gzip
if self.is_gzip_compressed(data):
try:
decompressed = gzip.decompress(data)
return True, "gzip", decompressed
except Exception:
pass
# Try zlib
if self.is_zlib_compressed(data):
try:
decompressed = zlib.decompress(data)
return True, "zlib", decompressed
except zlib.error:
pass
# Try bzip2
if self.is_bzip2_compressed(data):
try:
decompressed = bz2.decompress(data)
return True, "bzip2", decompressed
except Exception:
pass
# Try LZMA/XZ
if self.is_lzma_compressed(data):
try:
decompressed = lzma.decompress(data)
return True, "lzma", decompressed
except Exception:
pass
# If we got here, we couldn't decompress the data
return False, "not_compressed", data
def _analyze_json_structure(self, obj: Any) -> Dict:
"""
Analyze JSON object structure to extract metadata and identify patterns.
"""
result = {
"type": type(obj).__name__,
"schema": {},
"size": 0,
"patterns": []
}
if isinstance(obj, dict):
result["size"] = len(obj)
# Check for known JSON templates
obj_keys = set(obj.keys())
for template in JSON_TEMPLATES:
if template.issubset(obj_keys):
result["patterns"].append(f"Matches {list(template)} pattern")
# Analyze structure
result["schema"] = {k: type(v).__name__ for k, v in obj.items()}
# Check for JWT claims
jwt_claims = ['iss', 'sub', 'aud', 'exp', 'iat', 'nbf']
if any(claim in obj for claim in jwt_claims):
result["patterns"].append("Contains JWT claims")
# Check for auth tokens
auth_fields = ['token', 'access_token', 'id_token', 'refresh_token']
if any(field in obj for field in auth_fields):
result["patterns"].append("Contains auth tokens")
elif isinstance(obj, list):
result["size"] = len(obj)
if obj:
# Check if all items have the same structure
if all(isinstance(item, type(obj[0])) for item in obj):
result["patterns"].append("Homogeneous array")
# Sample the structure of the first item
if isinstance(obj[0], dict):
result["schema"] = {"sample_item": {k: type(v).__name__ for k, v in obj[0].items()}}
else:
result["schema"] = {"sample_item": type(obj[0]).__name__}
return result
def _attempt_json_recovery(self, s: str) -> Tuple[bool, Any]:
"""
Advanced JSON recovery for malformed JSON strings.
"""
# Stage 1: Try with minor fixes
try:
obj = json.loads(s)
return True, obj
except json.JSONDecodeError as e:
# Stage 2: Apply common fixes
try:
fixed = self._fix_common_json_errors(s)
if fixed != s: # Only if we made changes
obj = json.loads(fixed)
return True, obj
except json.JSONDecodeError:
pass
# Stage 3: Extract valid JSON fragments
possible_json = self._find_possible_json(s)
if possible_json:
try:
obj = json.loads(possible_json)
return True, obj
except json.JSONDecodeError:
pass
# Stage 4: Use enhanced JSON decoder
try:
obj = json.loads(s, cls=JSONEnhancedDecoder)
return True, obj
except json.JSONDecodeError:
pass
# Stage 5: Last resort, brute force recovery
# This is a very aggressive approach - only use for critical recovery
if len(s) > 10 and ("{" in s or "[" in s):
# Find all start positions for objects and arrays
object_starts = [m.start() for m in re.finditer('{', s)]
array_starts = [m.start() for m in re.finditer('\\[', s)]
# Find all end positions
object_ends = [m.start() for m in re.finditer('}', s)]
array_ends = [m.start() for m in re.finditer('\\]', s)]
# Try all combinations of starts and ends
for start_pos in sorted(object_starts + array_starts):
for end_pos in sorted(object_ends + array_ends):
if end_pos <= start_pos:
continue
# Extract substring and try to parse
if start_pos in object_starts and end_pos in object_ends:
substr = s[start_pos:end_pos+1]
try:
obj = json.loads(substr)
# Only accept if it has reasonable content
if isinstance(obj, dict) and len(obj) > 1:
return True, obj
except json.JSONDecodeError:
pass
if start_pos in array_starts and end_pos in array_ends:
substr = s[start_pos:end_pos+1]
try:
obj = json.loads(substr)
# Only accept if it has reasonable content
if isinstance(obj, list) and len(obj) > 1:
return True, obj
except json.JSONDecodeError:
pass
return False, None
def _extract_nested_json(self, text: str) -> List[Tuple[int, int, str]]:
"""
Extract all potential JSON objects or arrays from text.
Returns list of (start_index, end_index, json_string) tuples.
"""
results = []
# Track potential start positions
object_starts = [] # for {}
array_starts = [] # for []
for i, char in enumerate(text):
if char == '{':
object_starts.append(i)
elif char == '[':
array_starts.append(i)
elif char == '}' and object_starts:
start = object_starts.pop()
# If this is the outermost closing brace
if not object_starts:
json_str = text[start:i+1]
try:
# Quick validation
json.loads(json_str)
results.append((start, i+1, json_str))
except json.JSONDecodeError:
# Try aggressive recovery if it looks promising
if len(json_str) > 10:
success, _ = self._attempt_json_recovery(json_str)
if success:
results.append((start, i+1, json_str))
elif char == ']' and array_starts:
start = array_starts.pop()
# If this is the outermost closing bracket
if not array_starts:
json_str = text[start:i+1]
try:
# Quick validation
json.loads(json_str)
results.append((start, i+1, json_str))
except json.JSONDecodeError:
# Try aggressive recovery if it looks promising
if len(json_str) > 10:
success, _ = self._attempt_json_recovery(json_str)
if success:
results.append((start, i+1, json_str))
# Sort by length (prefer longer matches)
results.sort(key=lambda x: x[1] - x[0], reverse=True)
return results
def _generate_hash_info(self, data: bytes) -> Dict:
"""Generate hash information for binary data."""
return {
"md5": hashlib.md5(data).hexdigest(),
"sha1": hashlib.sha1(data).hexdigest(),
"sha256": hashlib.sha256(data).hexdigest()
}
def _format_binary_preview(self, data: bytes, max_bytes: int = MAX_BINARY_PREVIEW) -> str:
"""Format binary data for preview."""
if not data:
return "Empty data"
# Determine if it's viewable as text
try:
text = data.decode('utf-8')
if all(32 <= ord(c) <= 126 or c in '\n\r\t' for c in text):
# It's all printable ASCII
if len(text) > max_bytes:
return text[:max_bytes] + "... [truncated]"
return text
except UnicodeDecodeError:
pass
# Format as hex dump
result = []
bytes_per_line = 16
for i in range(0, min(len(data), max_bytes), bytes_per_line):
chunk = data[i:i+bytes_per_line]
hex_part = ' '.join(f'{b:02x}' for b in chunk)
# Add ASCII representation
ascii_part = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in chunk)
result.append(f"{i:04x}: {hex_part.ljust(bytes_per_line*3-1)} {ascii_part}")
if len(data) > max_bytes:
result.append(f"... [truncated, {len(data) - max_bytes} more bytes]")
return '\n'.join(result)
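# Example hex-dump line produced above for non-text input (illustrative):
#   0000: 1f 8b 08 00 00 00 00 00                          ........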
def analyze_string(self, s: str, depth: int = 0, max_depth: int = DEFAULT_MAX_DEPTH, path: str = "root") -> Dict:
"""
Recursively analyze a string to identify and decode encodings with enhanced JSON handling.
Args:
s: The string to analyze
depth: Current recursion depth
max_depth: Maximum recursion depth
path: Path in the decode tree (for tracking recursion)
Returns:
Dict with analysis results
"""
if depth > max_depth:
return {
"original": s[:MAX_DISPLAY_LENGTH] + "..." if len(s) > MAX_DISPLAY_LENGTH else s,
"length": len(s),
"encoding": "max_depth_reached",
"decoded": None,
"nested": None,
"path": path
}
# Skip empty or very short strings
if not s or len(s) < 2:
return {
"original": s,
"length": len(s),
"encoding": "too_short",
"decoded": s,
"nested": None,
"path": path
}
# Check for infinite recursion or circular references
if s in self.visited_strings and depth > 0:
return {
"original": s[:MAX_DISPLAY_LENGTH] + "..." if len(s) > MAX_DISPLAY_LENGTH else s,
"length": len(s),
"encoding": "circular_reference",
"decoded": "Already processed this string",
"nested": None,
"path": path
}
# Add to visited strings for loop detection
if len(s) < 10000: # Only track shorter strings to avoid memory issues
self.visited_strings.add(s)
result = {
"original": s[:MAX_DISPLAY_LENGTH] + "..." if len(s) > MAX_DISPLAY_LENGTH else s,
"length": len(s),
"encoding": "unknown",
"decoded": None,
"nested": None,
"path": path,
"analysis": {}
}
# First, check if the string is already valid JSON
if s.strip().startswith(('{', '[')) and s.strip().endswith(('}', ']')):
success, json_obj = self._attempt_json_recovery(s)
if success:
result["encoding"] = "json"
result["decoded"] = json_obj
result["analysis"]["json_structure"] = self._analyze_json_structure(json_obj)
return result
# Check for JWT (which contains JSON within it)
if self.is_jwt(s):
success, decoded = self.decode_jwt(s)
if success:
result["encoding"] = "jwt"
result["decoded"] = decoded
return result
# Check for Base64
if self.is_base64(s) or self.is_base64_url_safe(s):
success, encoding_type, decoded_bytes = self.decode_base64(s)
if success:
result["encoding"] = encoding_type
# Check for compression
is_compressed, compression_type, decompressed_data = self.decode_compressed(decoded_bytes)
if is_compressed:
result["encoding"] = f"{encoding_type}_with_{compression_type}"
decoded_bytes = decompressed_data
# Try to interpret as text first
try:
text_decoded = decoded_bytes.decode('utf-8')
# Check if it's JSON
if text_decoded.strip().startswith(('{', '[')) and text_decoded.strip().endswith(('}', ']')):
success, json_obj = self._attempt_json_recovery(text_decoded)
if success:
result["encoding"] = f"{result['encoding']}_with_json"
result["decoded"] = json_obj
result["analysis"]["json_structure"] = self._analyze_json_structure(json_obj)
return result
# Store the text representation
result["decoded"] = text_decoded
# Recursively analyze the decoded text
if depth < max_depth and len(text_decoded) > 3:
result["nested"] = self.analyze_string(
text_decoded,
depth + 1,
max_depth,
f"{path}.{encoding_type}"
)
except UnicodeDecodeError:
# Not valid UTF-8, try to analyze the binary data
is_bin, format_name = self.is_binary_data(decoded_bytes)
if is_bin:
result["encoded_binary"] = True
result["binary_format"] = format_name if format_name else "Unknown binary"
result["hash"] = self._generate_hash_info(decoded_bytes)
result["binary_preview"] = self._format_binary_preview(decoded_bytes)
else:
# Not recognized binary, but also not text - keep the raw bytes
result["decoded"] = f"Binary data ({len(decoded_bytes)} bytes)"
result["binary_preview"] = self._format_binary_preview(decoded_bytes)
# Optionally keep the raw bytes as hex (disabled to keep output compact)
# result["raw_bytes"] = decoded_bytes.hex()
# Check for Hex
elif self.is_hex(s):
success, decoded_bytes = self.decode_hex(s)
if success:
result["encoding"] = "hex"
# Try to interpret as text
try:
text_decoded = decoded_bytes.decode('utf-8')
# Check if the text is JSON
if text_decoded.strip().startswith(('{', '[')) and text_decoded.strip().endswith(('}', ']')):
success, json_obj = self._attempt_json_recovery(text_decoded)
if success:
result["encoding"] = "hex_with_json"
result["decoded"] = json_obj
result["analysis"]["json_structure"] = self._analyze_json_structure(json_obj)
return result
# Otherwise store the text
result["decoded"] = text_decoded
# Recursively analyze
if depth < max_depth and len(text_decoded) > 3:
result["nested"] = self.analyze_string(
text_decoded,
depth + 1,
max_depth,
f"{path}.hex"
)
except UnicodeDecodeError:
# Check for binary data patterns
is_bin, format_name = self.is_binary_data(decoded_bytes)
if is_bin:
result["encoded_binary"] = True
result["binary_format"] = format_name if format_name else "Unknown binary"
result["hash"] = self._generate_hash_info(decoded_bytes)
result["binary_preview"] = self._format_binary_preview(decoded_bytes)
else:
result["decoded"] = f"Binary data ({len(decoded_bytes)} bytes)"
result["binary_preview"] = self._format_binary_preview(decoded_bytes)
# Optionally keep the raw bytes as hex (disabled to keep output compact)
# result["raw_bytes"] = decoded_bytes.hex()
# Check for URL encoding
elif self.is_url_encoded(s):
success, decoded = self.decode_url_encoded(s)
if success:
result["encoding"] = "url_encoded"
result["decoded"] = decoded
# Recursively analyze
if depth < max_depth and decoded != s:
result["nested"] = self.analyze_string(
decoded,
depth + 1,
max_depth,
f"{path}.url_encoded"
)
# Check for embedded JSON in larger strings
elif len(s) > 20 and ('{' in s or '[' in s):
json_matches = self._extract_nested_json(s)
if json_matches:
# Use the longest match that we found
start, end, json_str = json_matches[0]
# Try to parse it
success, json_obj = self._attempt_json_recovery(json_str)
if success:
result["encoding"] = "embedded_json"
result["json_position"] = {"start": start, "end": end}
result["decoded"] = json_obj
result["analysis"]["json_structure"] = self._analyze_json_structure(json_obj)
# If there's content before/after the JSON, note it
if start > 0:
result["prefix"] = s[:start]
if end < len(s):
result["suffix"] = s[end:]
return result
# Check for integer/timestamp patterns
if ENCODING_PATTERNS["integer"].match(s):
try:
int_value = int(s)
result["analysis"]["integer_value"] = int_value
# Check if it's a timestamp
if ENCODING_PATTERNS["timestamp_unix"].match(s):
try:
timestamp = datetime.datetime.fromtimestamp(int_value)
if 2000 < timestamp.year < 2100: # Reasonable date range
result["analysis"]["possible_timestamp"] = timestamp.isoformat()
except (ValueError, OverflowError):
pass
except ValueError:
pass
return result
def _smart_stringify_json(self, json_obj: Any, indent: int = 2) -> str:
"""Create a "smart" string representation of JSON objects."""
# Use caching to prevent redundant work for large objects
cache_key = id(json_obj)
if cache_key in self.stringified_json_cache:
return self.stringified_json_cache[cache_key]
try:
# Try regular JSON serialization first
json_str = json.dumps(json_obj, indent=indent)
self.stringified_json_cache[cache_key] = json_str
return json_str
except (TypeError, OverflowError):
# Custom serialization for objects that aren't JSON serializable
if isinstance(json_obj, dict):
result = "{\n"
for k, v in json_obj.items():
try:
k_str = json.dumps(k)
except (TypeError, OverflowError):
k_str = f'"{str(k)}"'
try:
v_str = self._smart_stringify_json(v, indent)
result += " " * indent + f"{k_str}: {v_str},\n"
except Exception:
result += " " * indent + f"{k_str}: \"<cannot serialize>\",\n"
result = result.rstrip(",\n") + "\n}"
self.stringified_json_cache[cache_key] = result
return result
elif isinstance(json_obj, list):
result = "[\n"
for item in json_obj:
try:
item_str = self._smart_stringify_json(item, indent)
result += " " * indent + f"{item_str},\n"
except Exception:
result += " " * indent + "\"<cannot serialize>\",\n"
result = result.rstrip(",\n") + "\n]"
self.stringified_json_cache[cache_key] = result
return result
else:
# For other types, use str() representation
self.stringified_json_cache[cache_key] = f'"{str(json_obj)}"'
return self.stringified_json_cache[cache_key]
def pretty_print_result(self, result: Dict, indent: int = 0) -> None:
"""Render analysis results in a pretty format using rich."""
if not RICH_AVAILABLE:
print(json.dumps(result, indent=2, default=str))
return
indent_str = " " * indent
# Create a tree view for the result
tree = Tree(f"[bold cyan]String Analysis[/bold cyan]")
# Original string node
original_str = result["original"]
display_str = original_str if len(original_str) <= MAX_DISPLAY_LENGTH else original_str[:MAX_DISPLAY_LENGTH] + "..."
original_node = tree.add(f"[bold]Original[/bold]: [yellow]{display_str}[/yellow]")
original_node.add(f"Length: {result['length']} characters")
# Encoding type
encoding_color = "green" if result["encoding"] not in ["unknown", "too_short", "max_depth_reached", "circular_reference"] else "red"
encoding_node = tree.add(f"[bold]Encoding[/bold]: [{encoding_color}]{result['encoding']}[/{encoding_color}]")
# Path information if available
if "path" in result and result["path"] != "root":
encoding_node.add(f"Path: [blue]{result['path']}[/blue]")
# If binary data is present
if "binary_preview" in result:
binary_node = tree.add("[bold]Binary Content[/bold]")
if "binary_format" in result:
binary_node.add(f"Format: [magenta]{result['binary_format']}[/magenta]")
if "hash" in result:
hash_node = binary_node.add("[bold]Hash Information[/bold]")
for hash_algo, hash_value in result["hash"].items():
hash_node.add(f"{hash_algo.upper()}: [dim]{hash_value}[/dim]")
# Add binary preview in a separate panel for better visibility
console.print(Panel(
result["binary_preview"],
title="[bold]Binary Preview[/bold]",
border_style="blue",
expand=False
))
# Decoded content
if result["decoded"] is not None:
if isinstance(result["decoded"], dict) or isinstance(result["decoded"], list):
# For JSON or structured data
json_str = self._smart_stringify_json(result["decoded"])
syntax = Syntax(json_str, "json", theme="monokai", line_numbers=True)
# Show JSON structure analysis if available
if "analysis" in result and "json_structure" in result["analysis"]:
structure = result["analysis"]["json_structure"]
json_node = tree.add("[bold]JSON Structure Analysis[/bold]")
if "type" in structure:
json_node.add(f"Type: [blue]{structure['type']}[/blue]")
if "size" in structure:
json_node.add(f"Size: [blue]{structure['size']} items[/blue]")
if "patterns" in structure and structure["patterns"]:
patterns_node = json_node.add("[bold]Detected Patterns[/bold]")
for pattern in structure["patterns"]:
patterns_node.add(f"[green]✓[/green] {pattern}")
# Show the actual JSON content
console.print(Panel(syntax, title="[bold]Decoded JSON Content[/bold]", border_style="green", expand=False))
else:
# For string content
decoded_text = str(result["decoded"])
# Check if we can render it as JSON despite it not being parsed as such
if decoded_text.strip().startswith(('{', '[')) and decoded_text.strip().endswith(('}', ']')):
try:
# Try to parse and re-format as pretty JSON for display
json_obj = json.loads(decoded_text)
json_str = json.dumps(json_obj, indent=2)
syntax = Syntax(json_str, "json", theme="monokai", line_numbers=True)
console.print(Panel(syntax, title="[bold]Decoded (JSON)[/bold]", border_style="green", expand=False))
except json.JSONDecodeError:
# Not valid JSON, display as regular text
if len(decoded_text) > MAX_DISPLAY_LENGTH:
display_text = decoded_text[:MAX_DISPLAY_LENGTH] + "..."
decoded_node = tree.add(f"[bold]Decoded[/bold]: [green]{display_text}[/green]")
decoded_node.add("[dim](truncated, full content available in JSON output)[/dim]")
else:
tree.add(f"[bold]Decoded[/bold]: [green]{decoded_text}[/green]")
else:
# Regular text content
if len(decoded_text) > MAX_DISPLAY_LENGTH:
display_text = decoded_text[:MAX_DISPLAY_LENGTH] + "..."
decoded_node = tree.add(f"[bold]Decoded[/bold]: [green]{display_text}[/green]")
decoded_node.add("[dim](truncated, full content available in JSON output)[/dim]")
else:
tree.add(f"[bold]Decoded[/bold]: [green]{decoded_text}[/green]")
# Additional analysis information
if "analysis" in result and result["analysis"] and "json_structure" not in result["analysis"]:
analysis_node = tree.add("[bold]Additional Analysis[/bold]")
for k, v in result["analysis"].items():
if k != "json_structure": # Already handled above
analysis_node.add(f"[blue]{k}[/blue]: {v}")
# Prefix/Suffix for embedded JSON
if "prefix" in result:
prefix_text = result["prefix"]
if len(prefix_text) > 30:
prefix_text = prefix_text[:27] + "..."
tree.add(f"[bold]Content before JSON[/bold]: [yellow]{prefix_text}[/yellow]")
if "suffix" in result:
suffix_text = result["suffix"]
if len(suffix_text) > 30:
suffix_text = suffix_text[:27] + "..."
tree.add(f"[bold]Content after JSON[/bold]: [yellow]{suffix_text}[/yellow]")
# Nested results (recursive decoding)
if result["nested"] is not None:
nested_node = tree.add("[bold magenta]Nested Encoding Detected[/bold magenta]")
if isinstance(result["nested"], dict):
# Create a subtree for the nested result
self._add_nested_to_tree(result["nested"], nested_node)
console.print(tree)
def _add_nested_to_tree(self, result: Dict, parent_node: Tree) -> None:
"""Add nested encoding results to the tree."""
# Encoding type
encoding_color = "green" if result["encoding"] not in ["unknown", "too_short", "max_depth_reached", "circular_reference"] else "red"
encoding_node = parent_node.add(f"[bold]Encoding[/bold]: [{encoding_color}]{result['encoding']}[/{encoding_color}]")
# Path information if available
if "path" in result and result["path"] != "root":
encoding_node.add(f"Path: [blue]{result['path']}[/blue]")
# Binary content
if "binary_preview" in result:
binary_node = parent_node.add("[bold]Binary Content[/bold]")
if "binary_format" in result:
binary_node.add(f"Format: [magenta]{result['binary_format']}[/magenta]")
if "hash" in result:
hash_node = binary_node.add("[bold]Hash Information[/bold]")
for hash_algo, hash_value in result["hash"].items():
hash_node.add(f"{hash_algo.upper()}: [dim]{hash_value}[/dim]")
binary_node.add(f"[dim](See main output for binary preview)[/dim]")
# Decoded content
if result["decoded"] is not None:
if isinstance(result["decoded"], dict) or isinstance(result["decoded"], list):
# For structured data, summarize rather than showing full content
structure_node = parent_node.add("[bold]Structured Data[/bold]")
if isinstance(result["decoded"], dict):
structure_node.add(f"Dictionary with {len(result['decoded'])} keys")
# Show a few keys as samples
keys_sample = list(result["decoded"].keys())[:3]
if keys_sample:
structure_node.add(f"Sample keys: {', '.join(str(k) for k in keys_sample)}")
elif isinstance(result["decoded"], list):
structure_node.add(f"List with {len(result['decoded'])} items")
# Show the type of the first item
if result["decoded"]:
structure_node.add(f"First item type: {type(result['decoded'][0]).__name__}")
else:
# For string content
decoded_text = str(result["decoded"])
if len(decoded_text) > 70:
display_text = decoded_text[:70] + "..."
parent_node.add(f"[bold]Decoded[/bold]: [green]{display_text}[/green] [dim](truncated)[/dim]")
else:
parent_node.add(f"[bold]Decoded[/bold]: [green]{decoded_text}[/green]")
# Additional analysis information
if "analysis" in result and result["analysis"]:
analysis_node = parent_node.add("[bold]Additional Analysis[/bold]")
for k, v in result["analysis"].items():
if isinstance(v, dict):
# Summarize nested dictionaries
sub_node = analysis_node.add(f"[blue]{k}[/blue]:")
for sk, sv in list(v.items())[:3]: # Show just a few items
sub_node.add(f"[dim]{sk}[/dim]: {sv}")
if len(v) > 3:
sub_node.add(f"[dim]...and {len(v) - 3} more items[/dim]")
else:
analysis_node.add(f"[blue]{k}[/blue]: {v}")
# Recursively add nested items
if result["nested"] is not None:
nested_node = parent_node.add("[bold magenta]Further Nested Encoding[/bold magenta]")
self._add_nested_to_tree(result["nested"], nested_node)
def json_output(self, result: Dict) -> str:
"""Convert the analysis result to a pretty JSON string with enhanced handling."""
# Create a clean copy to modify for serialization
clean_result = {}
def clean_for_json(obj):
if isinstance(obj, dict):
return {k: clean_for_json(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [clean_for_json(item) for item in obj]
elif isinstance(obj, (str, int, float, bool, type(None))):
return obj
else:
# Convert non-serializable types to strings
return str(obj)
# Clean the result for JSON serialization
clean_result = clean_for_json(result)
try:
return json.dumps(clean_result, indent=2)
except Exception as e:
# Fallback for any JSON serialization errors
self.debug(f"JSON serialization error: {str(e)}")
simplified = {"error": f"Failed to serialize result: {str(e)}"}
return json.dumps(simplified, indent=2)
def parse_cookie_string(self, cookie_str: str) -> List[Dict]:
"""
Parse a cookie string into individual components with attributes.
Args:
cookie_str: HTTP cookie header string
Returns:
List of dictionaries with cookie information
"""
cookies = []
# Split by semicolon and trim whitespace
parts = [p.strip() for p in cookie_str.split(';')]
current_cookie = {}
for i, part in enumerate(parts):
if '=' in part:
name, value = part.split('=', 1)
name = name.strip()
value = value.strip()
if i == 0 or (name.lower() in ['domain', 'path', 'expires', 'max-age', 'secure', 'httponly', 'samesite']):
# This is an attribute for the current cookie
if i == 0:
# First item is the main cookie
current_cookie = {'name': name, 'value': value}
cookies.append(current_cookie)
else:
# This is an attribute for the current cookie
attr_name = name.lower()
current_cookie[attr_name] = value
else:
# This is a new cookie
current_cookie = {'name': name, 'value': value}
cookies.append(current_cookie)
elif part.lower() in ['secure', 'httponly']:
# Flag attributes without values
if current_cookie:
current_cookie[part.lower()] = True
return cookies
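# Illustrative example (hypothetical header):
#   parse_cookie_string("session=abc123; Path=/; Secure; theme=dark")
#   -> [{'name': 'session', 'value': 'abc123', 'path': '/', 'secure': True},
#       {'name': 'theme', 'value': 'dark'}]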
def batch_analyze(self, input_str: str, max_depth: int = DEFAULT_MAX_DEPTH, progress: Optional[Progress] = None) -> Dict:
"""
Analyze a batch of strings with enhanced cookie handling and structure detection.
Args:
input_str: Input string that may contain multiple items to analyze
max_depth: Maximum recursion depth for nested encodings
progress: Optional progress bar for tracking
Returns:
Dictionary with analysis results and metadata
"""
# Reset visited strings for a new batch
self.visited_strings = set()
# Detect input type and parse accordingly
input_type = "unknown"
parts = []
# Check if input looks like HTTP cookies
if ';' in input_str and '=' in input_str and not input_str.strip().startswith('{'):
# Parse as cookies
cookies = self.parse_cookie_string(input_str)
if cookies:
input_type = "cookies"
parts = cookies
elif '\n' in input_str:
# Multiple lines, process each line separately
lines = [line.strip() for line in input_str.split('\n')]
lines = [line for line in lines if line] # Remove empty lines
if all('=' in line for line in lines[:5] if line):
# Looks like key=value pairs
input_type = "key_value_pairs"
parts = []
for line in lines:
if '=' in line:
key, value = line.split('=', 1)
parts.append({'name': key.strip(), 'value': value.strip()})
else:
parts.append({'value': line})
else:
# Just lines of text
input_type = "lines"
parts = [{'value': line} for line in lines]
else:
# Check if it's JSON
try:
json_data = json.loads(input_str)
input_type = "json"
# Extract items to analyze based on JSON structure
if isinstance(json_data, list):
if all(isinstance(item, str) for item in json_data):
# List of strings
parts = [{'value': item} for item in json_data]
elif all(isinstance(item, dict) for item in json_data):
# List of objects
parts = []
for item in json_data:
if 'name' in item and 'value' in item:
parts.append({'name': item['name'], 'value': item['value']})
elif 'key' in item and 'value' in item:
parts.append({'name': item['key'], 'value': item['value']})
else:
# Use the first string field as value
for k, v in item.items():
if isinstance(v, str):
parts.append({'name': k, 'value': v})
break
elif isinstance(json_data, dict):
# Dictionary - analyze each string value
parts = []
for key, value in json_data.items():
if isinstance(value, str):
parts.append({'name': key, 'value': value})
except json.JSONDecodeError:
# Not JSON, split by spaces or commas if nothing else matches
if ',' in input_str:
input_type = "comma_separated"
parts = [{'value': part.strip()} for part in input_str.split(',') if part.strip()]
else:
input_type = "space_separated"
parts = [{'value': part.strip()} for part in input_str.split() if part.strip()]
# If we still have no parts, treat the whole input as one string
if not parts:
input_type = "single_string"
parts = [{'value': input_str}]
# Now analyze each part
results = []
task_id = None
if progress:
task_id = progress.add_task("[cyan]Analyzing...", total=len(parts))
for i, part in enumerate(parts):
# Extract the value to analyze
if isinstance(part, dict) and 'value' in part:
value = part['value']
name = part.get('name', f"Item {i+1}")
elif isinstance(part, str):
value = part
name = f"Item {i+1}"
else:
value = str(part)
name = f"Item {i+1}"
# Create result structure
result_item = {
"index": i + 1,
"name": name,
"original_value": value,
"analysis": self.analyze_string(value, max_depth=max_depth),
"timestamp": datetime.datetime.now().isoformat()
}
# For cookies, include attributes
if input_type == "cookies" and isinstance(part, dict):
for attr, attr_value in part.items():
if attr not in ('name', 'value'):
result_item[attr] = attr_value
results.append(result_item)
if progress and task_id is not None:
progress.update(task_id, advance=1)
# Summarize the results
successful_decodings = sum(1 for item in results if item["analysis"]["encoding"] not in ["unknown", "too_short"])
encoding_types = Counter(item["analysis"]["encoding"] for item in results)
most_common_encoding = encoding_types.most_common(1)[0][0] if encoding_types else "unknown"
# Add metadata to the results
final_output = {
"meta": {
"total_items": len(results),
"successful_decodings": successful_decodings,
"input_type": input_type,
"most_common_encoding": most_common_encoding,
"encoding_distribution": dict(encoding_types),
"analysis_time": datetime.datetime.now().isoformat(),
"max_recursion_depth": max_depth,
"version": VERSION
},
"results": results
}
return final_output
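# Minimal programmatic usage sketch (assumes the module is imported or run directly):
#   analyzer = EncodingAnalyzer(debug_mode=False)
#   report = analyzer.batch_analyze("token=eyJhbGciOiJIUzI1NiJ9.e30.sig; theme=dark")
#   print(analyzer.json_output(report["results"][0]["analysis"]))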
def main():
if RICH_AVAILABLE:
console.print(Panel.fit(
f"[bold cyan]Advanced Encoding Analyzer v{VERSION}[/bold cyan]\n"
"[dim]Enterprise-grade tool for decoding and analyzing encoded strings[/dim]",
border_style="blue"
))
else:
print(f"Advanced Encoding Analyzer v{VERSION}")
print("Enterprise-grade tool for decoding and analyzing encoded strings")
print("-" * 60)
# Create output directory if it doesn't exist
output_dir = os.path.realpath("./output")
if not os.path.exists(output_dir):
try:
os.makedirs(output_dir)
except Exception as e:
if RICH_AVAILABLE:
console.print(f"[bold red]Warning: Could not create output directory: {str(e)}[/bold red]")
else:
print(f"Warning: Could not create output directory: {str(e)}")
# Parse command line arguments if any
parser = argparse.ArgumentParser(description='Advanced Encoding Analyzer')
parser.add_argument('--file', '-f', help='Input file to analyze')
parser.add_argument('--output', '-o', help='Output file for results', default='./output/analysis.json')
parser.add_argument('--depth', '-d', type=int, help='Maximum recursion depth', default=DEFAULT_MAX_DEPTH)
parser.add_argument('--batch', '-b', action='store_true', help='Force batch mode')
parser.add_argument('--debug', action='store_true', help='Enable debug output')
parser.add_argument('--string', '-s', help='String to analyze directly from command line')
args = parser.parse_args()
analyzer = EncodingAnalyzer(debug_mode=args.debug)
# Handle direct command line input
if args.string:
if args.batch:
# Batch analyze the string
with Progress() as progress:
results = analyzer.batch_analyze(args.string, max_depth=args.depth, progress=progress)
# Save results
try:
with open(args.output, 'w') as f:
json.dump(results, f, indent=2, default=str)
if RICH_AVAILABLE:
console.print(f"[bold green]Results saved to {args.output}[/bold green]")
else:
print(f"Results saved to {args.output}")
except Exception as e:
if RICH_AVAILABLE:
console.print(f"[bold red]Error saving results: {str(e)}[/bold red]")
else:
print(f"Error saving results: {str(e)}")
# Display summary
if RICH_AVAILABLE:
table = Table(title="Analysis Summary", show_header=True, header_style="bold magenta", box=box.ROUNDED)
table.add_column("Item", style="cyan")
table.add_column("Value", style="green")
table.add_row("Total Items", str(results["meta"]["total_items"]))
table.add_row("Successful Decodings", str(results["meta"]["successful_decodings"]))
table.add_row("Input Type", results["meta"]["input_type"])
table.add_row("Most Common Encoding", results["meta"]["most_common_encoding"])
console.print(table)
else:
print("\nAnalysis Summary:")
print(f"Total Items: {results['meta']['total_items']}")
print(f"Successful Decodings: {results['meta']['successful_decodings']}")
print(f"Input Type: {results['meta']['input_type']}")
print(f"Most Common Encoding: {results['meta']['most_common_encoding']}")
else:
# Single string analysis
result = analyzer.analyze_string(args.string, max_depth=args.depth)
if RICH_AVAILABLE:
analyzer.pretty_print_result(result)
else:
print(json.dumps(result, indent=2, default=str))
# Save result
try:
with open(args.output, 'w') as f:
f.write(analyzer.json_output(result))
if RICH_AVAILABLE:
console.print(f"[bold green]Result saved to {args.output}[/bold green]")
else:
print(f"Result saved to {args.output}")
except Exception as e:
if RICH_AVAILABLE:
console.print(f"[bold red]Error saving result: {str(e)}[/bold red]")
else:
print(f"Error saving result: {str(e)}")
return
# Handle file input
if args.file:
try:
with open(args.file, 'r') as f:
input_str = f.read()
if args.batch:
# Batch analyze the file content
with Progress() as progress:
results = analyzer.batch_analyze(input_str, max_depth=args.depth, progress=progress)
# Save results
try:
with open(args.output, 'w') as f:
json.dump(results, f, indent=2, default=str)
if RICH_AVAILABLE:
console.print(f"[bold green]Results saved to {args.output}[/bold green]")
else:
print(f"Results saved to {args.output}")
except Exception as e:
if RICH_AVAILABLE:
console.print(f"[bold red]Error saving results: {str(e)}[/bold red]")
else:
print(f"Error saving results: {str(e)}")
# Display summary
if RICH_AVAILABLE:
table = Table(title="Analysis Summary", show_header=True, header_style="bold magenta", box=box.ROUNDED)
table.add_column("Item", style="cyan")
table.add_column("Value", style="green")
table.add_row("Total Items", str(results["meta"]["total_items"]))
table.add_row("Successful Decodings", str(results["meta"]["successful_decodings"]))
table.add_row("Input Type", results["meta"]["input_type"])
table.add_row("Most Common Encoding", results["meta"]["most_common_encoding"])
console.print(table)
else:
print("\nAnalysis Summary:")
print(f"Total Items: {results['meta']['total_items']}")
print(f"Successful Decodings: {results['meta']['successful_decodings']}")
print(f"Input Type: {results['meta']['input_type']}")
print(f"Most Common Encoding: {results['meta']['most_common_encoding']}")
else:
# Single file analysis
result = analyzer.analyze_string(input_str, max_depth=args.depth)
if RICH_AVAILABLE:
analyzer.pretty_print_result(result)
else:
print(json.dumps(result, indent=2, default=str))
# Save result
try:
with open(args.output, 'w') as f:
f.write(analyzer.json_output(result))
if RICH_AVAILABLE:
console.print(f"[bold green]Result saved to {args.output}[/bold green]")
else:
print(f"Result saved to {args.output}")
except Exception as e:
if RICH_AVAILABLE:
console.print(f"[bold red]Error saving result: {str(e)}[/bold red]")
else:
print(f"Error saving result: {str(e)}")
return
except Exception as e:
if RICH_AVAILABLE:
console.print(f"[bold red]Error reading file: {str(e)}[/bold red]")
else:
print(f"Error reading file: {str(e)}")
# Interactive mode
if not RICH_AVAILABLE:
print("Rich library not available, limited functionality in interactive mode.")
print("Please install rich for a better experience: pip install rich")
mode = input("Select analysis mode (single/batch): ").strip().lower()
if mode == "single":
input_str = input("Enter the string to analyze: ")
print("\nAnalyzing string...")
result = analyzer.analyze_string(input_str, max_depth=args.depth)
print(json.dumps(result, indent=2, default=str))
# Ask if user wants to save to file
save = input("Save to file? (y/n): ").strip().lower()
if save == 'y':
output_file = input(f"Enter output filename [{args.output}]: ").strip()
if not output_file:
output_file = args.output
try:
with open(output_file, "w") as f:
f.write(analyzer.json_output(result))
print(f"Result saved to {output_file}")
except Exception as e:
print(f"Error saving to file: {str(e)}")
else:
input_str = input("Enter the batch string to analyze: ")
max_depth = input(f"Set maximum recursion depth [{args.depth}]: ").strip()
if max_depth:
try:
max_depth = int(max_depth)
except ValueError:
max_depth = args.depth
else:
max_depth = args.depth
print("\nAnalyzing batch input...")
results = analyzer.batch_analyze(input_str, max_depth=max_depth)
# Save JSON output
output_file = input(f"Enter output filename [{args.output}]: ").strip()
if not output_file:
output_file = args.output
try:
with open(output_file, "w") as f:
json.dump(results, f, indent=2, default=str)
print(f"Results saved to {output_file}")
except Exception as e:
print(f"Error saving to file: {str(e)}")
# Display summary
print("\nAnalysis Summary:")
print(f"Total Items: {results['meta']['total_items']}")
print(f"Successful Decodings: {results['meta']['successful_decodings']}")
print(f"Input Type: {results['meta']['input_type']}")
print(f"Most Common Encoding: {results['meta']['most_common_encoding']}")
return
# Rich interactive mode
mode = Prompt.ask(
"Select analysis mode",
choices=["single", "batch"],
default="single"
)
if mode == "single":
input_str = Prompt.ask("\nEnter the string to analyze")
console.print("\n[bold]Analyzing string...[/bold]")
result = analyzer.analyze_string(input_str, max_depth=args.depth)
analyzer.pretty_print_result(result)
# Ask if user wants JSON output
if Confirm.ask("Generate JSON output file?"):
output_file = Prompt.ask("Enter output filename", default=args.output)
try:
with open(output_file, "w") as f:
f.write(analyzer.json_output(result))
console.print(f"[bold green]✓ Result saved to {output_file}[/bold green]")
except Exception as e:
console.print(f"[bold red]Error saving output: {str(e)}[/bold red]")
else: # Batch mode
input_str = Prompt.ask("\nEnter the batch string to analyze (cookies, tokens, JSON, etc.)")
max_depth = Prompt.ask(
"Set maximum recursion depth for nested encodings",
default=str(args.depth)
)
try:
max_depth = int(max_depth)
except ValueError:
max_depth = args.depth
console.print("\n[bold]Analyzing batch input...[/bold]")
console.print(f"[dim]Maximum recursion depth: {max_depth}[/dim]")
output_file = Prompt.ask("Enter output filename", default=args.output)
console.print(f"[dim]JSON output will be saved to: {output_file}[/dim]")
with Progress() as progress:
results = analyzer.batch_analyze(input_str, max_depth=max_depth, progress=progress)
# Export detailed JSON output
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
try:
with open(output_file, "w") as f:
json.dump(results, f, indent=2, default=str)
console.print(f"[bold green]✓ Detailed JSON analysis exported to {output_file}[/bold green]")
except Exception as e:
console.print(f"[bold red]Error saving JSON output: {str(e)}[/bold red]")
# Display summary table
table = Table(title="Analysis Results", show_header=True, header_style="bold cyan", box=box.ROUNDED)
table.add_column("#", style="dim", width=4)
table.add_column("Name/Key", style="dim")
table.add_column("Original (truncated)", style="yellow")
table.add_column("Encoding", style="green")
table.add_column("Decoded (preview)", style="cyan")
for i, result_item in enumerate(results["results"]):
# Add row for each result
row_index = str(i+1)
name = result_item["name"]
original = result_item["original_value"]
if len(original) > 30:
original = original[:27] + "..."
encoding = result_item["analysis"]["encoding"]
decoded = "N/A"
if result_item["analysis"]["decoded"] is not None:
if isinstance(result_item["analysis"]["decoded"], (dict, list)):
decoded = f"Complex structure ({type(result_item['analysis']['decoded']).__name__})"
else:
decoded = str(result_item["analysis"]["decoded"])
if len(decoded) > 30:
decoded = decoded[:27] + "..."
table.add_row(row_index, name, original, encoding, decoded)
console.print(table)
# Display summary information
summary_table = Table(title="Analysis Summary", box=box.ROUNDED)
summary_table.add_column("Metric", style="cyan")
summary_table.add_column("Value", style="green")
summary_table.add_row("Total Items", str(results["meta"]["total_items"]))
summary_table.add_row("Successful Decodings", str(results["meta"]["successful_decodings"]))
summary_table.add_row("Success Rate", f"{results['meta']['successful_decodings'] / results['meta']['total_items'] * 100:.1f}%")
summary_table.add_row("Input Type", results["meta"]["input_type"])
summary_table.add_row("Most Common Encoding", results["meta"]["most_common_encoding"])
# Add encoding distribution
distribution_table = Table(box=None, show_header=True, header_style="bold blue")
distribution_table.add_column("Encoding Type")
distribution_table.add_column("Count", justify="right")
distribution_table.add_column("Percentage", justify="right")
for encoding, count in sorted(results["meta"]["encoding_distribution"].items(), key=lambda x: x[1], reverse=True):
percentage = f"{count / results['meta']['total_items'] * 100:.1f}%"
distribution_table.add_row(encoding, str(count), percentage)
console.print(summary_table)
console.print(Panel(distribution_table, title="Encoding Distribution", border_style="blue"))
# Ask if user wants detailed analysis of a specific item
if Confirm.ask("\nShow detailed analysis for a specific item?"):
item_index = Prompt.ask(
"Enter the item number (1-based index)",
default="1"
)
try:
idx = int(item_index) - 1
if 0 <= idx < len(results["results"]):
console.print("\n[bold]Detailed Analysis for Item #{}:[/bold]".format(idx+1))
analyzer.pretty_print_result(results["results"][idx]["analysis"])
else:
console.print("[bold red]Invalid item index![/bold red]")
except ValueError:
console.print("[bold red]Invalid input. Expected a number.[/bold red]")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
if RICH_AVAILABLE:
console.print("\n[bold red]Cancelled by user[/bold red]")
else:
print("\nCancelled by user")
except Exception as e:
if RICH_AVAILABLE:
console.print(f"[bold red]Error: {str(e)}[/bold red]")
console.print_exception()
else:
print(f"Error: {str(e)}")
traceback.print_exc()