Skip to content

Instantly share code, notes, and snippets.

@maple3142
Created November 10, 2024 14:24
Show Gist options
  • Save maple3142/d8acd5760ff9ec7040622930e12f0eec to your computer and use it in GitHub Desktop.
Save maple3142/d8acd5760ff9ec7040622930e12f0eec to your computer and use it in GitHub Desktop.
HITCON CTF 2024 Final - koh-ed25519 Author's Exploit
import argparse
import json
import os
import subprocess
from base64 import b64decode
from pathlib import Path
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
import sys, subprocess
IMAGE_NAME = os.environ.get("SIGNER_IMAGE_NAME", "checker-env")
DOCKERFILE_PATH = Path(__file__).parent / "env.Dockerfile"
def check_image_exist():
return len(subprocess.check_output(["docker", "images", "-q", IMAGE_NAME])) > 0
def ensure_image_exist():
if not check_image_exist():
print("Image not found, building image...")
subprocess.run(
["docker", "build", ".", "-t", IMAGE_NAME, "-f", DOCKERFILE_PATH],
check=True,
)
class SigningError(Exception):
pass
SECCOMP = """
from seccomp import *
f = SyscallFilter(defaction=KILL)
f.add_rule(ALLOW, 'access')
f.add_rule(ALLOW, 'arch_prctl')
f.add_rule(ALLOW, 'brk')
f.add_rule(ALLOW, 'close')
f.add_rule(ALLOW, 'dup2')
f.add_rule(ALLOW, 'epoll_create1')
f.add_rule(ALLOW, 'faccessat2')
f.add_rule(ALLOW, 'fcntl')
f.add_rule(ALLOW, 'fstat')
f.add_rule(ALLOW, 'futex')
f.add_rule(ALLOW, 'getcwd')
f.add_rule(ALLOW, 'getdents64')
f.add_rule(ALLOW, 'getegid')
f.add_rule(ALLOW, 'geteuid')
f.add_rule(ALLOW, 'getgid')
f.add_rule(ALLOW, 'getpgrp')
f.add_rule(ALLOW, 'getpid')
f.add_rule(ALLOW, 'getppid')
f.add_rule(ALLOW, 'getrandom')
f.add_rule(ALLOW, 'gettid')
f.add_rule(ALLOW, 'getuid')
f.add_rule(ALLOW, 'ioctl')
f.add_rule(ALLOW, 'lseek')
f.add_rule(ALLOW, 'mmap')
f.add_rule(ALLOW, 'mprotect')
f.add_rule(ALLOW, 'munmap')
f.add_rule(ALLOW, 'newfstatat')
f.add_rule(ALLOW, 'open')
f.add_rule(ALLOW, 'openat')
f.add_rule(ALLOW, 'pipe2')
f.add_rule(ALLOW, 'prctl')
f.add_rule(ALLOW, 'pread64')
f.add_rule(ALLOW, 'prlimit64')
f.add_rule(ALLOW, 'read')
f.add_rule(ALLOW, 'readlink')
f.add_rule(ALLOW, 'rseq')
f.add_rule(ALLOW, 'rt_sigaction')
f.add_rule(ALLOW, 'rt_sigprocmask')
f.add_rule(ALLOW, 'rt_sigreturn')
f.add_rule(ALLOW, 'set_robust_list')
f.add_rule(ALLOW, 'set_tid_address')
f.add_rule(ALLOW, 'uname')
f.add_rule(ALLOW, 'wait4')
f.add_rule(ALLOW, 'write')
f.add_rule(ALLOW, 'kill')
f.add_rule(ALLOW, "exit")
f.add_rule(ALLOW, "exit_group")
f.load()
"""
def run_script(program_path, script, timeout):
container_name = f"checker-{os.urandom(8).hex()}"
try:
p = subprocess.Popen(
[
"docker",
"run",
"--rm",
"--name",
container_name,
"--hostname",
"sandbox",
"-i",
"-v",
f"{program_path}:/opt/sign.py:ro",
"-u",
"nobody",
"--read-only",
"--network",
"none",
"-e",
"PYTHONDONTWRITEBYTECODE=1",
"--entrypoint",
"/usr/bin/python3",
IMAGE_NAME,
],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=sys.stderr,
)
p.stdin.write(script.encode())
p.stdin.close()
return p
except subprocess.TimeoutExpired:
raise SigningError("Signing program timeout")
finally:
subprocess.run(
["docker", "kill", container_name],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
def extract_private_key_1(program_path: str | Path, timeout) -> list[bytes]:
if not check_image_exist():
raise SigningError("Image not found")
script = (
SECCOMP
+ f"""
import importlib.util
from base64 import b64encode
import sys, json
def import_from_file(name, path):
spec = importlib.util.spec_from_file_location(name, path)
module = importlib.util.module_from_spec(spec)
sys.modules[name] = module
spec.loader.exec_module(module)
return module
from cryptography.hazmat.backends.openssl.backend import backend as bk
bk.ed25519_load_private_bytes = lambda key: type('',(),{{'sign':lambda s,d:[sys.stdout.write('MAGIC_OUTPUT_SEPARATOR_DO_NOT_PRINT_THIS_IN_YOUR_SIGNER'), print(b64encode(key).decode(), end=''), exit()]}})()
sign = import_from_file("sign", "/opt/sign.py").sign
[sign(b'a' * 32) for _ in [1]]
"""
)
o = (
run_script(program_path, script, timeout)
.stdout.read()
.split(b"MAGIC_OUTPUT_SEPARATOR_DO_NOT_PRINT_THIS_IN_YOUR_SIGNER")
)
if len(o) == 2:
return o[1]
def extract_mul_1(program_path: str | Path, pk, timeout) -> list[bytes]:
script = (
SECCOMP
+ f"""
import importlib.util
from base64 import b64encode, b64decode
import sys, json, hashlib
def import_from_file(name, path):
spec = importlib.util.spec_from_file_location(name, path)
module = importlib.util.module_from_spec(spec)
sys.modules[name] = module
spec.loader.exec_module(module)
return module
b = 256
q = 2**255 - 19
l = 2**252 + 27742317777372353535851937790883648493
def bit(h, i):
return (h[i // 8] >> (i % 8)) & 1
def decodeint(s):
return sum(2**i * bit(s, i) for i in range(0, b))
import hashlib
orig = hashlib.sha512
def hook(x):
if len(x) == 64:
return orig(b'')
return orig(x)
hashlib.sha512 = hook
def decsig(x):
R = x[:32]
S = decodeint(x[32:])
return R, S
pk = b64decode({pk})
def Hm(R, A, M):
return int.from_bytes(orig(R + A + M).digest(), 'little')
sign = import_from_file("sign", "/opt/sign.py").sign
sys.stdout.write('MAGIC_OUTPUT_SEPARATOR_DO_NOT_PRINT_THIS_IN_YOUR_SIGNER')
m1 = b'a' * 32
m2 = b'b' * 32
s1 = sign(m1)
s2 = sign(m2)
R1, S1 = decsig(s1)
R2, S2 = decsig(s2)
h1 = Hm(R1, pk, m1)
h2 = Hm(R2, pk, m2)
if R1 == R2:
s = (S1 - S2) * pow(h1 - h2, -1, l) % l
print(s.to_bytes(32, 'little').hex(), end='')
"""
)
return (
run_script(program_path, script, timeout)
.stdout.read()
.split(b"MAGIC_OUTPUT_SEPARATOR_DO_NOT_PRINT_THIS_IN_YOUR_SIGNER")[-1]
)
# generated from `../cli/client.py patch round 71 -t 7 --program`
file = Path.cwd() / sys.argv[1]
content = file.read_bytes()
pk = content.split(b"\n")[0][14:]
print("pk", pk)
proof_bin = Path(__file__).parent.parent / "proofs/bin/prove"
tmp = (Path(__file__).parent / "./tmptmptmp.py").absolute()
# remove the first line because some scripts with check for stack trace (with line number)
tmp.write_bytes(content[14 + len(pk) + 1 :])
print("sk", sk := extract_private_key_1(tmp, 2))
if sk:
subprocess.run([proof_bin, "-pk", pk, "-sk", sk])
print("mul", mul := extract_mul_1(tmp, pk, 2))
if mul:
subprocess.run([proof_bin, "-pk", pk, "-mul", mul])
tmp.unlink()
import argparse
import json
import os
import subprocess
from base64 import b64decode, b64encode
from pathlib import Path
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
import sys, subprocess, time, hashlib
IMAGE_NAME = os.environ.get("SIGNER_IMAGE_NAME", "checker-env")
DOCKERFILE_PATH = Path(__file__).parent / "env.Dockerfile"
def check_image_exist():
return len(subprocess.check_output(["docker", "images", "-q", IMAGE_NAME])) > 0
def ensure_image_exist():
if not check_image_exist():
print("Image not found, building image...")
subprocess.run(
["docker", "build", ".", "-t", IMAGE_NAME, "-f", DOCKERFILE_PATH],
check=True,
)
class SigningError(Exception):
pass
SECCOMP = """
from seccomp import *
f = SyscallFilter(defaction=KILL)
f.add_rule(ALLOW, 'access')
f.add_rule(ALLOW, 'arch_prctl')
f.add_rule(ALLOW, 'brk')
f.add_rule(ALLOW, 'close')
f.add_rule(ALLOW, 'dup2')
f.add_rule(ALLOW, 'epoll_create1')
f.add_rule(ALLOW, 'faccessat2')
f.add_rule(ALLOW, 'fcntl')
f.add_rule(ALLOW, 'fstat')
f.add_rule(ALLOW, 'futex')
f.add_rule(ALLOW, 'getcwd')
f.add_rule(ALLOW, 'getdents64')
f.add_rule(ALLOW, 'getegid')
f.add_rule(ALLOW, 'geteuid')
f.add_rule(ALLOW, 'getgid')
f.add_rule(ALLOW, 'getpgrp')
f.add_rule(ALLOW, 'getpid')
f.add_rule(ALLOW, 'getppid')
f.add_rule(ALLOW, 'getrandom')
f.add_rule(ALLOW, 'gettid')
f.add_rule(ALLOW, 'getuid')
f.add_rule(ALLOW, 'ioctl')
f.add_rule(ALLOW, 'lseek')
f.add_rule(ALLOW, 'mmap')
f.add_rule(ALLOW, 'mprotect')
f.add_rule(ALLOW, 'munmap')
f.add_rule(ALLOW, 'newfstatat')
f.add_rule(ALLOW, 'open')
f.add_rule(ALLOW, 'openat')
f.add_rule(ALLOW, 'pipe2')
f.add_rule(ALLOW, 'prctl')
f.add_rule(ALLOW, 'pread64')
f.add_rule(ALLOW, 'prlimit64')
f.add_rule(ALLOW, 'read')
f.add_rule(ALLOW, 'readlink')
f.add_rule(ALLOW, 'rseq')
f.add_rule(ALLOW, 'rt_sigaction')
f.add_rule(ALLOW, 'rt_sigprocmask')
f.add_rule(ALLOW, 'rt_sigreturn')
f.add_rule(ALLOW, 'set_robust_list')
f.add_rule(ALLOW, 'set_tid_address')
f.add_rule(ALLOW, 'uname')
f.add_rule(ALLOW, 'wait4')
f.add_rule(ALLOW, 'write')
f.add_rule(ALLOW, 'kill')
f.add_rule(ALLOW, "exit")
f.add_rule(ALLOW, "exit_group")
f.load()
"""
SECCOMP = " " * len(SECCOMP) # nop it
def run_script(program_path, script, timeout, container_name=None):
if container_name is None:
container_name = f"checker-{os.urandom(8).hex()}"
try:
p = subprocess.Popen(
[
"docker",
"run",
"--rm",
"--name",
container_name,
"--hostname",
"sandbox",
"-i",
"-v",
f"{program_path}:/opt/sign.py:ro",
"-u",
"nobody",
"--read-only",
"--network",
"none",
"-e",
"PYTHONDONTWRITEBYTECODE=1",
"--entrypoint",
"/usr/bin/python3",
IMAGE_NAME,
],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=sys.stderr,
)
p.stdin.write(script.encode())
p.stdin.close()
return p
except subprocess.TimeoutExpired:
raise SigningError("Signing program timeout")
finally:
subprocess.run(
["docker", "kill", container_name],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
def do_sign(program_path, data, timeout):
script = (
SECCOMP
+ f"""
import importlib.util
from base64 import b64encode, b64decode
import sys, json, hashlib
def import_from_file(name, path):
spec = importlib.util.spec_from_file_location(name, path)
module = importlib.util.module_from_spec(spec)
sys.modules[name] = module
spec.loader.exec_module(module)
return module
inputs = [{data}]
sys.stdout.write('X')
sys.stdout.flush()
sign = import_from_file("sign", "/opt/sign.py").sign
out = [b64encode(sign(x)).decode() for x in inputs]
sys.stdout.write('MAGIC_OUTPUT_SEPARATOR_DO_NOT_PRINT_THIS_IN_YOUR_SIGNER')
sys.stdout.write(out[0])
"""
)
return b64decode(
run_script(program_path, script, timeout)
.stdout.read()
.split(b"MAGIC_OUTPUT_SEPARATOR_DO_NOT_PRINT_THIS_IN_YOUR_SIGNER", 1)[1]
)
def do_sign_v2(program_path, data, timeout):
script = (
SECCOMP
+ f"""
import importlib.util
from base64 import b64encode, b64decode
import sys, json, os, time
def import_from_file(name, path):
spec = importlib.util.spec_from_file_location(name, path)
module = importlib.util.module_from_spec(spec)
sys.modules[name] = module
spec.loader.exec_module(module)
return module
inputs = [b64decode({b64encode(data)})]
0
0
sign = import_from_file("sign", "/opt/sign.py").sign; sys.stdout.write('X'*128+'Y');sys.stdout.flush(); time.sleep(1)
out = [b64encode(sign(x)).decode() for x in inputs]
sys.stdout.write('MAGIC_OUTPUT_SEPARATOR_DO_NOT_PRINT_THIS_IN_YOUR_SIGNER')
sys.stdout.write(out[0])
"""
)
container_name = f"checker-{os.urandom(8).hex()}"
p = run_script(program_path, script, timeout, container_name)
while p.stdout.read(16) != b"X" * 16:
pass
while p.stdout.read(1) != b"Y":
pass
pid = int(
subprocess.check_output(
["docker", "inspect", "-f", "{{ .State.Pid }}", container_name]
).strip()
)
os.system(f"sudo -E gdb -q -x gggdb.py -p {pid}")
return b64decode(
p.stdout.read().split(
b"MAGIC_OUTPUT_SEPARATOR_DO_NOT_PRINT_THIS_IN_YOUR_SIGNER", 1
)[1]
)
b = 256
q = 2**255 - 19
l = 2**252 + 27742317777372353535851937790883648493
def bit(h, i):
return (h[i // 8] >> (i % 8)) & 1
def decodeint(s):
return sum(2**i * bit(s, i) for i in range(0, b))
def decsig(x):
R = x[:32]
S = decodeint(x[32:])
return R, S
def Hm(R, A, M):
return int.from_bytes(hashlib.sha512(R + A + M).digest(), "little")
def exp_fault(program_path: str | Path, pk, timeout) -> list[bytes]:
os.system("sudo echo ok")
m = b"x" * 32
s1 = do_sign(program_path, m, timeout)
s2 = do_sign_v2(program_path, m, timeout)
print(s1.hex(), len(s1))
print(s2.hex(), len(s2))
R1, S1 = decsig(s1)
R2, S2 = decsig(s2)
h1 = Hm(R1, b64decode(pk), m)
h2 = Hm(R2, b"x" * len(b64decode(pk)), m)
if R1 == R2 and S1 != S2:
s = (S1 - S2) * pow(h1 - h2, -1, l) % l
return s.to_bytes(32, "little").hex()
# generated from `../cli/client.py patch round 120 -t 1 --program`
file = Path.cwd() / sys.argv[1]
content = file.read_bytes()
pk = content.split(b"\n")[0][14:]
print("pk", pk)
proof_bin = Path(__file__).parent.parent / "proofs/bin/prove"
pk_tmp = (Path(__file__).parent / "./pk_tmp").absolute()
pk_tmp.write_bytes(pk)
tmp = (Path(__file__).parent / "./tmptmptmp.py").absolute()
# remove the first line because some scripts with check for stack trace (with line number)
tmp.write_bytes(content[14 + len(pk) + 1 :])
print("mul", mul := exp_fault(tmp, pk, 2))
if mul:
subprocess.run([proof_bin, "-pk", pk, "-mul", mul])
tmp.unlink()
pk_tmp.unlink()
if "gef" not in globals():
print("[-] gef not loaded")
raise ValueError("gef not loaded")
from base64 import b64decode
gdb.execute("gef config context.enable false")
def search_binpattern_by_address(
binpattern: bytes, start_address: int, end_address: int
):
"""Search a binary pattern within a range defined by arguments."""
step = 0x400 * gef.session.pagesize
locations = []
for chunk_addr in range(start_address, end_address, step):
if chunk_addr + step > end_address:
chunk_size = end_address - chunk_addr
else:
chunk_size = step
try:
mem = gef.memory.read(chunk_addr, chunk_size)
except gdb.MemoryError:
return []
i = 0
while (i := mem.find(binpattern, i)) != -1:
start = chunk_addr + i
end = start + len(binpattern)
locations.append((start, end, None))
i += 1
del mem
return locations
def search_mem(msg):
for section in gef.memory.maps:
if not section.permission & (Permission.READ | Permission.WRITE):
continue
if section.path == "[vvar]":
continue
start = section.page_start
end = section.page_end - 1
for st, ed, _ in search_binpattern_by_address(msg, start, end):
yield st
inferior = gdb.selected_inferior()
pk = b64decode(open("./pk_tmp").read().strip())
while inferior.threads():
changed = False
for addr in search_mem(pk):
print(hex(addr))
inferior.write_memory(addr, b"x" * len(pk))
changed = True
if changed:
gdb.execute("c")
break
gdb.execute("ni 100") # need to tune this... (use 10000 for python based)
# idk what's the best way to periodically scan the memory for the public key
gdb.execute("q")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment