Skip to content

Instantly share code, notes, and snippets.

@JJTech0130
Created October 18, 2024 02:01
Show Gist options
  • Save JJTech0130/5939b597bcbdc6c42000d61a4d878b0d to your computer and use it in GitHub Desktop.
Save JJTech0130/5939b597bcbdc6c42000d61a4d878b0d to your computer and use it in GitHub Desktop.
Excerpt from an old project of mine, showing a fake implementation of Mach ports that I used in a high-level emulation project.
@self.hook()
def _bootstrap_check_in(bootstrap_port: int, sn: int, sp: int):
    """Register a named service port and write its number back to the guest.

    Reads 0x100 bytes of guest memory at ``sn``, turns them into a name via
    ``c_string``, creates a port registered under that name, and stores the
    port number as a 4-byte little-endian value at ``sp``. Always returns 0.
    """
    raw_name = self._uc.mem_read(sn, 0x100)
    service_name = c_string(raw_name)
    self.debug(f"bootstrap_check_in {hex(bootstrap_port)} {service_name} {hex(sp)}")
    new_port = self.mach_ports.create_mach_port_with_name(service_name)
    encoded_port = new_port.to_bytes(4, byteorder='little')
    self._uc.mem_write(sp, encoded_port)
    return 0
@self.hook()
def _CFMachPortCreateWithPort(allocator: int, port: int, callout: int, context: int, shouldFreeInfo: int):
    """Wrap ``port`` in a ``CFMachPort`` whose callback forwards to ``callout``.

    Returns the ``_key`` of the newly created ``corefoundation.CFMachPort``.
    """
    self.debug(f"CFMachPortCreateWithPort {hex(port)} {hex(callout)} {hex(context)} {hex(shouldFreeInfo)}")

    async def callback(msg: mach.mach_msg_header_t):
        # Serialize the message, copy it into guest memory, then call the
        # guest callout with (port, pointer, size, 0).
        payload = mach.use_mach_msg_header_t(msg, self)
        self.debug(f"CFMachPortCreateWithPort callback {hex(port)} {payload.hex()} {msg}")
        guest_ptr = self.malloc(len(payload))
        self._uc.mem_write(guest_ptr, payload)
        # The size handed to the callout is msg.len(...), not len(payload),
        # because the trailer must not be counted.
        is_64_bit = self.ptr_size == 8
        await self.call(callout, [port, guest_ptr, msg.len(is_64_bit), 0])

    return corefoundation.CFMachPort(self.cf, port, callback)._key
@self.hook()
def _CFMachPortGetPort(port: int):
    """Return the ``_port`` attribute of the CF object registered under ``port``."""
    return self.cf.get_object(port)._port
@self.hook()
def _CFMachPortCreateRunLoopSource(allocator: int, port_i: int, order: int):
    """Create a run-loop source for the CF object ``port_i`` and return its ``_key``."""
    self.debug(f"CFMachPortCreateRunLoopSource {hex(port_i)} {hex(order)}")
    mach_port: corefoundation.CFMachPort = self.cf.get_object(port_i)
    source = mach_port.create_source()
    return source._key
@self.hook()
def _CFAllocatorAllocate(allocator: int, size: int, hint: int):
    """Allocate ``size`` bytes with ``self.malloc`` and return the address.

    ``allocator`` and ``hint`` are accepted but not used.
    """
    address = self.malloc(size)
    self.debug(f"CFAllocatorAllocate {hex(size)} -> {hex(address)}")
    return address
@self.hook()
def _bootstrap_look_up(bootstrap_port: int, service_name: int, sp: int):
    """Resolve a service name to its port and write the port number at ``sp``.

    Returns 0 on success. When a ``ValueError`` is raised (the registry
    raises it for unknown names), logs a warning and returns 1102
    (presumably BOOTSTRAP_UNKNOWN_SERVICE -- confirm against bootstrap.h).
    """
    service_name_s = c_string(self._uc.mem_read(service_name, 0x100))
    try:
        found = self.mach_ports.get_port(service_name_s)
        self._uc.mem_write(sp, found.to_bytes(4, byteorder='little'))
        self.debug(f"bootstrap_look_up {hex(bootstrap_port)} {service_name_s} {hex(sp)} -> {hex(found)}")
    except ValueError:
        logger.warning(f"bootstrap_look_up {hex(bootstrap_port)} {service_name_s} {hex(sp)} -> not found")
        return 1102
    return 0
@self.hook()
def _mig_get_reply_port():
    """Return the emulator's ``reply_port`` attribute."""
    reply_port = self.reply_port
    return reply_port
@self.hook()
def _voucher_mach_msg_set(mach_msg: int):
    """Log the call and return 1 without touching the message."""
    message_address = hex(mach_msg)
    self.debug(f"voucher_mach_msg_set {message_address}")
    return 1
@self.hook()
async def _mach_msg(msg: int, option: int, send_size: int, rcv_size: int, rcv_name: int, timeout: int, notify: int):
    """Emulate ``mach_msg``: optionally send, then optionally receive.

    Bit 0x1 of ``option`` requests a send of ``send_size`` bytes read from
    guest memory at ``msg``; bit 0x2 requests a receive on ``rcv_name``, whose
    serialized result is written back to ``msg``. ``timeout`` and ``notify``
    are only logged. Always returns 0.

    Raises:
        AssertionError: if the received message is larger than ``rcv_size``.
    """
    self.debug(f"mach_msg msg={hex(msg)} option={hex(option)} send_size={hex(send_size)} rcv_size={hex(rcv_size)} rcv_name={hex(rcv_name)} timeout={hex(timeout)} notify={hex(notify)}")
    # Decode the option bits selecting the two phases.
    send_requested = (option & 0x00000001) != 0
    receive_requested = (option & 0x00000002) != 0
    if send_requested:
        raw = self._uc.mem_read(msg, send_size)
        # Parse the message exactly once, through the helper that honours the
        # process pointer size. Parsing it separately with the constructor's
        # default 32-bit descriptor layout misreads 16-byte descriptors on a
        # 64-bit guest and can fail on messages carrying several of them.
        outgoing = mach.prepare_mach_msg_header_t(raw, self)
        await self.mach_ports.send(outgoing.header_remote_port, outgoing)
    if receive_requested:
        self.debug(f"mach_msg rcv_name={hex(rcv_name)} waiting for message")
        # Block until a message arrives on the port, then hand it to the guest.
        incoming = await self.mach_ports.receive(rcv_name)
        reply = mach.use_mach_msg_header_t(incoming, self)
        assert len(reply) <= rcv_size
        self._uc.mem_write(msg, reply)
    return 0
@self.hook()
def _mach_msg_destroy(msg: int):
    """Log the call and return 0; the message itself is left untouched."""
    message_address = hex(msg)
    self.debug(f"mach_msg_destroy {message_address}")
    return 0
@self.hook()
def _CFAllocatorDeallocate(allocator: int, ptr: int):
    """Log the deallocation request; nothing is freed and nothing is returned."""
    pointer_text = hex(ptr)
    self.debug(f"CFAllocatorDeallocate {pointer_text}")
@self.hook()
def ___chkstk_darwin():
    """Do nothing: this hook is a deliberate no-op."""
    return None
@self.hook()
def _audit_token_to_au32(audit_token: int, auid: int, euid: int, egid: int, ruid: int, rgid: int, pid: int, asid: int, tid: int):
    """Partial ``audit_token_to_au32``: only the ``pid`` output is supported.

    Every argument after ``audit_token`` is an output pointer. A non-zero
    pointer for anything other than ``pid`` raises ``NotImplementedError``;
    a non-zero ``pid`` pointer receives a fixed 4-byte little-endian value.
    """
    self.debug(f"audit_token_to_au32 {hex(audit_token)} {hex(auid)} {hex(euid)} {hex(egid)} {hex(ruid)} {hex(rgid)} {hex(pid)} {hex(asid)} {hex(tid)}")
    assert audit_token != 0

    def reject_requested(outputs):
        # Fail on the first output pointer the guest actually asked for.
        for label, pointer in outputs:
            if pointer != 0:
                raise NotImplementedError(f"audit_token_to_au32 {label} not implemented")

    # The checks run in parameter order, with the pid write in between.
    reject_requested((("auid", auid), ("euid", euid), ("egid", egid), ("ruid", ruid), ("rgid", rgid)))
    if pid != 0:
        # PID of identityservicesd on the phone /shrug
        self._uc.mem_write(pid, (1686).to_bytes(4, byteorder='little'))
    reject_requested((("asid", asid), ("tid", tid)))
import trio
import typing
from . import process
class mach_msg_type_descriptor_t:
    """Generic 12-byte message descriptor; only its type byte is interpreted."""

    type: int

    def __init__(self, data: bytes):
        """Read the descriptor type from byte 11 of ``data`` (at least 12 bytes)."""
        assert len(data) >= 12
        logging.debug(f"descriptor: {data.hex()}")
        type_byte = data[11:12]
        self.type = int.from_bytes(type_byte, byteorder='little')

    def to_bytes(self) -> bytes:
        """Serialize as 11 zero bytes followed by the type byte."""
        padding = bytes(11)
        return padding + self.type.to_bytes(1, byteorder='little')
# typedef struct {
#     uint64_t address;
#     boolean_t deallocate: 8;
#     mach_msg_copy_options_t copy: 8;
#     unsigned int pad1: 8;
#     mach_msg_descriptor_type_t type: 8;
#     mach_msg_size_t size;
# } mach_msg_ool_descriptor64_t;
#
# typedef struct {
#     uint32_t address;
#     mach_msg_size_t size;
#     boolean_t deallocate: 8;
#     mach_msg_copy_options_t copy: 8;
#     unsigned int pad1: 8;
#     mach_msg_descriptor_type_t type: 8;
# } mach_msg_ool_descriptor32_t;
class mach_msg_ool_descriptor_t(mach_msg_type_descriptor_t):
    """Out-of-line memory descriptor in either the 32-bit or 64-bit layout.

    A 16-byte input is parsed with the 64-bit layout (8-byte address, flags,
    then size); any other length is parsed with the 32-bit layout (4-byte
    address, size, then flags). ``data`` holds the out-of-line payload once
    something has attached it; it is not filled in by the constructor.
    """

    type: int = 1
    address: int
    deallocate: bool
    copy: int
    size: int
    data: bytes | None = None

    def __init__(self, data: bytes | None):
        """Parse a raw descriptor; ``None`` leaves the fields for the caller to set."""
        if data is None:
            return
        _64_bit = len(data) == 16
        logging.debug(f"ool descriptor: {data.hex()}")
        if _64_bit:
            self.address = int.from_bytes(data[0:8], byteorder='little')
            self.size = int.from_bytes(data[12:16], byteorder='little')
        else:
            self.address = int.from_bytes(data[0:4], byteorder='little')
            self.size = int.from_bytes(data[4:8], byteorder='little')
        # The flag bytes sit at the same offsets in both layouts.
        self.deallocate = bool(data[8])
        self.copy = data[9]
        self.type = data[11]

    def __repr__(self):
        # The 32-bit layout cannot hold an address above 0xFFFFFFFF, and
        # to_bytes would raise OverflowError, so use the 64-bit layout for the
        # hex dump in that case.
        wide = self.address > 0xFFFFFFFF
        return f"mach_msg_ool_descriptor_t(address={hex(self.address)}, deallocate={self.deallocate}, copy={self.copy}, type={self.type}, size={self.size} bytes <{self.to_bytes(wide).hex()}>)"

    def to_bytes(self, _64_bit) -> bytes:
        """Serialize using the 64-bit layout when ``_64_bit`` is true, else the 32-bit one."""
        flags = b"".join((
            self.deallocate.to_bytes(1, byteorder='little'),
            self.copy.to_bytes(1, byteorder='little'),
            b"\x00",  # pad1
            self.type.to_bytes(1, byteorder='little'),
        ))
        size = self.size.to_bytes(4, byteorder='little')
        if _64_bit:
            return self.address.to_bytes(8, byteorder='little') + flags + size
        return self.address.to_bytes(4, byteorder='little') + size + flags
import logging
from io import BytesIO
class mach_msg_header_t:
header_bits_complex: bool
header_bits_remote_type: int
header_bits_local_type: int
header_bits_voucher_type: int
header_msg_size: int
header_remote_port: int
header_local_port: int
header_voucher: int
header_id: int
descriptors: list[mach_msg_type_descriptor_t]
extra: bytes
def __init__(self, data: bytes | None, _64_bit = False):
if data is None:
return
assert len(data) >= 24
#logging.debug(f"bits: {data[0:4].hex()} complex: {data[0] & 0x80}, int {int.from_bytes(data[0:4], byteorder='little')}")
# Reverse the endian of the bits
data = data[0:4][::-1] + data[4:]
logging.debug(f"bits: {data[0:4].hex()} complex: {data[0] & 0x80}, int {int.from_bytes(data[0:4], byteorder='little')}")
self.header_bits_complex = bool(data[0] & 0x80)
self.header_bits_remote_type = data[3] & 0x1f
self.header_bits_local_type = data[2] & 0x1f
self.header_bits_voucher_type = data[1] & 0x1f
self.header_bits_options = int.from_bytes(data[0:4], byteorder='big') &~ 0x801f1f1f
#assert self.header_bits_options == 0
#logging.warning(f"header_bits_options: {self.header_bits_options}")
self.header_msg_size = int.from_bytes(data[4:8], byteorder='little')
self.header_remote_port = int.from_bytes(data[8:12], byteorder='little')
self.header_local_port = int.from_bytes(data[12:16], byteorder='little')
self.header_voucher = int.from_bytes(data[16:20], byteorder='little')
self.header_id = int.from_bytes(data[20:24], byteorder='little')
self.descriptors = []
if self.header_bits_complex:
# Parse the descriptors
descriptor_count = int.from_bytes(data[24:28], byteorder='little')
data_i = BytesIO(data[28:])
for i in range(descriptor_count):
d = data_i.read(12)
descriptor = mach_msg_type_descriptor_t(d)
if descriptor.type == 1:
if _64_bit:
d += data_i.read(4)
self.descriptors.append(mach_msg_ool_descriptor_t(d))
else:
raise ValueError(f"Unknown descriptor type {descriptor.type}")
self.extra = data_i.read()
else:
self.extra = data[24:]
self.trailer = bytes.fromhex("0000000000000000")
@property
def header_bits(self):
return (self.header_bits_remote_type | (self.header_bits_local_type << 8) | (self.header_bits_voucher_type << 16) | (self.header_bits_complex << 31))
@header_bits.setter
def header_bits(self, value):
self.header_bits_remote_type = value & 0x1f
self.header_bits_local_type = (value >> 8) & 0x1f
self.header_bits_voucher_type = (value >> 16) & 0x1f
self.header_bits_complex = bool(value >> 31)
MACH_MSG_TYPE = {
0: "MACH_MSG_TYPE_PORT_NONE",
16: "MACH_MSG_TYPE_MOVE_RECEIVE",
17: "MACH_MSG_TYPE_MOVE_SEND",
18: "MACH_MSG_TYPE_MOVE_SEND_ONCE",
19: "MACH_MSG_TYPE_COPY_SEND",
20: "MACH_MSG_TYPE_MAKE_SEND",
21: "MACH_MSG_TYPE_MAKE_SEND_ONCE",
22: "MACH_MSG_TYPE_COPY_RECEIVE",
24: "MACH_MSG_TYPE_DISPOSE_RECEIVE",
25: "MACH_MSG_TYPE_DISPOSE_SEND",
26: "MACH_MSG_TYPE_DISPOSE_SEND_ONCE"
}
def len(self, _64_bit=True):
return len(self.to_bytes(_64_bit)) - len(self.trailer)
def __repr__(self):
return f"mach_msg_header_t(header_bits_complex={self.header_bits_complex}, header_bits_remote_type={self.MACH_MSG_TYPE[self.header_bits_remote_type]}, header_bits_local_type={self.MACH_MSG_TYPE[self.header_bits_local_type]}, header_bits_voucher_type={self.MACH_MSG_TYPE[self.header_bits_voucher_type]}, header_msg_size={self.header_msg_size}, header_remote_port={hex(self.header_remote_port)}, header_local_port={hex(self.header_local_port)}, header_voucher={hex(self.header_voucher)}, header_id={self.header_id}, descriptors={self.descriptors}, extra={self.extra.hex()} <{self.to_bytes(False,False).hex()}>)"
def to_bytes(self, _64_bit=True, trailer=True) -> bytes:
out = b""
out += (self.header_bits_remote_type | (self.header_bits_local_type << 8) | (self.header_bits_voucher_type << 16) | (self.header_bits_complex << 31)).to_bytes(4, byteorder='little')
out += self.header_msg_size.to_bytes(4, byteorder='little')
out += self.header_remote_port.to_bytes(4, byteorder='little')
out += self.header_local_port.to_bytes(4, byteorder='little')
out += self.header_voucher.to_bytes(4, byteorder='little')
out += self.header_id.to_bytes(4, byteorder='little')
if self.header_bits_complex:
out += len(self.descriptors).to_bytes(4, byteorder='little')
for descriptor in self.descriptors:
if isinstance(descriptor, mach_msg_ool_descriptor_t):
out += descriptor.to_bytes(_64_bit)
else:
out += descriptor.to_bytes()
out += self.extra
if trailer:
out += self.trailer
return out
class MachPorts:
    """Registry of fake Mach ports, each backed by a trio memory channel."""

    # Rather than giving out individual port rights, we just give them a unique port number. No security is necessary.
    _mach_ports: dict[int, tuple[trio.MemorySendChannel, trio.MemoryReceiveChannel]] = {}
    # Name to port number mapping (for bootstrap lookup)
    _mach_name_to_port: dict[str, int] = {}

    def create_mach_port(self) -> int:
        """Open a zero-capacity channel and register it under a fresh number."""
        send_end, receive_end = trio.open_memory_channel(0)
        # One past the highest number handed out so far (1 when empty).
        next_port = 1 + max(self._mach_ports.keys(), default=0)
        self._mach_ports[next_port] = (send_end, receive_end)
        return next_port

    def create_mach_port_with_name(self, name: str) -> int:
        """Create a port and bind ``name`` to it; ``ValueError`` if the name is taken."""
        if name in self._mach_name_to_port:
            raise ValueError("Port name already registered")
        new_port = self.create_mach_port()
        self._mach_name_to_port[name] = new_port
        return new_port

    async def send(self, port: int, header: mach_msg_header_t) -> None:
        """Push ``header`` into the send side of ``port``'s channel."""
        logging.debug(f"SENDING header: {header}")
        send_end, _ = self._mach_ports[port]
        await send_end.send(header)

    async def receive(self, port: int, trailer_type: int = 0, process = True) -> mach_msg_header_t:
        """Wait for the next message on ``port``.

        Unless ``process`` is falsy, the message is first passed through
        ``kernel_process_mach_msg_header_t`` with ``trailer_type``.
        """
        _, receive_end = self._mach_ports[port]
        received = await receive_end.receive()
        if not process:
            return received
        return kernel_process_mach_msg_header_t(received, trailer_type)

    def get_port(self, name: str) -> int:
        """Return the port number bound to ``name``; ``ValueError`` if unknown."""
        try:
            return self._mach_name_to_port[name]
        except KeyError:
            raise ValueError("Port name not registered") from None
def prepare_mach_msg_header_t(header_raw: bytes, process: "process.Process") -> mach_msg_header_t:
    """Parse an outgoing message and capture its out-of-line data.

    The header is parsed with the descriptor layout matching
    ``process.ptr_size``. For every out-of-line descriptor, the
    ``descriptor.size`` bytes at ``descriptor.address`` are read from the
    process memory and stored on ``descriptor.data``.

    Raises:
        AssertionError: if a descriptor already carries data.
    """
    # Read the descriptors
    header = mach_msg_header_t(header_raw, _64_bit=process.ptr_size == 8)
    for descriptor in header.descriptors:
        if not isinstance(descriptor, mach_msg_ool_descriptor_t):
            continue
        assert descriptor.data is None
        descriptor.data = bytes(process._uc.mem_read(descriptor.address, descriptor.size))
        # Logged rather than printed, like the rest of this module.
        logging.debug(f"ool data: {descriptor.data.hex()}")
    return header
def use_mach_msg_header_t(header: mach_msg_header_t, process: "process.Process") -> bytes:
    """Place a message into ``process`` and return its serialized bytes.

    For every out-of-line descriptor, memory of ``descriptor.size`` bytes is
    allocated with ``process.malloc``, the captured data is written there, and
    the descriptor is updated with the new address and ``deallocate=True``.
    ``header.header_msg_size`` is then recomputed and the header is serialized
    with the layout matching ``process.ptr_size``. ``header`` is mutated.

    Raises:
        AssertionError: if an out-of-line descriptor has no data attached.
    """
    is_64_bit = process.ptr_size == 8
    # Write the descriptors
    for descriptor in header.descriptors:
        if not isinstance(descriptor, mach_msg_ool_descriptor_t):
            continue
        assert descriptor.data is not None
        descriptor.deallocate = True
        # Allocate the memory
        descriptor.address = process.malloc(descriptor.size)
        # Logged rather than printed, like the rest of this module.
        logging.debug(f"ool data: {descriptor.data.hex()}")
        process._uc.mem_write(descriptor.address, descriptor.data)
    header.header_msg_size = header.len(_64_bit=is_64_bit)
    logging.debug(f"RECEIVED header: {header}")
    return header.to_bytes(_64_bit=is_64_bit)
def kernel_process_mach_msg_header_t(header: mach_msg_header_t, trailer_type: int) -> mach_msg_header_t:
    """Rewrite a sent header into the form handed to the receiver.

    Clears the voucher, swaps the remote/local ports and their types,
    downgrades the swapped types, and attaches a trailer chosen by
    ``trailer_type`` (0 or 7). ``header`` is modified in place and returned.

    Raises:
        ValueError: for a port type outside the downgrade table or an
            unsupported ``trailer_type``.
    """
    # Clear the priority voucher (https://robert.sesek.com/2023/6/mach_vouchers.html)
    header.header_bits_voucher_type = 0
    header.header_voucher = 0

    # Swap the ports, and the types along with them.
    header.header_remote_port, header.header_local_port = header.header_local_port, header.header_remote_port
    header.header_bits_remote_type, header.header_bits_local_type = header.header_bits_local_type, header.header_bits_remote_type

    # Downgrade the swapped types, local first and then remote:
    #   MAKE_SEND_ONCE (21) -> MOVE_SEND_ONCE (18)
    #   COPY_SEND (19)      -> MOVE_SEND (17)
    #   MOVE_SEND_ONCE (18) -> PORT_NONE (0)
    #   PORT_NONE (0)       -> unchanged
    downgraded = {21: 18, 19: 17, 18: 0, 0: 0}
    for attribute in ("header_bits_local_type", "header_bits_remote_type"):
        current = getattr(header, attribute)
        if current not in downgraded:
            raise ValueError(f"Unknown type {current}")
        setattr(header, attribute, downgraded[current])

    # typedef struct {
    #     mach_msg_trailer_type_t msgh_trailer_type;
    #     mach_msg_trailer_size_t msgh_trailer_size;
    #     mach_port_seqno_t msgh_seqno;
    #     security_token_t msgh_sender;
    #     audit_token_t msgh_audit;
    #     mach_port_context_t msgh_context;
    #     mach_msg_filter_id msgh_ad;
    #     msg_labels_t msgh_labels;
    # } mach_msg_mac_trailer_t;
    #
    # Sample trailer dump:
    # 0x16d3e2c1c: 00 00 00 00 44 00 00 00 18 00 00 00 f5 01 00 00 ....D...........
    # 0x16d3e2c2c: f5 01 00 00 ff ff ff ff f5 01 00 00 f5 01 00 00 ................
    # 0x16d3e2c3c: f5 01 00 00 f5 01 00 00 96 06 00 00 00 00 00 00 ................
    # 0x16d3e2c4c: 8f 11 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ................
    # 0x16d3e2c5c: 00 00 00 00 ....
    if trailer_type == 0:  # NULL trailer
        header.trailer = bytes.fromhex("0000000000000000")
        return header
    if trailer_type != 7:
        raise ValueError(f"Unknown trailer type {trailer_type}")

    # AV trailer, as (value, width in bytes) pairs in struct order.
    av_fields = (
        (0, 4),            # msgh_trailer_type
        (68, 4),           # msgh_trailer_size
        (0, 4),            # msgh_seqno # TODO: Increment this
        (0xDEADBEEF1, 8),  # msgh_sender # TODO: Are security tokens necessary?
        (0xDEADA0D1, 32),  # msgh_audit # TODO: Are audit tokens necessary?
        (0, 8),            # msgh_context
        (0, 4),            # msgh_ad
        (0, 4),            # msgh_labels
    )
    trailer = b"".join(value.to_bytes(width, byteorder='little') for value, width in av_fields)
    header.trailer = trailer
    logging.debug(f"trailer: {trailer.hex()}")
    return header
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment