KEYPLUG EXTRACTOR
#!/usr/bin/env python3
"""
KEYPLUG-ANALYZER: Comprehensive Analysis Tool for KEYPLUG Extracted Payloads
Performs in-depth analysis of potentially malicious content extracted by the KEYPLUG tool
Project: QUANTUM SHIELD
Author: John
Version: 1.1
"""
import os
import sys
import re
import json
import math
import binascii
import hashlib
import subprocess
import shutil
import struct
import tempfile
import argparse
import time
from datetime import datetime
from pathlib import Path
from collections import Counter, defaultdict
import concurrent.futures
# Optional imports with fallbacks
try:
import matplotlib.pyplot as plt
import numpy as np
PLOTTING_AVAILABLE = True
except ImportError:
PLOTTING_AVAILABLE = False
try:
import magic
MAGIC_AVAILABLE = True
except ImportError:
MAGIC_AVAILABLE = False
try:
import yara
YARA_AVAILABLE = True
except ImportError:
YARA_AVAILABLE = False
try:
from capstone import *
DISASM_AVAILABLE = True
except ImportError:
DISASM_AVAILABLE = False
# Configure basic logging
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("keyplug_analyzer.log")
]
)
logger = logging.getLogger("keyplug_analyzer")
# ANSI Colors for terminal output
ANSI_RED = "\033[91m"
ANSI_GREEN = "\033[92m"
ANSI_YELLOW = "\033[93m"
ANSI_BLUE = "\033[94m"
ANSI_MAGENTA = "\033[95m"
ANSI_CYAN = "\033[96m"
ANSI_RESET = "\033[0m"
# Configuration for analysis
APT41_CONFIG = {
"markers": [
b"KEYP", b"RC4", b"http", b"MZ", b"PE\x00\x00",
b"cmd.exe", b"powershell", b"rundll32", b"regsvr32"
],
"xor_keys": [
"9e", "d3", "a5", "0a61200d", "410d200d", "4100200d",
"41414141", "00000000", "ffffffff", "12345678", "87654321",
"deadbeef"
],
"url_pattern": rb'https?://[^\s"\'>]{4,255}',
"domain_pattern": rb'(?:[a-zA-Z0-9](?:[a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}',
"ip_pattern": rb'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b',
"cmd_pattern": rb'(?:cmd\.exe|powershell|bash|wget|curl|certutil|bitsadmin|rundll32)',
"api_pattern": rb'(?:CreateProcess|VirtualAlloc|WriteProcessMemory|ResumeThread|WSASocket|connect|InternetOpen|HttpSendRequest)',
"pe_header_pattern": rb'MZ[\x00-\xff]{60}PE\x00\x00',
"config_header_patterns": [
rb'KEYP[\x00-\xff]{4}',
rb'CONFIG[\x00-\xff]{4}',
rb'RC4[\x00-\xff]{4}'
],
"entropy_threshold": 7.0,
"window_size": 256,
"max_pe_size": 5 * 1024 * 1024, # 5 MB maximum PE size to extract
"interesting_sections": ["UPX", ".text", ".data", ".rdata", ".rsrc", ".reloc"],
# KEYPLUG directory structure patterns
"keyplug_dir_patterns": {
"payload_dirs": ["payload", "payloads", "odg_scan_output"],
"decrypted_dirs": ["decrypted"],
"extract_dirs": ["odg_contents"],
"payload_file_patterns": ["*_payload.bin", "*.bin", "*_forced_*.bin", "*_pattern_*.bin"]
}
}
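# Illustrative note (not part of the original tool): the byte-string regexes above are
# applied directly to raw file contents, e.g. a minimal sketch of how the URL pattern
# is used elsewhere in this script (the input file name is hypothetical):
#   data = open("sample_payload.bin", "rb").read()
#   urls = [m.group(0) for m in re.finditer(APT41_CONFIG["url_pattern"], data)]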
class APT41YaraRules:
"""Class for managing APT-41 specific YARA rules."""
RULES_TEXT = """
rule APT41_KEYPLUG_Payload {
meta:
description = "Detects KEYPLUG payload based on known patterns"
author = "John"
reference = "Manual analysis of KEYPLUG payloads"
confidence = "high"
date = "2025-05-21"
strings:
$keyplug = "KEYP" nocase
$config_marker = { 4B 45 59 50 [1-4] 00 00 }
$rc4_marker = "RC4"
$obfuscation = { 66 83 ?? ?? 66 81 ?? ?? }
$api_hash = { B8 ?? ?? ?? ?? 31 ?? ?? ?? 66 ?? ?? ?? 50 }
$persistence = "Software\\Microsoft\\Windows\\CurrentVersion\\Run" nocase
condition:
any of them
}
rule APT41_Config_Block {
meta:
description = "Detects potential APT41 config blocks"
author = "John"
reference = "Manual analysis of KEYPLUG payloads"
confidence = "medium"
strings:
$cfg1 = { 4B 45 59 50 [0-4] 00 00 }
$cfg2 = { 43 4F 4E 46 [0-4] 00 00 }
$url_marker = "http" nocase
$ip_block = { 25 (64|30|31|32|33|34|35|36|37|38|39) 2E 25 (64|30|31|32|33|34|35|36|37|38|39) }
condition:
any of ($cfg*) and any of ($url_marker, $ip_block)
}
rule Suspicious_PE_In_Data {
meta:
description = "Detects embedded PE files within data blocks"
author = "John"
confidence = "medium"
strings:
$mz = "MZ"
$pe = "PE\x00\x00"
$section1 = ".text"
$section2 = ".data"
$section3 = ".rdata"
$section4 = ".rsrc"
$injection = { 68 ?? ?? ?? ?? FF 75 ?? FF 55 }
condition:
$mz at 0 and $pe and 2 of ($section*) or
$mz and $pe and $injection
}
rule Shellcode_Patterns {
meta:
description = "Detects shellcode patterns common in APT41 payloads"
author = "John"
confidence = "medium"
strings:
$api_resolve = { 31 C0 64 8B ?? ?? 8B ?? ?? 8B ?? ?? 8B ?? 18 8B ?? 20 }
$fs_access = { 64 A1 ?? ?? ?? ?? }
$stack_strings = { C7 45 ?? ?? ?? ?? ?? C7 45 ?? ?? ?? ?? ?? C7 45 }
$syscall = { B8 ?? ?? ?? ?? CD 80 }
$find_kernel32 = { 31 ?? 64 A1 ?? ?? ?? ?? }
$jumps = { EB ?? FF 25 ?? ?? ?? ?? E9 }
condition:
2 of them
}
rule XOR_Encrypted_PE {
meta:
description = "Detects XOR encrypted PE files"
author = "John"
confidence = "medium"
strings:
// Patterns that might indicate XOR'd MZ header
$xor_mz_1 = { 1? 1? }
$xor_mz_2 = { 2? 2? }
$xor_mz_3 = { 3? 3? }
$xor_mz_4 = { 4? 4? }
$xor_mz_5 = { 5? 5? }
$xor_mz_6 = { 6? 6? }
// High entropy indicators
$random_data = { ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? }
condition:
filesize > 1KB and
((filesize < 5MB and any of ($xor_mz_*)) or
(filesize < 10MB and $random_data and uint32(0) != 0x905A4D))
}
"""
@classmethod
def create_ruleset(cls):
"""
Create YARA ruleset from embedded rules.
Returns:
yara.Rules: Compiled YARA rules object or None if YARA is not available
"""
if not YARA_AVAILABLE:
logger.warning("YARA module not available. Install with: pip install yara-python")
return None
try:
# Create temp file for our embedded rules
with tempfile.NamedTemporaryFile(mode='w', suffix='.yar', delete=False) as f:
f.write(cls.RULES_TEXT)
embedded_rules_file = f.name
# Compile the embedded rules
compiled_rules = yara.compile(filepath=embedded_rules_file)
# Clean up the temp file
try:
os.unlink(embedded_rules_file)
except:
pass
return compiled_rules
except Exception as e:
logger.error(f"Failed to create YARA ruleset: {e}")
return None
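# Usage sketch (illustrative only; assumes yara-python is installed and the file name
# below is hypothetical):
#   rules = APT41YaraRules.create_ruleset()
#   if rules:
#       matches = rules.match(data=open("sample_payload.bin", "rb").read())
#       for m in matches:
#           print(m.rule, m.meta.get("confidence"))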
class CryptoAnalyzer:
"""Utility class for analyzing and potentially decrypting encrypted content."""
@staticmethod
def calculate_entropy(data, base=2):
"""
Calculate Shannon entropy of data.
Args:
data (bytes): Data to calculate entropy for
base (int): Logarithmic base (2 for bits, 10 for decimal)
Returns:
float: Shannon entropy value
"""
if not data:
return 0.0
# Count byte occurrences
counter = Counter(data)
# Calculate entropy
entropy = 0.0
for count in counter.values():
probability = count / len(data)
entropy -= probability * math.log(probability, base)
return entropy
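# Quick sanity check (illustrative only): a buffer that uses all 256 byte values
# uniformly has the maximum Shannon entropy of 8.0 bits per byte, while a constant
# buffer scores 0.0:
#   CryptoAnalyzer.calculate_entropy(bytes(range(256)) * 16)   # -> 8.0
#   CryptoAnalyzer.calculate_entropy(b"\x00" * 4096)           # -> 0.0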
@staticmethod
def entropy_scan(data, window_size=256, step=128):
"""
Scan data with sliding window to find entropy transitions.
Args:
data (bytes): Data to scan
window_size (int): Size of sliding window
step (int): Step size between windows
Returns:
list: List of (position, entropy) tuples
"""
results = []
if len(data) <= window_size:
return [(0, CryptoAnalyzer.calculate_entropy(data))]
for i in range(0, len(data) - window_size, step):
window_data = data[i:i+window_size]
entropy = CryptoAnalyzer.calculate_entropy(window_data)
results.append((i, entropy))
return results
@staticmethod
def perform_xor_decryption(data, key_hex):
"""
Perform XOR decryption with a hex key.
Args:
data (bytes): Data to decrypt
key_hex (str): Hexadecimal key (e.g. "0A1B2C3D")
Returns:
bytes: Decrypted data
"""
key_bytes = bytes.fromhex(key_hex)
result = bytearray(len(data))
for i in range(len(data)):
result[i] = data[i] ^ key_bytes[i % len(key_bytes)]
return bytes(result)
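# Round-trip sketch (illustrative; the plaintext below is made up): XOR with the same
# repeating key both encrypts and decrypts, which is what the key-guessing code later
# in this file relies on.
#   ct = CryptoAnalyzer.perform_xor_decryption(b"http://example.test/", "9e")
#   CryptoAnalyzer.perform_xor_decryption(ct, "9e")   # -> b"http://example.test/"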
@staticmethod
def detect_possible_xor_keys(data, sample_size=256, threshold=0.7):
"""
Try to detect possible XOR keys that might have been used to encrypt the data.
Args:
data (bytes): Encrypted data
sample_size (int): Size of data sample to analyze
threshold (float): Threshold for key detection (0.0-1.0)
Returns:
list: List of possible XOR keys (hex strings)
"""
# Only analyze a sample of the data for performance
if len(data) <= sample_size:
sample = data
else:
# Take samples from beginning and middle of file
sample1 = data[:sample_size//2]
midpoint = len(data)//2
sample2 = data[midpoint:midpoint+sample_size//2]
sample = sample1 + sample2
# Common byte values we might expect in clean data
expected_bytes = [0x00, ord('\n'), ord('\r'), ord(' '), ord('.'), ord(',')]
# ASCII ranges
lower_ascii = range(ord('a'), ord('z')+1)
upper_ascii = range(ord('A'), ord('Z')+1)
digits = range(ord('0'), ord('9')+1)
# Scoring for keys
key_scores = defaultdict(int)
# Try single-byte keys first (most common in simple malware)
for key in range(1, 256):
key_bytes = bytes([key])
decrypted = bytes(b ^ key for b in sample)
# Calculate score based on resulting decrypted data
text_score = 0
control_chars = 0
null_bytes = 0
ascii_chars = 0
for b in decrypted:
if b in expected_bytes:
text_score += 1
elif b in range(1, 32): # Control characters
control_chars += 1
elif b == 0:
null_bytes += 1
elif b in lower_ascii or b in upper_ascii or b in digits:
ascii_chars += 1
# Normalized score
normalized_score = (text_score * 2 + ascii_chars) / len(sample)
penalty = (control_chars - null_bytes) / len(sample)
final_score = normalized_score - penalty
if final_score > threshold:
key_scores[key_bytes.hex()] = final_score
# Get top keys by score
sorted_keys = sorted(key_scores.items(), key=lambda x: x[1], reverse=True)
return [k for k, score in sorted_keys[:5]] # Return top 5
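# Note on the heuristic above (descriptive, not normative): only single-byte keys are
# brute-forced here; the multi-byte keys listed in APT41_CONFIG["xor_keys"] are tried
# separately by PayloadAnalyzer.analyze_file. A candidate key is kept when the share
# of printable/expected bytes in the trial decryption exceeds `threshold`.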
@staticmethod
def detect_embedded_pe(data, max_pes=5, min_pe_size=256):
"""
Detect embedded PE files within binary data.
Args:
data (bytes): Binary data to scan
max_pes (int): Maximum number of PE files to extract
min_pe_size (int): Minimum size of valid PE file
Returns:
list: List of tuples (offset, size, pe_data)
"""
results = []
# Find all "MZ" occurrences
mz_positions = [m.start() for m in re.finditer(b'MZ', data)]
for pos in mz_positions:
# Skip if too close to end of file to be a valid PE
if pos + 0x40 >= len(data):
continue
try:
# Check if a valid PE header follows the MZ header
pe_offset_pos = pos + 0x3C
if pe_offset_pos + 4 > len(data):
continue
pe_offset = struct.unpack("<I", data[pe_offset_pos:pe_offset_pos+4])[0]
pe_header_pos = pos + pe_offset
if pe_header_pos + 4 > len(data):
continue
pe_header = data[pe_header_pos:pe_header_pos+4]
if pe_header == b'PE\x00\x00':
# Found valid PE header
# Try to determine size of PE
section_count_pos = pe_header_pos + 6
if section_count_pos + 2 > len(data):
continue
section_count = struct.unpack("<H", data[section_count_pos:section_count_pos+2])[0]
pe_size = APT41_CONFIG["max_pe_size"] # Default max size
# Extract to separate file
pe_data = data[pos:pos+pe_size]
# Only add if PE is large enough to be valid
if len(pe_data) >= min_pe_size:
results.append((pos, len(pe_data), pe_data))
# Limit number of PEs to extract
if len(results) >= max_pes:
break
except:
# Skip invalid headers
continue
return results
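# Carving sketch (illustrative): detect_embedded_pe walks every "MZ" occurrence,
# follows the e_lfanew field at offset 0x3C and accepts the candidate only if a
# "PE\x00\x00" signature is found there. Because the true image size is not parsed,
# each carve is capped at APT41_CONFIG["max_pe_size"] bytes. The output name below
# is hypothetical:
#   for offset, size, pe_bytes in CryptoAnalyzer.detect_embedded_pe(data):
#       open(f"carved_{offset:#x}.bin", "wb").write(pe_bytes)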
class NetworkAnalyzer:
"""Utility class for extracting and analyzing network indicators."""
@staticmethod
def extract_network_indicators(data):
"""
Extract network indicators from binary data.
Args:
data (bytes): Binary data to scan
Returns:
dict: Dictionary of network indicators by type
"""
results = {
"urls": [],
"domains": [],
"ips": [],
"emails": []
}
# Extract URLs
for match in re.finditer(APT41_CONFIG["url_pattern"], data):
url = match.group(0).decode('latin1', errors='replace')
if url not in results["urls"]:
results["urls"].append(url)
# Extract domains
for match in re.finditer(APT41_CONFIG["domain_pattern"], data):
domain = match.group(0).decode('latin1', errors='replace')
if domain not in results["domains"] and not any(domain in url for url in results["urls"]):
results["domains"].append(domain)
# Extract IPs
for match in re.finditer(APT41_CONFIG["ip_pattern"], data):
ip = match.group(0).decode('latin1', errors='replace')
if ip not in results["ips"]:
results["ips"].append(ip)
# Look for potential email addresses
email_pattern = rb'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
for match in re.finditer(email_pattern, data):
email = match.group(0).decode('latin1', errors='replace')
if email not in results["emails"]:
results["emails"].append(email)
return results
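# Usage sketch (illustrative; the buffer and indicators are synthetic):
#   iocs = NetworkAnalyzer.extract_network_indicators(b"beacon http://c2.example.test/x 10.0.0.1")
#   iocs["urls"], iocs["ips"]   # -> (['http://c2.example.test/x'], ['10.0.0.1'])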
@staticmethod
def analyze_domains(domains):
"""
Analyze extracted domains for patterns and signs of maliciousness.
Args:
domains (list): List of extracted domains
Returns:
dict: Analysis results
"""
results = {
"dga_likely": [],
"suspicious_tlds": [],
"high_entropy": []
}
for domain in domains:
# Check for DGA-like patterns (high entropy, unusual character distribution)
domain_name = domain.split('.')[0]
entropy = CryptoAnalyzer.calculate_entropy(domain_name.encode())
if entropy > 4.0: # High entropy often indicates DGA
results["high_entropy"].append(domain)
# Check for suspicious TLDs
suspicious_tlds = [".xyz", ".top", ".club", ".info", ".biz", ".cc", ".tk"]
if any(domain.endswith(tld) for tld in suspicious_tlds):
results["suspicious_tlds"].append(domain)
# Check for random-looking patterns
consonant_sequences = re.findall(r'[bcdfghjklmnpqrstvwxyz]{5,}', domain_name, re.IGNORECASE)
if consonant_sequences:
results["dga_likely"].append(domain)
return results
class PEAnalyzer:
"""Utility class for analyzing PE files."""
@staticmethod
def extract_pe_info(pe_data):
"""
Extract basic information from a PE file.
Args:
pe_data (bytes): PE file data
Returns:
dict: PE file information
"""
try:
if not pe_data or len(pe_data) < 64:
return {"error": "Invalid PE data"}
result = {
"headers": {},
"sections": [],
"imports": [],
"exports": [],
"resources": [],
"compilation_time": None,
"entropy": CryptoAnalyzer.calculate_entropy(pe_data),
"suspicious_indicators": []
}
# Try to extract PE header information
if pe_data[:2] != b'MZ':
return {"error": "Not a valid PE file (missing MZ signature)"}
# Get PE header offset
pe_offset = struct.unpack("<I", pe_data[0x3C:0x40])[0]
if pe_offset + 24 > len(pe_data) or pe_data[pe_offset:pe_offset+4] != b'PE\x00\x00':
return {"error": "Invalid PE header"}
# Get machine type
machine_type = struct.unpack("<H", pe_data[pe_offset+4:pe_offset+6])[0]
result["headers"]["machine_type"] = machine_type
# Get number of sections
num_sections = struct.unpack("<H", pe_data[pe_offset+6:pe_offset+8])[0]
result["headers"]["num_sections"] = num_sections
# Get timestamp
timestamp = struct.unpack("<I", pe_data[pe_offset+8:pe_offset+12])[0]
result["compilation_time"] = timestamp
# Extract basic headers info
optional_header_size = struct.unpack("<H", pe_data[pe_offset+20:pe_offset+22])[0]
result["headers"]["characteristics"] = struct.unpack("<H", pe_data[pe_offset+22:pe_offset+24])[0]
# Check for suspicious indicators
if result["entropy"] > 7.0:
result["suspicious_indicators"].append("High file entropy")
if num_sections > 8:
result["suspicious_indicators"].append(f"Unusually high section count ({num_sections})")
# Super basic section info
section_table_offset = pe_offset + 24 + optional_header_size
for i in range(num_sections):
section_offset = section_table_offset + (i * 40)
if section_offset + 40 > len(pe_data):
break
section_name = pe_data[section_offset:section_offset+8].rstrip(b'\x00').decode('ascii', errors='replace')
section_vsize = struct.unpack("<I", pe_data[section_offset+8:section_offset+12])[0]
section_vaddr = struct.unpack("<I", pe_data[section_offset+12:section_offset+16])[0]
section_rsize = struct.unpack("<I", pe_data[section_offset+16:section_offset+20])[0]
section_raddr = struct.unpack("<I", pe_data[section_offset+20:section_offset+24])[0]
result["sections"].append({
"name": section_name,
"virtual_size": section_vsize,
"virtual_addr": section_vaddr,
"raw_size": section_rsize,
"raw_addr": section_raddr
})
# Check for suspicious section names
if section_name not in [".text", ".data", ".rdata", ".rsrc", ".reloc", ".idata", ".edata", ".pdata"]:
result["suspicious_indicators"].append(f"Unusual section name: {section_name}")
# Check for execute+write sections
section_chars = struct.unpack("<I", pe_data[section_offset+36:section_offset+40])[0]
if section_chars & 0x20000000 and section_chars & 0x80000000:
result["suspicious_indicators"].append(f"Section {section_name} is both executable and writeable")
return result
except Exception as e:
return {"error": f"PE analysis failed: {str(e)}"}
@staticmethod
def extract_strings_from_pe(pe_data, min_length=6):
"""
Extract strings from PE file sections with section context.
Args:
pe_data (bytes): PE file data
min_length (int): Minimum string length
Returns:
dict: Strings by section
"""
result = {
"headers": [],
"sections": {}
}
try:
# Check for valid PE
if not pe_data or len(pe_data) < 64 or pe_data[:2] != b'MZ':
return {"error": "Invalid PE data"}
# Get PE header offset
pe_offset = struct.unpack("<I", pe_data[0x3C:0x40])[0]
if pe_offset + 24 > len(pe_data) or pe_data[pe_offset:pe_offset+4] != b'PE\x00\x00':
return {"error": "Invalid PE header"}
# Extract strings from PE headers
current_str = ""
for byte in pe_data[:pe_offset+24]:
if 32 <= byte <= 126: # ASCII printable range
current_str += chr(byte)
else:
if len(current_str) >= min_length:
result["headers"].append(current_str)
current_str = ""
# Get number of sections and section table info
num_sections = struct.unpack("<H", pe_data[pe_offset+6:pe_offset+8])[0]
optional_header_size = struct.unpack("<H", pe_data[pe_offset+20:pe_offset+22])[0]
section_table_offset = pe_offset + 24 + optional_header_size
# Extract strings from each section
for i in range(num_sections):
section_offset = section_table_offset + (i * 40)
if section_offset + 40 > len(pe_data):
break
section_name = pe_data[section_offset:section_offset+8].rstrip(b'\x00').decode('ascii', errors='replace')
section_rsize = struct.unpack("<I", pe_data[section_offset+16:section_offset+20])[0]
section_raddr = struct.unpack("<I", pe_data[section_offset+20:section_offset+24])[0]
# Skip sections with no raw data
if section_rsize == 0 or section_raddr + section_rsize > len(pe_data):
continue
# Extract strings from this section
section_data = pe_data[section_raddr:section_raddr+section_rsize]
section_strings = []
current_str = ""
for byte in section_data:
if 32 <= byte <= 126:
current_str += chr(byte)
else:
if len(current_str) >= min_length:
section_strings.append(current_str)
current_str = ""
if section_strings:
result["sections"][section_name] = section_strings
return result
except Exception as e:
return {"error": f"String extraction failed: {str(e)}"}
class DisassemblyAnalyzer:
"""Utility class for analyzing disassembled code."""
@staticmethod
def find_api_patterns(data, start_offset=0):
"""
Look for API call patterns in binary data.
Args:
data (bytes): Binary data
start_offset (int): Starting offset for analysis
Returns:
list: Found API call patterns
"""
if not DISASM_AVAILABLE:
return {"error": "Capstone disassembly engine not available"}
try:
results = []
# Initialize disassembler for x86/x64
md32 = Cs(CS_ARCH_X86, CS_MODE_32)
md64 = Cs(CS_ARCH_X86, CS_MODE_64)
# Check for both 32-bit and 64-bit code patterns
for mode, md in [("x86", md32), ("x64", md64)]:
# Disassemble a chunk of code
for chunk_start in range(start_offset, len(data) - 1024, 512):
chunk = data[chunk_start:chunk_start + 1024]
try:
last_insns = []
api_pattern = []
for i, (address, size, mnemonic, op_str) in enumerate(md.disasm_lite(chunk, chunk_start)):
insn = f"{mnemonic} {op_str}".strip()
# Keep track of last 5 instructions for context
last_insns.append((address, insn))
if len(last_insns) > 5:
last_insns.pop(0)
# Look for API calls
call_patterns = ["call", "jmp"]
if any(mnemonic.startswith(p) for p in call_patterns):
# Check for various API calling patterns
if "ptr" in op_str:
api_pattern = list(last_insns)
api_pattern.append((address, insn))
results.append({
"offset": chunk_start,
"arch": mode,
"pattern": api_pattern,
"context": last_insns
})
# Look for syscall/sysenter
if mnemonic in ["syscall", "sysenter"]:
results.append({
"offset": address,
"arch": mode,
"pattern": "system_call",
"context": last_insns
})
except Exception as e:
# Skip errors in disassembly
continue
return results
except Exception as e:
return {"error": f"Disassembly analysis failed: {str(e)}"}
class ConfigExtractor:
"""Utility class for extracting embedded configuration data."""
@staticmethod
def find_config_patterns(data):
"""
Search for potential APT41 config blocks.
Args:
data (bytes): Binary data to search
Returns:
list: Potential config blocks with metadata
"""
results = []
# Search for config header patterns
for pattern in APT41_CONFIG["config_header_patterns"]:
for match in re.finditer(pattern, data):
start_pos = match.start()
pattern_bytes = match.group(0)
# Determine a reasonable size for config (up to 1KB after header)
max_config_size = 1024
end_pos = min(start_pos + max_config_size, len(data))
config_data = data[start_pos:end_pos]
# Check for C2 URLs, IPs, or other indicators within this block
network_indicators = NetworkAnalyzer.extract_network_indicators(config_data)
# Only include if there are network indicators (likely a real config)
if any(network_indicators.values()):
results.append({
"offset": start_pos,
"size": len(config_data),
"header": pattern_bytes.hex(),
"data": config_data.hex(),
"indicators": network_indicators
})
return results
@staticmethod
def decode_potential_configs(data, default_xor_keys=None):
"""
Try to decode potential config blocks using common encryption schemes.
Args:
data (bytes): Binary data containing potential configs
default_xor_keys (list): List of hex XOR keys to try
Returns:
list: Decoded config data with metadata
"""
results = []
# Default XOR keys to try
if default_xor_keys is None:
default_xor_keys = APT41_CONFIG["xor_keys"]
# First identify potential config blocks
potential_configs = ConfigExtractor.find_config_patterns(data)
# For each potential config block
for config in potential_configs:
config_data = bytes.fromhex(config["data"])
decoded_configs = []
# Try decoding with XOR keys
for key_hex in default_xor_keys:
try:
decoded = CryptoAnalyzer.perform_xor_decryption(config_data, key_hex)
# Check if decoding produced meaningful data
network_indicators = NetworkAnalyzer.extract_network_indicators(decoded)
if any(len(indics) > 0 for indics in network_indicators.values()):
decoded_configs.append({
"key": key_hex,
"method": "xor",
"decoded": decoded.hex(),
"indicators": network_indicators
})
except:
continue
# Add decoding results to the config
if decoded_configs:
config["decoded"] = decoded_configs
results.append(config)
return results
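# Usage sketch (illustrative): decode_potential_configs first locates header-tagged
# blocks via find_config_patterns, then XOR-brute-forces each block with the keys
# from APT41_CONFIG and keeps only decodings that expose network indicators:
#   hits = ConfigExtractor.decode_potential_configs(data)
#   for hit in hits:
#       print(hex(hit["offset"]), [d["key"] for d in hit.get("decoded", [])])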
class PayloadAnalyzer:
"""Main class for analyzing KEYPLUG payloads."""
def __init__(self, output_dir="keyplug_analysis", report_dir="keyplug_reports", yara_rules=None):
"""
Initialize the payload analyzer.
Args:
output_dir (str): Directory to save analysis artifacts
report_dir (str): Directory to save reports
yara_rules (yara.Rules): YARA rules for scanning
"""
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.report_dir = Path(report_dir)
self.report_dir.mkdir(parents=True, exist_ok=True)
# Load YARA rules if available
self.yara_rules = yara_rules or (APT41YaraRules.create_ruleset() if YARA_AVAILABLE else None)
# Initialize processed files cache
self.processed_files_cache = set()
self._load_processed_files_cache()
def _load_processed_files_cache(self):
"""Load MD5 hashes of already processed files from cache file."""
cache_file = self.report_dir / "processed_files.txt"
if cache_file.exists():
with open(cache_file, 'r') as f:
for line in f:
self.processed_files_cache.add(line.strip())
def _save_processed_files_cache(self):
"""Save MD5 hashes of processed files to cache file."""
cache_file = self.report_dir / "processed_files.txt"
with open(cache_file, 'w') as f:
for file_hash in self.processed_files_cache:
f.write(f"{file_hash}\n")
def find_keyplug_payloads(self, base_dir):
"""
Find all KEYPLUG extracted payloads from a base directory.
Args:
base_dir (str): Base directory to search in
Returns:
list: List of potential payload files
"""
base_dir = Path(base_dir)
payload_files = []
if not base_dir.exists() or not base_dir.is_dir():
logger.warning(f"Base directory not found: {base_dir}")
return []
# First search for possible KEYPLUG output directories
potential_payload_dirs = []
# Check current directory
for pattern in APT41_CONFIG["keyplug_dir_patterns"]["payload_dirs"]:
if (base_dir / pattern).exists() and (base_dir / pattern).is_dir():
potential_payload_dirs.append(base_dir / pattern)
# Check subdirectories (in case it's the parent folder of multiple scans)
for subdir in base_dir.iterdir():
if not subdir.is_dir():
continue
# Check if this subdir or any of its subdirs might be a KEYPLUG output dir
for pattern in APT41_CONFIG["keyplug_dir_patterns"]["payload_dirs"]:
if (subdir / pattern).exists() and (subdir / pattern).is_dir():
potential_payload_dirs.append(subdir / pattern)
# Also check for the pattern in the directory name itself
for pattern in APT41_CONFIG["keyplug_dir_patterns"]["payload_dirs"]:
if pattern.lower() in subdir.name.lower():
potential_payload_dirs.append(subdir)
# Now look in all potential dirs for files matching payload patterns
for payload_dir in potential_payload_dirs:
for pattern in APT41_CONFIG["keyplug_dir_patterns"]["payload_file_patterns"]:
payload_files.extend(payload_dir.glob(pattern))
# Look in decrypted directories too
for payload_dir in potential_payload_dirs:
parent_dir = payload_dir.parent
for decrypted_pattern in APT41_CONFIG["keyplug_dir_patterns"]["decrypted_dirs"]:
decrypted_dir = parent_dir / decrypted_pattern
if decrypted_dir.exists() and decrypted_dir.is_dir():
for file_pattern in APT41_CONFIG["keyplug_dir_patterns"]["payload_file_patterns"]:
payload_files.extend(decrypted_dir.glob(file_pattern))
# Remove duplicates and sort
unique_payloads = list(set(payload_files))
unique_payloads.sort()
logger.info(f"Found {len(unique_payloads)} potential KEYPLUG payload files")
return unique_payloads
def analyze_file(self, file_path, force=False):
"""
Perform comprehensive analysis on a file.
Args:
file_path (str): Path to the file to analyze
force (bool): Force analysis even if file was previously analyzed
Returns:
dict: Analysis results
"""
file_path = Path(file_path)
# Check if file exists
if not file_path.exists():
return {"error": f"File not found: {file_path}"}
# Calculate file MD5 hash
try:
with open(file_path, 'rb') as f:
file_md5 = hashlib.md5(f.read()).hexdigest()
except Exception as e:
return {"error": f"Failed to read file: {str(e)}"}
# Check if already processed (unless force=True)
if not force and file_md5 in self.processed_files_cache:
logger.info(f"Skipping already analyzed file: {file_path} (MD5: {file_md5})")
return {"status": "skipped", "reason": "already_analyzed", "md5": file_md5, "file": str(file_path)}
logger.info(f"Analyzing file: {file_path}")
# Init results
results = {
"timestamp": datetime.now().isoformat(),
"file": {
"name": file_path.name,
"path": str(file_path),
"size": file_path.stat().st_size,
},
"basic_analysis": {},
"network_indicators": {},
"detected_pe": [],
"yara_matches": [],
"entropy_analysis": {},
"decryptions": [],
"config": {},
"executive_summary": [],
"conclusion": {}
}
try:
# Calculate file hashes
with open(file_path, 'rb') as f:
data = f.read()
# Hash calculation
results["file"]["md5"] = hashlib.md5(data).hexdigest()
results["file"]["sha1"] = hashlib.sha1(data).hexdigest()
results["file"]["sha256"] = hashlib.sha256(data).hexdigest()
# Get file type if magic is available
if MAGIC_AVAILABLE:
mime = magic.Magic(mime=True)
results["file"]["mime_type"] = mime.from_file(str(file_path))
mime_desc = magic.Magic()
results["file"]["file_type"] = mime_desc.from_file(str(file_path))
else:
# Fallback: Simple file type detection
if data.startswith(b'MZ'):
results["file"]["file_type"] = "PE executable"
elif data.startswith(b'\x7FELF'):
results["file"]["file_type"] = "ELF executable"
elif data.startswith(b'PK\x03\x04'):
results["file"]["file_type"] = "Zip archive"
else:
results["file"]["file_type"] = "Unknown binary data"
# Basic analysis
results["basic_analysis"]["entropy"] = CryptoAnalyzer.calculate_entropy(data)
results["basic_analysis"]["is_encrypted"] = results["basic_analysis"]["entropy"] > APT41_CONFIG["entropy_threshold"]
# Entropy scan for interesting sections
entropy_segments = CryptoAnalyzer.entropy_scan(data, APT41_CONFIG["window_size"])
results["entropy_analysis"]["segments"] = [(offset, round(entropy, 2)) for offset, entropy in entropy_segments]
# Find high entropy segments (potential encrypted/compressed data)
high_entropy_segments = [(offset, entropy) for offset, entropy in entropy_segments if entropy > APT41_CONFIG["entropy_threshold"]]
results["entropy_analysis"]["high_entropy_segments"] = [(offset, round(entropy, 2)) for offset, entropy in high_entropy_segments]
# Find entropy transitions (potential encrypted/plaintext boundaries)
transitions = []
if len(entropy_segments) > 1:
for i in range(1, len(entropy_segments)):
prev_entropy = entropy_segments[i-1][1]
curr_entropy = entropy_segments[i][1]
entropy_delta = abs(curr_entropy - prev_entropy)
if entropy_delta > 1.5: # Significant entropy change
transitions.append({
"offset": entropy_segments[i][0],
"from_entropy": round(prev_entropy, 2),
"to_entropy": round(curr_entropy, 2),
"delta": round(entropy_delta, 2)
})
results["entropy_analysis"]["transitions"] = transitions
# Network indicators analysis
results["network_indicators"] = NetworkAnalyzer.extract_network_indicators(data)
if results["network_indicators"]["domains"]:
results["network_indicators"]["domain_analysis"] = NetworkAnalyzer.analyze_domains(
results["network_indicators"]["domains"]
)
# Try to identify PE files embedded in the data
embedded_pes = CryptoAnalyzer.detect_embedded_pe(data)
# Save and analyze embedded PEs
for i, (offset, size, pe_data) in enumerate(embedded_pes):
# Hash the PE for identification
pe_hash = hashlib.md5(pe_data).hexdigest()
pe_filename = f"embedded_pe_{i+1}_{pe_hash[:8]}.bin"
pe_path = self.output_dir / pe_filename
# Save the PE file
with open(pe_path, 'wb') as f:
f.write(pe_data)
# Analyze the PE
pe_info = PEAnalyzer.extract_pe_info(pe_data)
pe_strings = PEAnalyzer.extract_strings_from_pe(pe_data)
# Add to results
results["detected_pe"].append({
"offset": offset,
"size": size,
"md5": pe_hash,
"file": str(pe_path),
"info": pe_info,
"strings": pe_strings
})
# YARA scanning (if available)
if self.yara_rules:
try:
matches = self.yara_rules.match(data=data)
for match in matches:
match_data = {
"rule": match.rule,
"meta": match.meta,
"tags": match.tags,
"strings": []
}
# Include matched strings with context
if hasattr(match, 'strings'):
# Note: assumes the legacy yara-python tuple API, where each entry is
# (offset, identifier, matched_bytes); yara-python >= 4.3 returns
# StringMatch objects instead and would need different handling.
for offset, string_id, string_bytes in match.strings:
# Get a bit of context around each match
context_start = max(0, offset - 16)
context_end = min(len(data), offset + len(string_bytes) + 16)
context_bytes = data[context_start:context_end]
match_data["strings"].append({
"id": string_id,
"offset": offset,
"bytes": string_bytes.hex(),
"bytes_ascii": string_bytes.decode('latin1', errors='replace'),
"context": context_bytes.hex(),
"context_ascii": context_bytes.decode('latin1', errors='replace')
})
results["yara_matches"].append(match_data)
except Exception as e:
results["yara_matches"] = [{"error": f"YARA scanning error: {str(e)}"}]
# Try XOR decryption with different keys
logger.info("Attempting XOR decryption with different keys")
# First try to automatically detect possible XOR keys
detected_keys = CryptoAnalyzer.detect_possible_xor_keys(data)
results["detected_xor_keys"] = detected_keys
# Try both detected keys and default keys from APT41_CONFIG
all_keys = detected_keys + APT41_CONFIG["xor_keys"]
# Try decryption with different keys
successful_decryptions = []
for key_hex in all_keys:
try:
# Only try each key once
if key_hex in [d["key"] for d in successful_decryptions]:
continue
decrypted = CryptoAnalyzer.perform_xor_decryption(data, key_hex)
# Check if decryption produced meaningful data
decryption_score = 0
decryption_reasons = []
# Look for PE headers in decrypted data
if decrypted.startswith(b'MZ'):
decryption_score += 10
decryption_reasons.append("Found decrypted PE header")
# Look for common text patterns
text_patterns = [b"http:", b"www.", b"<?xml", b"</html>", b"#include", b"Command", b"windows", b"kernel32"]
for pattern in text_patterns:
if pattern in decrypted:
decryption_score += 5
decryption_reasons.append(f"Found text pattern: {pattern.decode('ascii', errors='replace')}")
# Look for PE headers anywhere in the decrypted data
if b'MZ' in decrypted and b'PE\x00\x00' in decrypted:
pe_index = decrypted.find(b'MZ')
decryption_score += 10
decryption_reasons.append(f"Found embedded PE at offset {pe_index}")
# Check entropy - should become either lower (plaintext) or higher (compressed)
decrypted_entropy = CryptoAnalyzer.calculate_entropy(decrypted)
entropy_delta = abs(decrypted_entropy - results["basic_analysis"]["entropy"])
if entropy_delta > 0.5:
decryption_score += 3
decryption_reasons.append(f"Significant entropy change: {entropy_delta:.2f}")
# Network indicators in decrypted data
network_indicators = NetworkAnalyzer.extract_network_indicators(decrypted)
if any(indicators for indicators in network_indicators.values()):
decryption_score += 10
indicator_count = sum(len(indicators) for indicators in network_indicators.values())
decryption_reasons.append(f"Found {indicator_count} network indicators")
# Save promising decryptions
if decryption_score > 5:
# Generate a unique filename
decrypted_hash = hashlib.md5(decrypted).hexdigest()[:8]
decrypted_filename = f"decrypted_{file_path.stem}_{key_hex}_{decrypted_hash}.bin"
decrypted_path = self.output_dir / decrypted_filename
# Save the decrypted file
with open(decrypted_path, 'wb') as f:
f.write(decrypted)
successful_decryptions.append({
"key": key_hex,
"entropy": round(decrypted_entropy, 2),
"score": decryption_score,
"reasons": decryption_reasons,
"file": str(decrypted_path),
"network_indicators": network_indicators
})
except Exception as e:
logger.debug(f"Error during decryption with key {key_hex}: {e}")
continue
# Sort decryptions by score
results["decryptions"] = sorted(successful_decryptions, key=lambda x: x["score"], reverse=True)
# Config extraction
results["config"]["patterns"] = ConfigExtractor.find_config_patterns(data)
# Try to decode configs in both original and decrypted files
results["config"]["decoded"] = ConfigExtractor.decode_potential_configs(data)
# If we have successful decryptions, look for configs there too
for decryption in results["decryptions"]:
try:
with open(decryption["file"], 'rb') as f:
decrypted_data = f.read()
config_results = ConfigExtractor.find_config_patterns(decrypted_data)
if config_results:
decryption["configs"] = config_results
except:
continue
# Generate an executive summary from the findings
results["executive_summary"] = PayloadAnalyzer._generate_executive_summary(results)
# Generate a conclusion
results["conclusion"] = PayloadAnalyzer._generate_conclusion(results)
# Add to processed files cache
self.processed_files_cache.add(file_md5)
self._save_processed_files_cache()
# Save the full analysis results
report_file = self.report_dir / f"analysis_{results['file']['md5']}.json"
with open(report_file, 'w') as f:
json.dump(results, f, indent=2)
# Generate human-readable report
md_report = self._generate_markdown_report(results)
md_report_file = self.report_dir / f"analysis_{results['file']['md5']}.md"
with open(md_report_file, 'w') as f:
f.write(md_report)
logger.info(f"Analysis complete - Reports saved to {report_file} and {md_report_file}")
return results
except Exception as e:
logger.error(f"Error during analysis: {e}")
return {"error": f"Analysis failed: {str(e)}"}
@staticmethod
def _generate_executive_summary(results):
"""
Generate an executive summary of the analysis.
Args:
results (dict): Analysis results
Returns:
list: Key findings as bullet points
"""
summary = []
# Check for high entropy (encryption)
if results["basic_analysis"]["entropy"] > 7.5:
summary.append(f"File exhibits very high entropy ({results['basic_analysis']['entropy']:.2f}), indicating encryption or compression")
# Check for YARA rule matches
if results["yara_matches"]:
for match in results["yara_matches"]:
try:
rule = match["rule"]
confidence = match["meta"].get("confidence", "unknown")
summary.append(f"Matched YARA rule: {rule} (confidence: {confidence})")
except:
continue
# Check for embedded PE files
if results["detected_pe"]:
summary.append(f"Detected {len(results['detected_pe'])} embedded PE files")
for pe in results["detected_pe"]:
try:
if "suspicious_indicators" in pe["info"] and pe["info"]["suspicious_indicators"]:
indicators = '; '.join(pe["info"]["suspicious_indicators"][:3])
summary.append(f"Suspicious indicators in embedded PE: {indicators}")
except:
continue
# Check for network indicators
if any(indicators for indicators in results["network_indicators"].values()):
c2_count = len(results["network_indicators"]["urls"]) + len(results["network_indicators"]["domains"]) + len(results["network_indicators"]["ips"])
if c2_count > 0:
summary.append(f"Identified {c2_count} potential C2 indicators")
# List top 3 domains/URLs
all_c2 = results["network_indicators"]["urls"] + results["network_indicators"]["domains"] + results["network_indicators"]["ips"]
if all_c2:
c2_list = ', '.join(all_c2[:3])
summary.append(f"Notable C2 indicators: {c2_list}")
# Check for successful decryption
if results["decryptions"]:
best_decryption = results["decryptions"][0]
summary.append(f"Successfully decrypted payload using key: {best_decryption['key']}")
# Check for config blocks
if results["config"]["patterns"]:
summary.append(f"Identified {len(results['config']['patterns'])} potential configuration blocks")
# Add default finding if nothing else is found
if not summary:
summary.append("No significant malware indicators detected")
return summary
@staticmethod
def _generate_conclusion(results):
"""
Generate a conclusion about the analyzed file.
Args:
results (dict): Analysis results
Returns:
dict: Conclusion data including threat assessment and attribution
"""
conclusion = {
"threat_level": "unknown",
"confidence": 0.0,
"apt41_probability": 0.0,
"classification": [],
"attribution": [],
"recommendations": []
}
# Indicators for threat level assessment
indicators = {
"yara_matches": bool(results["yara_matches"]),
"embedded_pe": bool(results["detected_pe"]),
"c2_indicators": any(indicators for indicators in results["network_indicators"].values()),
"config_blocks": bool(results["config"]["patterns"] or results["config"]["decoded"]),
"high_entropy": results["basic_analysis"].get("entropy", 0) > 7.5,
"successful_decryption": bool(results["decryptions"])
}
# Count positive indicators
positive_count = sum(1 for indicator in indicators.values() if indicator)
# Set threat level
if positive_count >= 4:
conclusion["threat_level"] = "high"
conclusion["confidence"] = 0.9
elif positive_count >= 2:
conclusion["threat_level"] = "medium"
conclusion["confidence"] = 0.7
elif positive_count >= 1:
conclusion["threat_level"] = "low"
conclusion["confidence"] = 0.5
else:
conclusion["threat_level"] = "minimal"
conclusion["confidence"] = 0.3
# Classification of malware
if indicators["embedded_pe"]:
conclusion["classification"].append("dropper")
if indicators["config_blocks"]:
conclusion["classification"].append("backdoor")
if indicators["c2_indicators"]:
conclusion["classification"].append("remote_access")
if indicators["high_entropy"] and indicators["successful_decryption"]:
conclusion["classification"].append("encrypted")
# If no classification, add generic
if not conclusion["classification"]:
conclusion["classification"].append("generic")
# Check for APT41 attribution
apt41_indicators = 0
# Check YARA matches for APT41
for match in results["yara_matches"]:
try:
rule_name = match["rule"]
if "APT41" in rule_name:
apt41_indicators += 2
except:
continue
# Check for KEYP markers
if any("KEYP" in str(pattern) for pattern in results["config"]["patterns"]):
apt41_indicators += 1
# Check for RC4 encryption references
if results["config"]["decoded"] and any("RC4" in str(decoded) for decoded in results["config"]["decoded"]):
apt41_indicators += 1
# Calculate APT41 probability
if apt41_indicators >= 3:
conclusion["apt41_probability"] = 0.9
conclusion["attribution"].append("APT41")
elif apt41_indicators >= 1:
conclusion["apt41_probability"] = 0.6
conclusion["attribution"].append("possible APT41")
# Add recommendations based on findings
if conclusion["threat_level"] in ["medium", "high"]:
conclusion["recommendations"].append("Submit sample to professional malware analysis service")
conclusion["recommendations"].append("Check system for indicators of compromise")
conclusion["recommendations"].append("Investigate source of the suspected malware")
if indicators["c2_indicators"]:
conclusion["recommendations"].append("Block identified C2 domains and IPs")
conclusion["recommendations"].append("Monitor for suspicious network traffic to similar domains")
if conclusion["apt41_probability"] > 0.5:
conclusion["recommendations"].append("Employ APT41-specific hunting techniques across the enterprise")
return conclusion
def batch_analyze(self, directory, pattern="*.bin", force=False):
"""
Analyze multiple files matching a pattern in a directory.
Args:
directory (str): Directory to scan
pattern (str): Glob pattern for files to analyze
force (bool): Force analysis even for previously analyzed files
Returns:
list: List of analysis results
"""
directory = Path(directory)
files = list(directory.glob(pattern))
if not files:
logger.warning(f"No files matching '{pattern}' found in {directory}")
return []
logger.info(f"Found {len(files)} files to analyze in {directory}")
results = []
# Analyze each file
with concurrent.futures.ThreadPoolExecutor(max_workers=min(8, os.cpu_count() or 4)) as executor:
future_to_file = {executor.submit(self.analyze_file, file, force): file for file in files}
for future in concurrent.futures.as_completed(future_to_file):
file = future_to_file[future]
try:
result = future.result()
results.append(result)
logger.info(f"Completed analysis of {file}")
except Exception as e:
logger.error(f"Error analyzing {file}: {e}")
results.append({"file": str(file), "error": str(e)})
# Generate summary report
self._generate_batch_summary(results)
return results
def _generate_batch_summary(self, results):
"""
Generate a summary of batch analysis.
Args:
results (list): List of analysis results
Returns:
None
"""
summary_file = self.report_dir / "batch_summary.md"
# Filter out skipped files from the summary count
filtered_results = [r for r in results if not ("status" in r and r["status"] == "skipped")]
with open(summary_file, 'w') as f:
f.write("# KEYPLUG Batch Analysis Summary\n\n")
f.write(f"**Analysis Date:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write(f"**Files Analyzed:** {len(filtered_results)}\n\n")
# Count files by threat level
threat_levels = {"high": 0, "medium": 0, "low": 0, "minimal": 0, "unknown": 0, "error": 0}
for result in filtered_results:
if "error" in result:
threat_levels["error"] += 1
else:
level = result.get("conclusion", {}).get("threat_level", "unknown")
threat_levels[level] += 1
# Write threat level summary
f.write("## Threat Level Summary\n\n")
f.write("| Threat Level | Count |\n")
f.write("|--------------|-------|\n")
for level, count in threat_levels.items():
f.write(f"| {level.capitalize()} | {count} |\n")
# Write file details table
f.write("\n## Analyzed Files\n\n")
f.write("| Filename | MD5 | Threat | Classification | Notable Findings |\n")
f.write("|----------|-----|--------|----------------|------------------|\n")
for result in filtered_results:
if "error" in result:
filename = result.get("file", "Unknown")
f.write(f"| {filename} | Error | Error | Error | {result['error']} |\n")
elif "status" in result and result["status"] == "skipped":
continue # Skip files that were skipped
else:
filename = result.get("file", {}).get("name", "Unknown")
md5 = result.get("file", {}).get("md5", "Unknown")
threat = result.get("conclusion", {}).get("threat_level", "unknown").capitalize()
classification = ', '.join(result.get("conclusion", {}).get("classification", ["Unknown"]))
# Get top finding
findings = result.get("executive_summary", ["No findings"])
top_finding = findings[0] if findings else "No findings"
f.write(f"| {filename} | {md5} | {threat} | {classification} | {top_finding} |\n")
# Add any APT41 attribution
apt41_files = []
for result in filtered_results:
if "error" not in result and result.get("conclusion", {}).get("apt41_probability", 0) > 0.5:
apt41_files.append({
"filename": result.get("file", {}).get("name", "Unknown"),
"md5": result.get("file", {}).get("md5", "Unknown"),
"probability": result.get("conclusion", {}).get("apt41_probability", 0) * 100
})
if apt41_files:
f.write("\n## Potential APT41 Attribution\n\n")
f.write("| Filename | MD5 | APT41 Probability |\n")
f.write("|----------|-----|-------------------|\n")
for file in apt41_files:
f.write(f"| {file['filename']} | {file['md5']} | {file['probability']:.0f}% |\n")
# Add conclusion
f.write("\n## Conclusion\n\n")
f.write(f"Analyzed {len(filtered_results)} files with the following distribution of threat levels:\n\n")
for level, count in threat_levels.items():
if count > 0:
f.write(f"- **{level.capitalize()}:** {count} files\n")
if apt41_files:
f.write(f"\nFound {len(apt41_files)} files with potential APT41 attribution.\n")
logger.info(f"Batch summary saved to {summary_file}")
def monitor_and_analyze(self, base_dir, interval=None, force=False):
"""
Monitor a directory for KEYPLUG payloads and analyze them.
Args:
base_dir (str): Base directory to monitor
interval (int): If provided, keep monitoring, re-scanning every `interval` seconds
force (bool): Force analysis even for previously analyzed files
Returns:
dict: Summary of analysis results
"""
base_dir = Path(base_dir)
# Function to perform one scan
def perform_scan():
# Find all potential payload files
payload_files = self.find_keyplug_payloads(base_dir)
# Filter out already processed files (unless force=True)
if not force:
unprocessed_files = []
for file_path in payload_files:
try:
with open(file_path, 'rb') as f:
file_md5 = hashlib.md5(f.read()).hexdigest()
if file_md5 not in self.processed_files_cache:
unprocessed_files.append(file_path)
except Exception as e:
logger.warning(f"Error hashing file {file_path}: {e}")
# Include it anyway to be safe
unprocessed_files.append(file_path)
logger.info(f"Found {len(unprocessed_files)} new files out of {len(payload_files)} total")
payload_files = unprocessed_files
if not payload_files:
logger.info("No new payloads to analyze")
return {"analyzed": 0, "skipped": 0, "errors": 0, "files": []}
# Analyze each file
results = []
analyzed = 0
skipped = 0
errors = 0
for file_path in payload_files:
try:
result = self.analyze_file(file_path, force)
results.append(result)
if "status" in result and result["status"] == "skipped":
skipped += 1
logger.info(f"Skipped {file_path}")
elif "error" in result:
errors += 1
logger.error(f"Error analyzing {file_path}: {result['error']}")
else:
analyzed += 1
logger.info(f"Analyzed {file_path}")
except Exception as e:
errors += 1
logger.error(f"Error processing {file_path}: {e}")
# Generate batch summary if any files were analyzed
if analyzed > 0:
self._generate_batch_summary(results)
return {
"analyzed": analyzed,
"skipped": skipped,
"errors": errors,
"files": [str(f) for f in payload_files]
}
# Perform first scan
scan_results = perform_scan()
# If interval is provided, continue monitoring
if interval:
try:
logger.info(f"Monitoring {base_dir} for new payloads every {interval} seconds. Press Ctrl+C to stop.")
while True:
time.sleep(interval)
logger.info(f"Performing scheduled scan of {base_dir}")
scan_results = perform_scan()
except KeyboardInterrupt:
logger.info("Monitoring stopped by user")
return scan_results
def get_api_results(self):
"""
Return analysis results in a format consumable by other tools.
Returns:
dict: API-friendly results
"""
return {
"version": "3.0",
"timestamp": datetime.now().isoformat(),
"source_file": str(self.odg_path),
"extracted_files": [str(Path(r["payload"]).resolve()) for r in self.results],
"decrypted_files": [str(Path(d["path"]).resolve())
for r in self.results
for d in r["analysis"]["decryption_results"]],
"risk_summary": {
"high_risk": sum(1 for r in self.results if self._is_high_risk(r)),
"low_risk": sum(1 for r in self.results if not self._is_high_risk(r))
},
"full_results": self.results
}
def _write_indicators(self, file_handle, analysis):
"""
Write suspicious indicators to the report.
Args:
file_handle (file): Open file handle to write to
analysis (dict): Analysis results to extract indicators from
"""
has_indicators = False
if analysis['strings']['urls']:
file_handle.write(f"### ⚠️ Suspicious URLs\n")
for url in analysis['strings']['urls']:
file_handle.write(f"- `{url}`\n")
file_handle.write("\n")
has_indicators = True
if analysis['strings']['domains']:
file_handle.write(f"### ⚠️ Domain References\n")
for domain in analysis['strings']['domains']:
file_handle.write(f"- `{domain}`\n")
file_handle.write("\n")
has_indicators = True
if analysis['strings']['ips']:
file_handle.write(f"### ⚠️ IP Addresses\n")
for ip in analysis['strings']['ips']:
file_handle.write(f"- `{ip}`\n")
file_handle.write("\n")
has_indicators = True
if analysis['strings']['commands']:
file_handle.write(f"### ⚠️ Command References\n")
for cmd in analysis['strings']['commands']:
file_handle.write(f"- `{cmd}`\n")
file_handle.write("\n")
has_indicators = True
if analysis['strings']['api_calls']:
file_handle.write(f"### ⚠️ Suspicious API Calls\n")
for api in analysis['strings']['api_calls']:
file_handle.write(f"- `{api}`\n")
file_handle.write("\n")
has_indicators = True
if not has_indicators:
file_handle.write("### No obvious suspicious indicators found\n\n")
def _is_high_risk(self, result):
"""
Determine if a payload should be considered high risk.
Args:
result (dict): Result information for a payload
Returns:
bool: True if high risk, False otherwise
"""
analysis = result['analysis']
# Check for obvious indicators
if analysis['strings']['urls'] or analysis['strings']['commands'] or analysis['strings']['api_calls']:
return True
# Check YARA matches
if analysis['yara_matches']:
return True
# Check executable types
if any(ext in result['analysis']['file_type'].lower() for ext in
['executable', 'pe32', 'elf', 'script']):
return True
# High entropy could indicate obfuscation
if analysis['encryption_assessment']['is_encrypted'] and analysis['entropy'] > 7.8:
return True
# Check for successful decryption attempts
if analysis['decryption_results'] and any('PE' in d['file_type'] for d in analysis['decryption_results']):
return True
# Check for embedded PE files
if analysis['embedded_files']:
return True
return False
def cleanup_temp_files(self):
"""Clean up temporary extraction files."""
if self.cleanup:
logger.info("Cleaning up temporary files...")
try:
shutil.rmtree(self.extract_dir)
logger.info("Temporary files removed successfully")
except Exception as e:
logger.error(f"Error cleaning up temporary files: {e}")
def run_analysis(self):
"""
Run the complete analysis workflow.
Returns:
bool: True if analysis completes successfully
"""
logger.info(f"Starting analysis of {self.odg_path}")
# Extract the ODG file
if not self.extract_odg():
logger.error("ODG extraction failed. Aborting analysis.")
return False
        # Find all JPEG files in the extracted content
        self.jpeg_files = self.find_jpeg_files()
        if not self.jpeg_files:
            logger.warning("No JPEG files found in the ODG file.")
        # Process each JPEG file; tqdm provides a progress bar but is not a hard
        # dependency, so fall back to a plain iterator when it is not installed
        try:
            from tqdm import tqdm
        except ImportError:
            def tqdm(iterable, **kwargs):  # fallback: iterate without a progress bar
                return iterable
        for jpeg in tqdm(self.jpeg_files, desc="Analyzing JPEGs"):
try:
# Try to extract payload
payload_path, eof_pos, detection_method = self.extract_payload(jpeg)
# If standard extraction failed, try forced extraction for problematic files
if not payload_path or not Path(payload_path).exists():
logger.warning(f"Standard extraction failed for {jpeg.name}, trying forced methods")
payload_path, detection_method = self.force_extract_payload_heuristic(jpeg)
# Skip if no payload was found with either method
if not payload_path or not Path(payload_path).exists():
logger.warning(f"All extraction methods failed for {jpeg.name}")
continue
# Calculate file sizes and hashes
jpeg_hash = self.calc_file_hashes(jpeg)
payload_hash = self.calc_file_hashes(payload_path)
payload_size = Path(payload_path).stat().st_size
# Analyze the payload
analysis = self.analyze_payload(payload_path, detection_method)
# Calculate relative path within the ODG
try:
relative_path = jpeg.relative_to(self.extract_dir)
except ValueError:
relative_path = jpeg.name
# Store the results
self.results.append({
"jpeg": jpeg.name,
"jpeg_relative_path": str(relative_path),
"jpeg_hash": jpeg_hash,
"payload": str(payload_path),
"payload_hash": payload_hash,
"payload_size": payload_size,
"analysis": analysis,
"detection_method": detection_method
})
logger.info(f"Successfully analyzed payload from {jpeg.name}")
except Exception as e:
logger.error(f"Error processing JPEG {jpeg}: {e}")
# Generate the report
self.generate_report()
# Clean up temporary files if requested
self.cleanup_temp_files()
return True
def main():
"""Main entry point for the script."""
# Set up command line argument parsing
parser = argparse.ArgumentParser(
description="KEYPLUG-ANALYZER: Comprehensive Analysis Tool for KEYPLUG Payloads",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
Analyze a single file:
python keyplug_analyzer.py analyze suspicious_file.bin
Specify custom output directory:
python keyplug_analyzer.py analyze suspicious_file.bin -o /path/to/output
Enable deep scan with YARA rules:
python keyplug_analyzer.py analyze suspicious_file.bin --deep-scan
Analyze all payload files in a directory:
python keyplug_analyzer.py batch /path/to/payload/directory
Attempt decryption during batch analysis:
python keyplug_analyzer.py batch /path/to/payload/directory --brute-force
Monitor a directory for new payloads:
python keyplug_analyzer.py monitor /path/to/keyplug/output --interval 300
Full analysis with all options:
python keyplug_analyzer.py auto /path/to/keyplug/directory --deep-scan --force
"""
)
subparsers = parser.add_subparsers(dest="command", help="Command to execute")
# File analysis parser
file_parser = subparsers.add_parser("analyze", help="Analyze a single file")
file_parser.add_argument("file", help="Path to file for analysis")
file_parser.add_argument("-o", "--output", help="Output directory for analysis artifacts")
file_parser.add_argument("-r", "--report", help="Directory to save reports")
file_parser.add_argument("-f", "--force", action="store_true",
help="Force reanalysis even if previously analyzed")
file_parser.add_argument("--deep-scan", action="store_true",
help="Perform deep analysis with YARA rules")
file_parser.add_argument("--brute-force", action="store_true",
help="Attempt to decrypt encrypted payloads")
# Batch analysis parser
batch_parser = subparsers.add_parser("batch", help="Analyze multiple files")
batch_parser.add_argument("directory", help="Directory containing files to analyze")
batch_parser.add_argument("-p", "--pattern", default="*.bin",
help="Glob pattern for files (default: *.bin)")
batch_parser.add_argument("-o", "--output", help="Output directory for analysis artifacts")
batch_parser.add_argument("-r", "--report", help="Directory to save reports")
batch_parser.add_argument("-f", "--force", action="store_true",
help="Force reanalysis even if previously analyzed")
batch_parser.add_argument("--deep-scan", action="store_true",
help="Perform deep analysis with YARA rules")
batch_parser.add_argument("--brute-force", action="store_true",
help="Attempt to decrypt encrypted payloads")
# Monitoring parser
monitor_parser = subparsers.add_parser("monitor", help="Monitor directory for new payloads")
monitor_parser.add_argument("directory", help="Directory to monitor")
monitor_parser.add_argument("-i", "--interval", type=int, default=300,
help="Scan interval in seconds (default: 300)")
monitor_parser.add_argument("-o", "--output", help="Output directory for analysis artifacts")
monitor_parser.add_argument("-r", "--report", help="Directory to save reports")
monitor_parser.add_argument("-f", "--force", action="store_true",
help="Force reanalysis of existing files")
monitor_parser.add_argument("--deep-scan", action="store_true",
help="Perform deep analysis with YARA rules")
# Auto-detect parser
auto_parser = subparsers.add_parser("auto", help="Auto-detect and analyze KEYPLUG payloads")
auto_parser.add_argument("directory", help="Base directory to scan")
auto_parser.add_argument("-o", "--output", help="Output directory for analysis artifacts")
auto_parser.add_argument("-r", "--report", help="Directory to save reports")
auto_parser.add_argument("-f", "--force", action="store_true",
help="Force reanalysis of existing files")
auto_parser.add_argument("--deep-scan", action="store_true",
help="Perform deep analysis with YARA rules")
# Entropy graph parser
entropy_parser = subparsers.add_parser("entropy", help="Generate entropy graph for a file")
entropy_parser.add_argument("file", help="Path to file for entropy analysis")
entropy_parser.add_argument("-o", "--output", help="Output directory for graphs")
# Parse arguments
args = parser.parse_args()
# Default output directories
default_output_dir = "keyplug_analysis"
default_report_dir = "keyplug_reports"
    # Logging is already configured at module import time (console output plus
    # keyplug_analyzer.log), so no additional handler setup is needed here.
try:
# Execute appropriate command
if args.command == "analyze":
# Single file analysis
analyzer = PayloadAnalyzer(
output_dir=args.output or default_output_dir,
report_dir=args.report or default_report_dir
)
result = analyzer.analyze_file(
args.file,
force=args.force
)
if "error" in result:
logger.error(f"Analysis failed: {result['error']}")
else:
logger.info(f"Analysis complete. Reports saved in {analyzer.report_dir}")
elif args.command == "batch":
# Batch analysis
analyzer = PayloadAnalyzer(
output_dir=args.output or default_output_dir,
report_dir=args.report or default_report_dir
)
results = analyzer.batch_analyze(
args.directory,
pattern=args.pattern,
force=args.force
)
logger.info(f"Batch analysis complete. Analyzed {len(results)} files.")
elif args.command == "monitor":
# Continuous monitoring
analyzer = PayloadAnalyzer(
output_dir=args.output or default_output_dir,
report_dir=args.report or default_report_dir
)
results = analyzer.monitor_and_analyze(
args.directory,
interval=args.interval,
force=args.force
)
logger.info("Monitoring completed or interrupted.")
elif args.command == "auto":
# Auto-detect and analyze
watcher = KeyplugWatcher(
args.directory,
output_dir=args.output or default_output_dir,
report_dir=args.report or default_report_dir
)
results = watcher.auto_detect_and_analyze(force=args.force)
logger.info(f"Auto-analysis complete. Processed {results.get('payloads_found', 0)} payloads.")
elif args.command == "entropy":
# Generate entropy graph
graph_path = create_entropy_graph(args.file, args.output)
if graph_path:
logger.info(f"Entropy graph saved to {graph_path}")
else:
logger.error("Failed to generate entropy graph")
else:
parser.print_help()
except Exception as e:
logger.error(f"Unhandled exception: {e}", exc_info=True)
# Entry point
if __name__ == "__main__":
main()