Created
October 18, 2024 02:01
-
-
Save JJTech0130/5939b597bcbdc6c42000d61a4d878b0d to your computer and use it in GitHub Desktop.
Excerpt from an old project of mine, showing a fake implementation of Mach ports that I used in a high-level emulation project.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@self.hook()
def _bootstrap_check_in(bootstrap_port: int, sn: int, sp: int):
    """Hook for bootstrap_check_in: create a port registered under a service name.

    Reads 0x100 bytes of emulated memory at ``sn``, converts them with
    ``c_string``, creates a named port for the result and stores the port
    number as a 4-byte little-endian value at ``sp``. Always returns 0.
    ``bootstrap_port`` is only logged.
    """
    name = c_string(self._uc.mem_read(sn, 0x100))
    self.debug(f"bootstrap_check_in {hex(bootstrap_port)} {name} {hex(sp)}")
    new_port = self.mach_ports.create_mach_port_with_name(name)
    # Hand the port number back through the caller-supplied out pointer.
    self._uc.mem_write(sp, new_port.to_bytes(4, byteorder='little'))
    return 0
@self.hook()
def _CFMachPortCreateWithPort(allocator: int, port: int, callout: int, context: int, shouldFreeInfo: int):
    """Hook for CFMachPortCreateWithPort: wrap ``port`` in a CFMachPort object.

    The created object is given an async callback that copies each delivered
    message into emulated memory and then invokes the guest ``callout``.
    Returns the ``_key`` of the new CFMachPort object. ``allocator`` is
    unused; ``context`` and ``shouldFreeInfo`` are only logged.
    """
    self.debug(f"CFMachPortCreateWithPort {hex(port)} {hex(callout)} {hex(context)} {hex(shouldFreeInfo)}")

    async def callback(msg: mach.mach_msg_header_t):
        payload = mach.use_mach_msg_header_t(msg, self)
        self.debug(f"CFMachPortCreateWithPort callback {hex(port)} {payload.hex()} {msg}")
        # Copy the serialized message into freshly allocated emulated memory.
        buffer = self.malloc(len(payload))
        self._uc.mem_write(buffer, payload)
        # The size argument comes from msg.len(), not len(payload), because
        # the trailer must not be counted.
        is_64_bit = self.ptr_size == 8
        await self.call(callout, [port, buffer, msg.len(is_64_bit), 0])

    cf_port = corefoundation.CFMachPort(self.cf, port, callback)
    return cf_port._key
@self.hook()
def _CFMachPortGetPort(port: int):
    """Hook for CFMachPortGetPort: return ``_port`` of the object registered under ``port``."""
    return self.cf.get_object(port)._port
@self.hook()
def _CFMachPortCreateRunLoopSource(allocator: int, port_i: int, order: int):
    """Hook for CFMachPortCreateRunLoopSource.

    Looks up the CFMachPort object registered under ``port_i``, asks it to
    create a source and returns that source's ``_key``. ``allocator`` is
    unused and ``order`` is only logged.
    """
    self.debug(f"CFMachPortCreateRunLoopSource {hex(port_i)} {hex(order)}")
    cf_port: corefoundation.CFMachPort = self.cf.get_object(port_i)
    source = cf_port.create_source()
    return source._key
@self.hook()
def _CFAllocatorAllocate(allocator: int, size: int, hint: int):
    """Hook for CFAllocatorAllocate: allocate ``size`` bytes via ``self.malloc``.

    Returns the value ``self.malloc`` produced. ``allocator`` and ``hint``
    are ignored.
    """
    address = self.malloc(size)
    self.debug(f"CFAllocatorAllocate {hex(size)} -> {hex(address)}")
    return address
@self.hook()
def _bootstrap_look_up(bootstrap_port: int, service_name: int, sp: int):
    """Hook for bootstrap_look_up: resolve a service name to a port number.

    Reads 0x100 bytes of emulated memory at ``service_name``, converts them
    with ``c_string`` and looks the result up in ``self.mach_ports``.

    Returns:
        0 after storing the port number (4 bytes, little-endian) at ``sp``,
        or 1102 (BOOTSTRAP_UNKNOWN_SERVICE) when the name is not registered.
    """
    BOOTSTRAP_UNKNOWN_SERVICE = 1102  # error code from <servers/bootstrap.h>

    service_name_s = c_string(self._uc.mem_read(service_name, 0x100))
    # Only the lookup itself is guarded: a ValueError raised while writing
    # the result or logging must not be misreported as "service not found".
    try:
        port = self.mach_ports.get_port(service_name_s)
    except ValueError:
        logger.warning(f"bootstrap_look_up {hex(bootstrap_port)} {service_name_s} {hex(sp)} -> not found")
        return BOOTSTRAP_UNKNOWN_SERVICE
    self._uc.mem_write(sp, port.to_bytes(4, byteorder='little'))
    self.debug(f"bootstrap_look_up {hex(bootstrap_port)} {service_name_s} {hex(sp)} -> {hex(port)}")
    return 0
@self.hook()
def _mig_get_reply_port():
    """Hook for mig_get_reply_port: return ``self.reply_port``."""
    reply_port = self.reply_port
    return reply_port
@self.hook()
def _voucher_mach_msg_set(mach_msg: int):
    """Stub hook for voucher_mach_msg_set: log the message pointer and return 1."""
    self.debug(f"voucher_mach_msg_set {hex(mach_msg)}")
    return 1
@self.hook()
async def _mach_msg(msg: int, option: int, send_size: int, rcv_size: int, rcv_name: int, timeout: int, notify: int):
    """Hook for mach_msg: optionally send, then optionally receive, a message.

    Bit 0x1 of ``option`` selects the send phase and bit 0x2 the receive
    phase; no other option bits are examined. ``timeout`` and ``notify``
    are only logged. Always returns 0.

    Send: ``send_size`` bytes at ``msg`` are parsed and queued on the port
    named by the header's remote port field.
    Receive: waits for a message on ``rcv_name`` and writes its serialized
    form back to ``msg``; asserts that it fits in ``rcv_size``.
    """
    self.debug(f"mach_msg msg={hex(msg)} option={hex(option)} send_size={hex(send_size)} rcv_size={hex(rcv_size)} rcv_name={hex(rcv_name)} timeout={hex(timeout)} notify={hex(notify)}")

    wants_send = bool(option & 0x00000001)
    wants_receive = bool(option & 0x00000002)

    if wants_send:
        raw = self._uc.mem_read(msg, send_size)
        # The first parse is only used to find the destination port.
        destination = mach.mach_msg_header_t(raw).header_remote_port
        await self.mach_ports.send(destination, mach.prepare_mach_msg_header_t(raw, self))

    if wants_receive:
        self.debug(f"mach_msg rcv_name={hex(rcv_name)} waiting for message")
        received = await self.mach_ports.receive(rcv_name)
        reply = mach.use_mach_msg_header_t(received, self)
        # NOTE(review): this guard disappears under ``python -O``.
        assert len(reply) <= rcv_size
        self._uc.mem_write(msg, reply)

    return 0
@self.hook()
def _mach_msg_destroy(msg: int):
    """Stub hook for mach_msg_destroy: log the message pointer and return 0."""
    self.debug(f"mach_msg_destroy {hex(msg)}")
    return 0
@self.hook()
def _CFAllocatorDeallocate(allocator: int, ptr: int):
    """Stub hook for CFAllocatorDeallocate: log ``ptr``; nothing is freed.

    Returns None (the hook has no explicit return value).
    """
    self.debug(f"CFAllocatorDeallocate {hex(ptr)}")
@self.hook()
def ___chkstk_darwin():
    """No-op hook for __chkstk_darwin; returns None."""
    return None
@self.hook()
def _audit_token_to_au32(audit_token: int, auid: int, euid: int, egid: int, ruid: int, rgid: int, pid: int, asid: int, tid: int):
    """Hook for audit_token_to_au32: only the ``pid`` output is supported.

    Each of ``auid`` .. ``tid`` is an output pointer; 0 means "not
    requested". A non-zero ``pid`` pointer receives a hard-coded PID as a
    4-byte little-endian value. Any other non-zero output pointer raises
    NotImplementedError. Returns None.

    Raises:
        AssertionError: if ``audit_token`` is 0.
        NotImplementedError: if an unsupported output is requested.
    """
    self.debug(f"audit_token_to_au32 {hex(audit_token)} {hex(auid)} {hex(euid)} {hex(egid)} {hex(ruid)} {hex(rgid)} {hex(pid)} {hex(asid)} {hex(tid)}")
    assert audit_token != 0

    def reject_requested(outputs):
        # A non-null pointer asks for a field this stub cannot supply.
        for label, pointer in outputs:
            if pointer != 0:
                raise NotImplementedError(f"audit_token_to_au32 {label} not implemented")

    # Checks run in parameter order, with the pid write in between, so the
    # first error raised (and whether pid was written) matches the
    # parameter order.
    reject_requested((("auid", auid), ("euid", euid), ("egid", egid), ("ruid", ruid), ("rgid", rgid)))
    if pid != 0:
        self._uc.mem_write(pid, (1686).to_bytes(4, byteorder='little'))  # PID of identityservicesd on the phone /shrug
    reject_requested((("asid", asid), ("tid", tid)))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import trio | |
import typing | |
from . import process | |
class mach_msg_type_descriptor_t:
    """Generic Mach message descriptor of which only the type byte is kept.

    The type is read from byte 11 of the raw descriptor; every other byte
    is discarded and serialized back as zero.
    """

    type: int

    def __init__(self, data: bytes):
        """Parse ``data`` (at least 12 bytes) and remember its type byte."""
        assert len(data) >= 12
        logging.debug(f"descriptor: {data.hex()}")
        type_byte = data[11:12]
        self.type = int.from_bytes(type_byte, byteorder='little')

    def to_bytes(self) -> bytes:
        """Return 11 zero bytes followed by the one-byte type."""
        padding = bytes(11)
        return padding + self.type.to_bytes(1, byteorder='little')
# typedef struct {
#     uint64_t                   address;
#     boolean_t                  deallocate: 8;
#     mach_msg_copy_options_t    copy: 8;
#     unsigned int               pad1: 8;
#     mach_msg_descriptor_type_t type: 8;
#     mach_msg_size_t            size;
# } mach_msg_ool_descriptor64_t;
#
# typedef struct {
#     uint32_t                   address;
#     mach_msg_size_t            size;
#     boolean_t                  deallocate: 8;
#     mach_msg_copy_options_t    copy: 8;
#     unsigned int               pad1: 8;
#     mach_msg_descriptor_type_t type: 8;
# } mach_msg_ool_descriptor32_t;
class mach_msg_ool_descriptor_t(mach_msg_type_descriptor_t):
    """Out-of-line memory descriptor in its 32-bit or 64-bit wire layout.

    Byte layout handled by this class:
        64-bit (16 bytes): address[0:8], deallocate[8], copy[9], pad[10],
                           type[11], size[12:16]
        32-bit (12 bytes): address[0:4], size[4:8], deallocate[8], copy[9],
                           pad[10], type[11]
    All multi-byte fields are little-endian.
    """

    type: int = 1
    address: int
    deallocate: bool
    copy: int
    size: int
    # Payload bytes attached by code outside this class; the constructor
    # never sets it, so it stays None until assigned.
    data: bytes | None = None

    def __init__(self, data: bytes | None):
        """Parse a raw descriptor; ``None`` creates an unpopulated instance.

        A 16-byte input is treated as the 64-bit layout, anything else as
        the 32-bit layout.
        """
        if data is None:
            return
        _64_bit = len(data) == 16
        logging.debug(f"ool descriptor: {data.hex()}")
        if _64_bit:
            self.address = int.from_bytes(data[0:8], byteorder='little')
            self.size = int.from_bytes(data[12:16], byteorder='little')
        else:
            self.address = int.from_bytes(data[0:4], byteorder='little')
            self.size = int.from_bytes(data[4:8], byteorder='little')
        # These three fields sit at the same offsets in both layouts.
        self.deallocate = bool(data[8])
        self.copy = data[9]
        self.type = data[11]

    def __repr__(self):
        # The 32-bit layout has a 4-byte address field, so serializing a
        # larger address with it raises OverflowError. Use the 64-bit
        # layout for the hex dump whenever the address needs it, so that
        # printing a descriptor can never fail on a 64-bit address.
        raw = self.to_bytes(self.address > 0xFFFFFFFF)
        return f"mach_msg_ool_descriptor_t(address={hex(self.address)}, deallocate={self.deallocate}, copy={self.copy}, type={self.type}, size={self.size} bytes <{raw.hex()}>)"

    def to_bytes(self, _64_bit) -> bytes:
        """Serialize in the 64-bit layout if ``_64_bit`` is true, else 32-bit.

        Raises:
            OverflowError: if a field does not fit its slot, e.g. an address
                above 0xFFFFFFFF in the 32-bit layout.
        """
        flags = (
            self.deallocate.to_bytes(1, byteorder='little')
            + self.copy.to_bytes(1, byteorder='little')
            + b"\x00"  # pad1
            + self.type.to_bytes(1, byteorder='little')
        )
        size = self.size.to_bytes(4, byteorder='little')
        if _64_bit:
            return self.address.to_bytes(8, byteorder='little') + flags + size
        return self.address.to_bytes(4, byteorder='little') + size + flags
import logging | |
from io import BytesIO | |
class mach_msg_header_t:
    """A parsed Mach message: header fields, descriptors, body and trailer.

    The 24-byte header is read as six little-endian 32-bit words: bits,
    size, remote port, local port, voucher port and id. When the complex
    bit is set, a 32-bit descriptor count and the descriptors follow; the
    remaining bytes are kept verbatim in ``extra``.
    """

    header_bits_complex: bool
    header_bits_remote_type: int
    header_bits_local_type: int
    header_bits_voucher_type: int
    # Bits of the first header word outside mask 0x801f1f1f. They are
    # parsed but not written back by to_bytes().
    header_bits_options: int
    header_msg_size: int
    header_remote_port: int
    header_local_port: int
    header_voucher: int
    header_id: int
    descriptors: list[mach_msg_type_descriptor_t]
    extra: bytes
    # Appended by to_bytes() when trailer=True.
    trailer: bytes

    def __init__(self, data: bytes | None, _64_bit = False):
        """Parse ``data``; ``None`` creates an instance with no fields set.

        Args:
            data: raw message bytes, at least 24 long.
            _64_bit: if true, out-of-line descriptors are read as 16 bytes
                instead of 12.

        Raises:
            ValueError: on a descriptor whose type is not 1 (out-of-line).
        """
        if data is None:
            return
        assert len(data) >= 24
        # Reverse the bits word so that index 0 is its most significant byte.
        data = data[0:4][::-1] + data[4:]
        logging.debug(f"bits: {data[0:4].hex()} complex: {data[0] & 0x80}, int {int.from_bytes(data[0:4], byteorder='little')}")
        self.header_bits_complex = bool(data[0] & 0x80)
        self.header_bits_remote_type = data[3] & 0x1f
        self.header_bits_local_type = data[2] & 0x1f
        self.header_bits_voucher_type = data[1] & 0x1f
        self.header_bits_options = int.from_bytes(data[0:4], byteorder='big') & ~0x801f1f1f
        self.header_msg_size = int.from_bytes(data[4:8], byteorder='little')
        self.header_remote_port = int.from_bytes(data[8:12], byteorder='little')
        self.header_local_port = int.from_bytes(data[12:16], byteorder='little')
        self.header_voucher = int.from_bytes(data[16:20], byteorder='little')
        self.header_id = int.from_bytes(data[20:24], byteorder='little')
        self.descriptors = []
        if self.header_bits_complex:
            descriptor_count = int.from_bytes(data[24:28], byteorder='little')
            data_i = BytesIO(data[28:])
            for _ in range(descriptor_count):
                # Read the common 12 bytes first to learn the type.
                d = data_i.read(12)
                descriptor = mach_msg_type_descriptor_t(d)
                if descriptor.type != 1:
                    raise ValueError(f"Unknown descriptor type {descriptor.type}")
                if _64_bit:
                    # The 64-bit out-of-line layout is 4 bytes longer.
                    d += data_i.read(4)
                self.descriptors.append(mach_msg_ool_descriptor_t(d))
            self.extra = data_i.read()
        else:
            self.extra = data[24:]
        # Default trailer: eight zero bytes.
        self.trailer = bytes.fromhex("0000000000000000")

    @property
    def header_bits(self):
        """The bits word rebuilt from its parts (option bits are not included)."""
        return (self.header_bits_remote_type | (self.header_bits_local_type << 8) | (self.header_bits_voucher_type << 16) | (self.header_bits_complex << 31))

    @header_bits.setter
    def header_bits(self, value):
        self.header_bits_remote_type = value & 0x1f
        self.header_bits_local_type = (value >> 8) & 0x1f
        self.header_bits_voucher_type = (value >> 16) & 0x1f
        self.header_bits_complex = bool(value >> 31)

    # Names used by __repr__ for the port disposition values.
    MACH_MSG_TYPE = {
        0: "MACH_MSG_TYPE_PORT_NONE",
        16: "MACH_MSG_TYPE_MOVE_RECEIVE",
        17: "MACH_MSG_TYPE_MOVE_SEND",
        18: "MACH_MSG_TYPE_MOVE_SEND_ONCE",
        19: "MACH_MSG_TYPE_COPY_SEND",
        20: "MACH_MSG_TYPE_MAKE_SEND",
        21: "MACH_MSG_TYPE_MAKE_SEND_ONCE",
        22: "MACH_MSG_TYPE_COPY_RECEIVE",
        24: "MACH_MSG_TYPE_DISPOSE_RECEIVE",
        25: "MACH_MSG_TYPE_DISPOSE_SEND",
        26: "MACH_MSG_TYPE_DISPOSE_SEND_ONCE",
    }

    def len(self, _64_bit=True):
        """Return the serialized length of the message without its trailer."""
        return len(self.to_bytes(_64_bit, trailer=False))

    def __repr__(self):
        def type_name(value):
            # A value missing from the table must not make repr() raise.
            return self.MACH_MSG_TYPE.get(value, f"UNKNOWN({value})")

        # The dump prefers the 32-bit layout; an out-of-line descriptor with
        # an address above 32 bits cannot be serialized that way, so fall
        # back to the 64-bit layout instead of raising OverflowError.
        try:
            raw = self.to_bytes(False, False)
        except OverflowError:
            raw = self.to_bytes(True, False)
        return (
            f"mach_msg_header_t(header_bits_complex={self.header_bits_complex}, "
            f"header_bits_remote_type={type_name(self.header_bits_remote_type)}, "
            f"header_bits_local_type={type_name(self.header_bits_local_type)}, "
            f"header_bits_voucher_type={type_name(self.header_bits_voucher_type)}, "
            f"header_msg_size={self.header_msg_size}, "
            f"header_remote_port={hex(self.header_remote_port)}, "
            f"header_local_port={hex(self.header_local_port)}, "
            f"header_voucher={hex(self.header_voucher)}, "
            f"header_id={self.header_id}, "
            f"descriptors={self.descriptors}, "
            f"extra={self.extra.hex()} <{raw.hex()}>)"
        )

    def to_bytes(self, _64_bit=True, trailer=True) -> bytes:
        """Serialize the message.

        Args:
            _64_bit: layout used for out-of-line descriptors.
            trailer: append ``self.trailer`` when true.
        """
        parts = [
            self.header_bits.to_bytes(4, byteorder='little'),
            self.header_msg_size.to_bytes(4, byteorder='little'),
            self.header_remote_port.to_bytes(4, byteorder='little'),
            self.header_local_port.to_bytes(4, byteorder='little'),
            self.header_voucher.to_bytes(4, byteorder='little'),
            self.header_id.to_bytes(4, byteorder='little'),
        ]
        if self.header_bits_complex:
            parts.append(len(self.descriptors).to_bytes(4, byteorder='little'))
            for descriptor in self.descriptors:
                if isinstance(descriptor, mach_msg_ool_descriptor_t):
                    parts.append(descriptor.to_bytes(_64_bit))
                else:
                    parts.append(descriptor.to_bytes())
        parts.append(self.extra)
        if trailer:
            parts.append(self.trailer)
        return b"".join(parts)
class MachPorts:
    """Registry of fake Mach ports, each backed by a trio memory channel.

    Both tables below are class attributes, so they are shared by every
    instance of this class.
    """

    # Rather than giving out individual port rights, we just give them a unique port number. No security is necessary.
    _mach_ports: dict[int, tuple[trio.MemorySendChannel, trio.MemoryReceiveChannel]] = {}
    # Name to port number mapping (for bootstrap lookup)
    _mach_name_to_port: dict[str, int] = {}

    def create_mach_port(self) -> int:
        """Create a zero-capacity channel pair and return its new port number."""
        send_end, receive_end = trio.open_memory_channel(0)
        # Port numbers grow from 1: one above the highest number in use.
        next_port = max(self._mach_ports, default=0) + 1
        self._mach_ports[next_port] = (send_end, receive_end)
        return next_port

    def create_mach_port_with_name(self, name: str) -> int:
        """Create a port and register it under ``name``.

        Raises:
            ValueError: if ``name`` is already registered.
        """
        if name in self._mach_name_to_port:
            raise ValueError("Port name already registered")
        new_port = self.create_mach_port()
        self._mach_name_to_port[name] = new_port
        return new_port

    async def send(self, port: int, header: mach_msg_header_t) -> None:
        """Log ``header`` and push it onto the send side of ``port``."""
        logging.debug(f"SENDING header: {header}")
        send_end, _ = self._mach_ports[port]
        await send_end.send(header)

    async def receive(self, port: int, trailer_type: int = 0, process = True) -> mach_msg_header_t:
        """Wait for the next message on ``port``.

        When ``process`` is true the message is passed through
        ``kernel_process_mach_msg_header_t`` with ``trailer_type`` before
        being returned.
        """
        _, receive_end = self._mach_ports[port]
        header = await receive_end.receive()
        if not process:
            return header
        return kernel_process_mach_msg_header_t(header, trailer_type)

    def get_port(self, name: str) -> int:
        """Return the port number registered under ``name``.

        Raises:
            ValueError: if ``name`` is not registered.
        """
        if name in self._mach_name_to_port:
            return self._mach_name_to_port[name]
        raise ValueError("Port name not registered")
def prepare_mach_msg_header_t(header_raw: bytes, process: "process.Process") -> mach_msg_header_t:
    """Parse an outgoing message and capture its out-of-line payloads.

    ``header_raw`` is parsed with the descriptor layout selected by
    ``process.ptr_size == 8``. For every out-of-line descriptor, the bytes
    at ``descriptor.address`` (``descriptor.size`` of them) are copied out
    of the process's emulated memory into ``descriptor.data``.

    Returns:
        The parsed header with the payloads attached.
    """
    header = mach_msg_header_t(header_raw, _64_bit=process.ptr_size == 8)
    for descriptor in header.descriptors:
        if not isinstance(descriptor, mach_msg_ool_descriptor_t):
            continue
        # A freshly parsed descriptor must not carry a payload yet.
        assert descriptor.data is None
        descriptor.data = bytes(process._uc.mem_read(descriptor.address, descriptor.size))
        # Dump through logging like the rest of this module, not stdout.
        logging.debug(f"ool data: {descriptor.data.hex()}")
    return header
def use_mach_msg_header_t(header: mach_msg_header_t, process: "process.Process") -> bytes:
    """Materialize a message inside ``process`` and return its wire bytes.

    For every out-of-line descriptor, memory is allocated with
    ``process.malloc``, the captured payload is written there, and the
    descriptor is updated in place (new ``address``, ``deallocate`` set).
    ``header.header_msg_size`` is then set to the trailer-less length.
    Note that ``header`` itself is mutated.

    Returns:
        The serialized message, using the 64-bit descriptor layout when
        ``process.ptr_size == 8``.
    """
    is_64_bit = process.ptr_size == 8
    for descriptor in header.descriptors:
        if not isinstance(descriptor, mach_msg_ool_descriptor_t):
            continue
        assert descriptor.data is not None
        descriptor.deallocate = True
        # Give the receiver its own copy of the payload.
        descriptor.address = process.malloc(descriptor.size)
        # Dump through logging like the rest of this module, not stdout.
        logging.debug(f"ool data: {descriptor.data.hex()}")
        process._uc.mem_write(descriptor.address, descriptor.data)
    header.header_msg_size = header.len(_64_bit=is_64_bit)
    logging.debug(f"RECEIVED header: {header}")
    return header.to_bytes(_64_bit=is_64_bit)
def kernel_process_mach_msg_header_t(header: mach_msg_header_t, trailer_type: int) -> mach_msg_header_t:
    """Rewrite a sent header in place into the form handed to a receiver.

    Steps: clear the voucher fields, swap the remote/local ports and their
    dispositions, downgrade both dispositions, then attach a trailer.

    Args:
        header: message to rewrite; it is mutated and also returned.
        trailer_type: 0 for the 8-byte zero trailer, 7 for the 68-byte one.

    Raises:
        ValueError: on a disposition with no downgrade rule, or on an
            unsupported ``trailer_type``.
    """
    # Clear the priority voucher (https://robert.sesek.com/2023/6/mach_vouchers.html)
    header.header_bits_voucher_type = 0
    header.header_voucher = 0

    # Swap the ports
    previous_remote_port = header.header_remote_port
    header.header_remote_port = header.header_local_port
    header.header_local_port = previous_remote_port

    # Swap the dispositions
    previous_remote_type = header.header_bits_remote_type
    header.header_bits_remote_type = header.header_bits_local_type
    header.header_bits_local_type = previous_remote_type

    # Downgrade rules:
    #   MAKE_SEND_ONCE (21) -> MOVE_SEND_ONCE (18)
    #   COPY_SEND (19)      -> MOVE_SEND (17)
    #   MOVE_SEND_ONCE (18) -> PORT_NONE (0)
    #   PORT_NONE (0)       -> unchanged
    downgraded = {21: 18, 19: 17, 18: 0, 0: 0}

    def downgrade(disposition):
        if disposition not in downgraded:
            raise ValueError(f"Unknown type {disposition}")
        return downgraded[disposition]

    # Local first, then remote, so the first invalid value reported is the
    # local one and the local field is already rewritten by then.
    header.header_bits_local_type = downgrade(header.header_bits_local_type)
    header.header_bits_remote_type = downgrade(header.header_bits_remote_type)

    # typedef struct {
    #     mach_msg_trailer_type_t msgh_trailer_type;
    #     mach_msg_trailer_size_t msgh_trailer_size;
    #     mach_port_seqno_t       msgh_seqno;
    #     security_token_t        msgh_sender;
    #     audit_token_t           msgh_audit;
    #     mach_port_context_t     msgh_context;
    #     mach_msg_filter_id      msgh_ad;
    #     msg_labels_t            msgh_labels;
    # } mach_msg_mac_trailer_t;
    #
    # 0x16d3e2c1c: 00 00 00 00 44 00 00 00 18 00 00 00 f5 01 00 00 ....D...........
    # 0x16d3e2c2c: f5 01 00 00 ff ff ff ff f5 01 00 00 f5 01 00 00 ................
    # 0x16d3e2c3c: f5 01 00 00 f5 01 00 00 96 06 00 00 00 00 00 00 ................
    # 0x16d3e2c4c: 8f 11 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ................
    # 0x16d3e2c5c: 00 00 00 00 ....
    if trailer_type == 0:  # NULL trailer
        header.trailer = bytes.fromhex("0000000000000000")
    elif trailer_type == 7:  # AV trailer
        # (value, width in bytes); each field is written little-endian.
        fields = (
            (0, 4),            # msgh_trailer_type
            (68, 4),           # msgh_trailer_size
            (0, 4),            # msgh_seqno # TODO: Increment this
            (0xDEADBEEF1, 8),  # msgh_sender # TODO: Are security tokens necessary?
            (0xDEADA0D1, 32),  # msgh_audit # TODO: Are audit tokens necessary?
            (0, 8),            # msgh_context
            (0, 4),            # msgh_ad
            (0, 4),            # msgh_labels
        )
        trailer = b"".join(value.to_bytes(width, byteorder='little') for value, width in fields)
        header.trailer = trailer
        logging.debug(f"trailer: {trailer.hex()}")
    else:
        raise ValueError(f"Unknown trailer type {trailer_type}")
    return header
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment