Created
November 10, 2024 14:24
-
-
Save maple3142/d8acd5760ff9ec7040622930e12f0eec to your computer and use it in GitHub Desktop.
HITCON CTF 2024 Final - koh-ed25519 Author's Exploit
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import json | |
import os | |
import subprocess | |
from base64 import b64decode | |
from pathlib import Path | |
from cryptography.exceptions import InvalidSignature | |
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey | |
import sys, subprocess | |
# Docker image tag used for the sandbox; overridable via SIGNER_IMAGE_NAME.
IMAGE_NAME = os.getenv("SIGNER_IMAGE_NAME", "checker-env")
# Dockerfile that sits next to this script and is used to build the image.
DOCKERFILE_PATH = Path(__file__).with_name("env.Dockerfile")
def check_image_exist():
    """Return True when ``docker images -q IMAGE_NAME`` prints anything."""
    listing = subprocess.check_output(["docker", "images", "-q", IMAGE_NAME])
    return bool(listing)
def ensure_image_exist():
    """Build the sandbox image from ``DOCKERFILE_PATH`` unless it is already listed.

    Raises:
        subprocess.CalledProcessError: if ``docker build`` exits non-zero.
    """
    if check_image_exist():
        return
    print("Image not found, building image...")
    build_cmd = ["docker", "build", ".", "-t", IMAGE_NAME, "-f", DOCKERFILE_PATH]
    subprocess.run(build_cmd, check=True)
class SigningError(Exception):
    """Error raised by this script's helpers when driving the signer fails."""
# Syscalls the sandboxed interpreter may still issue once the filter is loaded.
_SECCOMP_ALLOWED = (
    "access",
    "arch_prctl",
    "brk",
    "close",
    "dup2",
    "epoll_create1",
    "faccessat2",
    "fcntl",
    "fstat",
    "futex",
    "getcwd",
    "getdents64",
    "getegid",
    "geteuid",
    "getgid",
    "getpgrp",
    "getpid",
    "getppid",
    "getrandom",
    "gettid",
    "getuid",
    "ioctl",
    "lseek",
    "mmap",
    "mprotect",
    "munmap",
    "newfstatat",
    "open",
    "openat",
    "pipe2",
    "prctl",
    "pread64",
    "prlimit64",
    "read",
    "readlink",
    "rseq",
    "rt_sigaction",
    "rt_sigprocmask",
    "rt_sigreturn",
    "set_robust_list",
    "set_tid_address",
    "uname",
    "wait4",
    "write",
    "kill",
)
# These two rules are spelled with double quotes in the generated text.
_SECCOMP_ALLOWED_DQ = ("exit", "exit_group")

# Python source prepended to every script sent to the sandbox: it installs a
# seccomp filter whose default action is KILL and then allows the syscalls above.
SECCOMP = "".join(
    [
        "\nfrom seccomp import *\n",
        "f = SyscallFilter(defaction=KILL)\n",
        *(f"f.add_rule(ALLOW, '{name}')\n" for name in _SECCOMP_ALLOWED),
        *(f'f.add_rule(ALLOW, "{name}")\n' for name in _SECCOMP_ALLOWED_DQ),
        "f.load()\n",
    ]
)
def run_script(program_path, script, timeout):
    """Start a sandbox container, pipe ``script`` into its Python, return the process.

    Args:
        program_path: Host path mounted read-only at ``/opt/sign.py``.
        script: Python source written to the container's stdin.
        timeout: Accepted for interface compatibility; not used in this body.

    Returns:
        The ``subprocess.Popen`` handle, with stdin closed and stdout still
        open for the caller to read.
    """
    container_name = f"checker-{os.urandom(8).hex()}"
    try:
        docker_argv = ["docker", "run", "--rm"]
        docker_argv += ["--name", container_name]
        docker_argv += ["--hostname", "sandbox"]
        docker_argv += ["-i"]
        docker_argv += ["-v", f"{program_path}:/opt/sign.py:ro"]
        docker_argv += ["-u", "nobody"]
        docker_argv += ["--read-only"]
        docker_argv += ["--network", "none"]
        docker_argv += ["-e", "PYTHONDONTWRITEBYTECODE=1"]
        docker_argv += ["--entrypoint", "/usr/bin/python3"]
        docker_argv += [IMAGE_NAME]
        proc = subprocess.Popen(
            docker_argv,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=sys.stderr,
        )
        proc.stdin.write(script.encode())
        proc.stdin.close()
        return proc
    except subprocess.TimeoutExpired:
        # NOTE(review): no call above is given a timeout, so this handler looks
        # unreachable as written — confirm before relying on it.
        raise SigningError("Signing program timeout")
    finally:
        # NOTE(review): this runs as the function returns, while the caller
        # still holds the process; presumably it only works because the
        # container is not registered yet at this point — verify.
        subprocess.run(
            ["docker", "kill", container_name],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
def extract_private_key_1(program_path: str | Path, timeout) -> bytes | None:
    """Try to capture the raw private key the signer hands to ``cryptography``.

    The injected script replaces ``backend.ed25519_load_private_bytes`` with a
    stub whose ``sign`` prints the separator followed by the base64 of the key
    bytes it was given, then exits.

    Args:
        program_path: Host path of the signer program mounted into the sandbox.
        timeout: Forwarded to ``run_script``.

    Returns:
        The base64 text printed after the separator, or ``None`` when the
        output does not contain exactly one separator.

    Raises:
        SigningError: if the sandbox image is not available.
    """
    if not check_image_exist():
        raise SigningError("Image not found")
    # Plain (non-f) string: the script has no placeholders to interpolate.
    script = (
        SECCOMP
        + """
import importlib.util
from base64 import b64encode
import sys, json
def import_from_file(name, path):
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    spec.loader.exec_module(module)
    return module
from cryptography.hazmat.backends.openssl.backend import backend as bk
bk.ed25519_load_private_bytes = lambda key: type('',(),{'sign':lambda s,d:[sys.stdout.write('MAGIC_OUTPUT_SEPARATOR_DO_NOT_PRINT_THIS_IN_YOUR_SIGNER'), print(b64encode(key).decode(), end=''), exit()]})()
sign = import_from_file("sign", "/opt/sign.py").sign
[sign(b'a' * 32) for _ in [1]]
"""
    )
    output = run_script(program_path, script, timeout).stdout.read()
    parts = output.split(b"MAGIC_OUTPUT_SEPARATOR_DO_NOT_PRINT_THIS_IN_YOUR_SIGNER")
    if len(parts) == 2:
        return parts[1]
    # Separator missing or repeated: the hook did not fire as expected.
    return None
def extract_mul_1(program_path: str | Path, pk, timeout) -> bytes:
    """Try to recover the signing scalar by forcing two signatures to share ``R``.

    The injected script wraps ``hashlib.sha512`` so that every 64-byte input is
    hashed as the empty string, signs two different 32-byte messages, and when
    both signatures carry the same ``R`` solves
    ``s = (S1 - S2) / (h1 - h2) mod l`` and prints it as little-endian hex.

    Args:
        program_path: Host path of the signer program mounted into the sandbox.
        pk: Base64-encoded public key; its ``repr`` is embedded in the script.
        timeout: Forwarded to ``run_script``.

    Returns:
        Whatever the sandbox printed after the last separator (the hex scalar,
        or empty bytes if the two ``R`` values differed).
    """
    # ``{pk!r}`` embeds a proper Python literal for both ``bytes`` and ``str``
    # keys; for ``bytes`` the generated text is the same as plain ``{pk}``.
    script = (
        SECCOMP
        + f"""
import importlib.util
from base64 import b64encode, b64decode
import sys, json, hashlib
def import_from_file(name, path):
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    spec.loader.exec_module(module)
    return module
b = 256
q = 2**255 - 19
l = 2**252 + 27742317777372353535851937790883648493
def bit(h, i):
    return (h[i // 8] >> (i % 8)) & 1
def decodeint(s):
    return sum(2**i * bit(s, i) for i in range(0, b))
import hashlib
orig = hashlib.sha512
def hook(x):
    if len(x) == 64:
        return orig(b'')
    return orig(x)
hashlib.sha512 = hook
def decsig(x):
    R = x[:32]
    S = decodeint(x[32:])
    return R, S
pk = b64decode({pk!r})
def Hm(R, A, M):
    return int.from_bytes(orig(R + A + M).digest(), 'little')
sign = import_from_file("sign", "/opt/sign.py").sign
sys.stdout.write('MAGIC_OUTPUT_SEPARATOR_DO_NOT_PRINT_THIS_IN_YOUR_SIGNER')
m1 = b'a' * 32
m2 = b'b' * 32
s1 = sign(m1)
s2 = sign(m2)
R1, S1 = decsig(s1)
R2, S2 = decsig(s2)
h1 = Hm(R1, pk, m1)
h2 = Hm(R2, pk, m2)
if R1 == R2:
    s = (S1 - S2) * pow(h1 - h2, -1, l) % l
    print(s.to_bytes(32, 'little').hex(), end='')
"""
    )
    output = run_script(program_path, script, timeout).stdout.read()
    # Keep only what follows the last separator, in case the signer printed its own.
    return output.split(b"MAGIC_OUTPUT_SEPARATOR_DO_NOT_PRINT_THIS_IN_YOUR_SIGNER")[-1]
# generated from `../cli/client.py patch round 71 -t 7 --program`
file = Path.cwd() / sys.argv[1]
content = file.read_bytes()
# The public key is whatever follows the first 14 bytes of the first line
# (presumably a fixed comment prefix — confirm against the patch format).
pk = content.split(b"\n")[0][14:]
print("pk", pk)
proof_bin = Path(__file__).parent.parent / "proofs/bin/prove"
tmp = (Path(__file__).parent / "./tmptmptmp.py").absolute()
try:
    # Remove the first line because some scripts will check the stack trace
    # (with line numbers).
    tmp.write_bytes(content[14 + len(pk) + 1 :])
    print("sk", sk := extract_private_key_1(tmp, 2))
    if sk:
        subprocess.run([proof_bin, "-pk", pk, "-sk", sk])
    print("mul", mul := extract_mul_1(tmp, pk, 2))
    if mul:
        subprocess.run([proof_bin, "-pk", pk, "-mul", mul])
finally:
    # Remove the temporary signer copy even when an extraction step raised.
    tmp.unlink(missing_ok=True)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import json | |
import os | |
import subprocess | |
from base64 import b64decode, b64encode | |
from pathlib import Path | |
from cryptography.exceptions import InvalidSignature | |
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey | |
import sys, subprocess, time, hashlib | |
# Docker image tag used for the sandbox; overridable via SIGNER_IMAGE_NAME.
IMAGE_NAME = os.getenv("SIGNER_IMAGE_NAME", "checker-env")
# Dockerfile that sits next to this script and is used to build the image.
DOCKERFILE_PATH = Path(__file__).with_name("env.Dockerfile")
def check_image_exist():
    """Return True when ``docker images -q IMAGE_NAME`` prints anything."""
    image_ids = subprocess.check_output(["docker", "images", "-q", IMAGE_NAME])
    return bool(image_ids)
def ensure_image_exist():
    """Build the sandbox image from ``DOCKERFILE_PATH`` unless it is already listed.

    Raises:
        subprocess.CalledProcessError: if ``docker build`` exits non-zero.
    """
    if check_image_exist():
        return
    print("Image not found, building image...")
    build_cmd = ["docker", "build", ".", "-t", IMAGE_NAME, "-f", DOCKERFILE_PATH]
    subprocess.run(build_cmd, check=True)
class SigningError(Exception):
    """Error raised by this script's helpers when driving the signer fails."""
# Syscalls the seccomp prelude would allow once its filter is loaded.
_SECCOMP_ALLOWED = (
    "access",
    "arch_prctl",
    "brk",
    "close",
    "dup2",
    "epoll_create1",
    "faccessat2",
    "fcntl",
    "fstat",
    "futex",
    "getcwd",
    "getdents64",
    "getegid",
    "geteuid",
    "getgid",
    "getpgrp",
    "getpid",
    "getppid",
    "getrandom",
    "gettid",
    "getuid",
    "ioctl",
    "lseek",
    "mmap",
    "mprotect",
    "munmap",
    "newfstatat",
    "open",
    "openat",
    "pipe2",
    "prctl",
    "pread64",
    "prlimit64",
    "read",
    "readlink",
    "rseq",
    "rt_sigaction",
    "rt_sigprocmask",
    "rt_sigreturn",
    "set_robust_list",
    "set_tid_address",
    "uname",
    "wait4",
    "write",
    "kill",
)
# These two rules are spelled with double quotes in the generated text.
_SECCOMP_ALLOWED_DQ = ("exit", "exit_group")

# Python source of the seccomp prelude: default action KILL plus the allow-list.
SECCOMP = "".join(
    [
        "\nfrom seccomp import *\n",
        "f = SyscallFilter(defaction=KILL)\n",
        *(f"f.add_rule(ALLOW, '{name}')\n" for name in _SECCOMP_ALLOWED),
        *(f'f.add_rule(ALLOW, "{name}")\n' for name in _SECCOMP_ALLOWED_DQ),
        "f.load()\n",
    ]
)
# Blank the prelude out with the same number of spaces, so no filter is
# installed in this variant of the script.
SECCOMP = " " * len(SECCOMP)  # nop it
def run_script(program_path, script, timeout, container_name=None):
    """Start a sandbox container, pipe ``script`` into its Python, return the process.

    Args:
        program_path: Host path mounted read-only at ``/opt/sign.py``.
        script: Python source written to the container's stdin.
        timeout: Accepted for interface compatibility; not used in this body.
        container_name: Name for the container; a random ``checker-<hex>``
            name is generated when omitted.

    Returns:
        The ``subprocess.Popen`` handle, with stdin closed and stdout still
        open for the caller to read.
    """
    if container_name is None:
        container_name = f"checker-{os.urandom(8).hex()}"
    try:
        docker_argv = ["docker", "run", "--rm"]
        docker_argv += ["--name", container_name]
        docker_argv += ["--hostname", "sandbox"]
        docker_argv += ["-i"]
        docker_argv += ["-v", f"{program_path}:/opt/sign.py:ro"]
        docker_argv += ["-u", "nobody"]
        docker_argv += ["--read-only"]
        docker_argv += ["--network", "none"]
        docker_argv += ["-e", "PYTHONDONTWRITEBYTECODE=1"]
        docker_argv += ["--entrypoint", "/usr/bin/python3"]
        docker_argv += [IMAGE_NAME]
        proc = subprocess.Popen(
            docker_argv,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=sys.stderr,
        )
        proc.stdin.write(script.encode())
        proc.stdin.close()
        return proc
    except subprocess.TimeoutExpired:
        # NOTE(review): no call above is given a timeout, so this handler looks
        # unreachable as written — confirm before relying on it.
        raise SigningError("Signing program timeout")
    finally:
        # NOTE(review): this runs as the function returns, while the caller
        # still holds the process; presumably it only works because the
        # container is not registered yet at this point — verify.
        subprocess.run(
            ["docker", "kill", container_name],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
def do_sign(program_path, data, timeout):
    """Run the signer once in the sandbox and return the raw signature bytes.

    Args:
        program_path: Host path of the signer program mounted into the sandbox.
        data: Message to sign; its ``repr`` is embedded in the script as a literal.
        timeout: Forwarded to ``run_script``.

    Returns:
        The base64-decoded bytes printed after the first separator.

    Raises:
        SigningError: if the sandbox output contains no separator.
    """
    separator = b"MAGIC_OUTPUT_SEPARATOR_DO_NOT_PRINT_THIS_IN_YOUR_SIGNER"
    # ``{data!r}`` embeds a proper Python literal; for ``bytes`` the generated
    # text is the same as plain ``{data}``.
    script = (
        SECCOMP
        + f"""
import importlib.util
from base64 import b64encode, b64decode
import sys, json, hashlib
def import_from_file(name, path):
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    spec.loader.exec_module(module)
    return module
inputs = [{data!r}]
sys.stdout.write('X')
sys.stdout.flush()
sign = import_from_file("sign", "/opt/sign.py").sign
out = [b64encode(sign(x)).decode() for x in inputs]
sys.stdout.write('MAGIC_OUTPUT_SEPARATOR_DO_NOT_PRINT_THIS_IN_YOUR_SIGNER')
sys.stdout.write(out[0])
"""
    )
    output = run_script(program_path, script, timeout).stdout.read()
    _, found, encoded = output.partition(separator)
    if not found:
        # The signer crashed or was killed before producing a signature.
        raise SigningError("Signer produced no signature output")
    return b64decode(encoded)
def do_sign_v2(program_path, data, timeout):
    """Sign once in the sandbox while gdb is attached to the container process.

    The injected script imports the signer, prints ``'X' * 128 + 'Y'`` as a
    sync marker and sleeps for a second; this function waits for the marker,
    looks up the container PID with ``docker inspect`` and runs
    ``gdb -x gggdb.py`` against it before reading the signature.

    Args:
        program_path: Host path of the signer program mounted into the sandbox.
        data: Message bytes to sign (sent base64-encoded inside the script).
        timeout: Forwarded to ``run_script``.

    Returns:
        The base64-decoded bytes printed after the first separator.

    Raises:
        SigningError: if the sandbox output ends before the sync marker, or
            contains no separator.
    """
    separator = b"MAGIC_OUTPUT_SEPARATOR_DO_NOT_PRINT_THIS_IN_YOUR_SIGNER"
    # The two bare ``0`` lines are kept as-is; presumably they keep the line
    # numbers aligned with the script used by ``do_sign`` — confirm.
    script = (
        SECCOMP
        + f"""
import importlib.util
from base64 import b64encode, b64decode
import sys, json, os, time
def import_from_file(name, path):
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    spec.loader.exec_module(module)
    return module
inputs = [b64decode({b64encode(data)!r})]
0
0
sign = import_from_file("sign", "/opt/sign.py").sign; sys.stdout.write('X'*128+'Y');sys.stdout.flush(); time.sleep(1)
out = [b64encode(sign(x)).decode() for x in inputs]
sys.stdout.write('MAGIC_OUTPUT_SEPARATOR_DO_NOT_PRINT_THIS_IN_YOUR_SIGNER')
sys.stdout.write(out[0])
"""
    )
    container_name = f"checker-{os.urandom(8).hex()}"
    p = run_script(program_path, script, timeout, container_name)
    # Wait for a full 16-byte chunk of 'X', then for the terminating 'Y'.
    # An empty read means EOF: fail instead of spinning forever.
    while (chunk := p.stdout.read(16)) != b"X" * 16:
        if not chunk:
            raise SigningError("Signer exited before printing the sync marker")
    while (byte := p.stdout.read(1)) != b"Y":
        if not byte:
            raise SigningError("Signer exited before printing the sync marker")
    pid = int(
        subprocess.check_output(
            ["docker", "inspect", "-f", "{{ .State.Pid }}", container_name]
        ).strip()
    )
    os.system(f"sudo -E gdb -q -x gggdb.py -p {pid}")
    _, found, encoded = p.stdout.read().partition(separator)
    if not found:
        # The signer crashed or was killed before producing a signature.
        raise SigningError("Signer produced no signature output")
    return b64decode(encoded)
# Number of bits decoded by ``decodeint``.
b = 256
# Field prime of Curve25519.
q = (1 << 255) - 19
# Order of the Ed25519 base-point subgroup; scalars are reduced modulo this.
l = (1 << 252) + 27742317777372353535851937790883648493
def bit(h, i):
    """Return bit ``i`` of byte sequence ``h``, least-significant bit of each byte first."""
    byte_index, shift = divmod(i, 8)
    return (h[byte_index] >> shift) & 1
def decodeint(s):
    """Decode the first ``b`` bits of ``s`` as a little-endian integer."""
    value = 0
    for position in range(b):
        # Each bit lands on its own position, so OR-ing equals summing 2**i terms.
        value |= bit(s, position) << position
    return value
def decsig(x):
    """Split signature ``x`` into ``(R, S)``: the first 32 bytes and the decoded rest."""
    return x[:32], decodeint(x[32:])
def Hm(R, A, M):
    """Return SHA-512 of ``R + A + M`` read as a little-endian integer."""
    digest = hashlib.sha512(R + A + M).digest()
    return int.from_bytes(digest, "little")
def exp_fault(program_path: str | Path, pk, timeout) -> str | None:
    """Recover the signing scalar from a clean and a gdb-tampered signature.

    Signs the same message twice, once via ``do_sign`` and once via
    ``do_sign_v2``. If both signatures share ``R`` but differ in ``S``, the
    scalar is ``(S1 - S2) / (h1 - h2) mod l``, where ``h2`` is computed over
    ``b"x" * len(public key)`` in place of the real key.

    Args:
        program_path: Host path of the signer program mounted into the sandbox.
        pk: Base64-encoded public key.
        timeout: Forwarded to both signing helpers.

    Returns:
        The scalar as 32 little-endian bytes in hex, or ``None`` when the two
        signatures do not satisfy ``R1 == R2 and S1 != S2``.
    """
    # Presumably primes the sudo credential cache before gdb is launched — confirm.
    os.system("sudo echo ok")
    m = b"x" * 32
    s1 = do_sign(program_path, m, timeout)
    s2 = do_sign_v2(program_path, m, timeout)
    print(s1.hex(), len(s1))
    print(s2.hex(), len(s2))
    R1, S1 = decsig(s1)
    R2, S2 = decsig(s2)
    pk_raw = b64decode(pk)
    h1 = Hm(R1, pk_raw, m)
    # The second hash uses an all-"x" key of the same length in place of the real one.
    h2 = Hm(R2, b"x" * len(pk_raw), m)
    if R1 != R2 or S1 == S2:
        return None
    s = (S1 - S2) * pow(h1 - h2, -1, l) % l
    return s.to_bytes(32, "little").hex()
# generated from `../cli/client.py patch round 120 -t 1 --program`
file = Path.cwd() / sys.argv[1]
content = file.read_bytes()
# The public key is whatever follows the first 14 bytes of the first line
# (presumably a fixed comment prefix — confirm against the patch format).
pk = content.split(b"\n")[0][14:]
print("pk", pk)
proof_bin = Path(__file__).parent.parent / "proofs/bin/prove"
# NOTE(review): pk_tmp is written next to this script; whatever reads it must
# resolve the same location — verify the working directory used at run time.
pk_tmp = (Path(__file__).parent / "./pk_tmp").absolute()
tmp = (Path(__file__).parent / "./tmptmptmp.py").absolute()
try:
    pk_tmp.write_bytes(pk)
    # Remove the first line because some scripts will check the stack trace
    # (with line numbers).
    tmp.write_bytes(content[14 + len(pk) + 1 :])
    print("mul", mul := exp_fault(tmp, pk, 2))
    if mul:
        subprocess.run([proof_bin, "-pk", pk, "-mul", mul])
finally:
    # Remove both temporary files even when the exploit step raised.
    tmp.unlink(missing_ok=True)
    pk_tmp.unlink(missing_ok=True)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This script uses the ``gef`` object, so refuse to run without it.
if not ("gef" in globals()):
    print("[-] gef not loaded")
    raise ValueError("gef not loaded")

from base64 import b64decode

# Switch off GEF's context display for this session.
gdb.execute("gef config context.enable false")
def search_binpattern_by_address(
    binpattern: bytes, start_address: int, end_address: int
):
    """Find every occurrence of ``binpattern`` in ``[start_address, end_address)``.

    Memory is read in chunks of ``0x400`` pages. Returns a list of
    ``(start, end, None)`` tuples, or an empty list as soon as any chunk
    cannot be read.
    """
    step = 0x400 * gef.session.pagesize
    pattern_len = len(binpattern)
    locations = []
    for chunk_addr in range(start_address, end_address, step):
        # The final chunk is clipped to the end of the range.
        chunk_size = min(step, end_address - chunk_addr)
        try:
            mem = gef.memory.read(chunk_addr, chunk_size)
        except gdb.MemoryError:
            # NOTE(review): this also drops hits found in earlier chunks.
            return []
        offset = mem.find(binpattern)
        while offset != -1:
            hit = chunk_addr + offset
            locations.append((hit, hit + pattern_len, None))
            # Resume one byte further on, so overlapping matches are reported.
            offset = mem.find(binpattern, offset + 1)
        del mem
    return locations
def search_mem(msg):
    """Yield the start address of every match of ``msg`` in the mapped sections.

    Sections with neither the READ nor the WRITE permission bit, and the
    ``[vvar]`` mapping, are skipped.
    """
    for section in gef.memory.maps:
        unreachable = not section.permission & (Permission.READ | Permission.WRITE)
        if unreachable or section.path == "[vvar]":
            continue
        hits = search_binpattern_by_address(
            msg, section.page_start, section.page_end - 1
        )
        yield from (first for first, _last, _ in hits)
inferior = gdb.selected_inferior()
# Read the base64 public key from ./pk_tmp, closing the file once it is read.
with open("./pk_tmp") as pk_file:
    pk = b64decode(pk_file.read().strip())
# Step the inferior until the key bytes show up in memory, overwrite every
# copy with "x" bytes of the same length, then let the process run on.
while inferior.threads():
    changed = False
    for addr in search_mem(pk):
        print(hex(addr))
        inferior.write_memory(addr, b"x" * len(pk))
        changed = True
    if changed:
        gdb.execute("c")
        break
    gdb.execute("ni 100")  # need to tune this... (use 10000 for python based)
    # idk what's the best way to periodically scan the memory for the public key
gdb.execute("q")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment