Last active
May 3, 2026 18:48
-
-
Save sjmf/c4329fd27e403a264648bf4e7744655a to your computer and use it in GitHub Desktop.
CH9350L lower-computer protocol PoC — kvm-serial issue #13
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| CH9350L UART protocol PoC — issue #13. | |
| Two subcommands: | |
| sniff PORT passively decode frames on a serial line. Tag with --tag | |
| LC/UC for two-port bidirectional captures (merge logs by | |
| timestamp afterwards). | |
| emulate PORT drive a real CH9350L upper computer from this host: send | |
| the attach sequence (0x86 / 0x80 / 0x89 / 0x81 ×N), then | |
| forward keystrokes and mouse motion typed at the prompt. | |
| Frame formats and the wire-level state machine are documented in | |
| docs/CH9350L_PROTO.md on the feature/13-ch9350L-support branch. | |
| Requires pyserial. | |
| """ | |
| import time | |
| import threading | |
| import argparse | |
| import serial | |
| HEADER = b"\x57\xAB" | |
| # Captured HID Report Descriptors from a real CH9350L LC at attach time | |
| # (sniff_lc.txt, 2026-05-03). These are what the LC announces over 0x81 to | |
| # the UC so it can present matching HID descriptors to the target PC. | |
| DEFAULT_MOUSE_DESC = bytes.fromhex( | |
| "05010902a10185010901a1000509190129031500250175019503810275059501" | |
| "810105010930093109381581257f750895038106c005ff09021500250175019501" | |
| "b12275079501b101c0" | |
| ) | |
| DEFAULT_MOUSE_PID = bytes.fromhex("4000") # id2 / port-1 PID | |
| DEFAULT_KBD_DESC = bytes.fromhex( | |
| "05010906a1018501050719e029e7150025017501950881029501750881019503" | |
| "750105081901290391029505750191019506750826ff000507190029918100c0" | |
| "05010980a10185021981298315002501950375018102950175058101c0050c09" | |
| "01a10185031500250109e909ea09e209cd19b529b87501950881020a8a010a21" | |
| "020a2a021a23022a270281020a83010a96010a92010a9e010a94010a060209b2" | |
| "09b48102c0" | |
| ) | |
| DEFAULT_KBD_PID = bytes.fromhex("0315") # id2 / port-2 PID | |
| # --------------------------------------------------------------------------- | |
| # Protocol constants | |
| # --------------------------------------------------------------------------- | |
| # Known fixed payload lengths (bytes after the cmd byte). | |
| # 0x83/0x88 are length-prefixed (LEN byte at payload[0]) — handled separately. | |
| PAYLOAD_LEN = { | |
| 0x82: 1, # Heartbeat: one status byte (0xA0..0xAF) | |
| 0x12: 8, # Upper-computer init/LED frame: 8 bytes after cmd | |
| 0x01: 8, # Keyboard state-2: 8-byte USB HID boot report | |
| 0x02: 4, # Mouse relative state-2: btn, dx, dy, wheel | |
| 0x04: 7, # Mouse absolute state-3/4: id, btn, xl, xh, yl, yh, wheel | |
| 0x80: 1, # Startup: 1-byte status (observed at USB device attach) | |
| 0x86: 0, # Startup: attach signal (no payload) | |
| 0x89: 0, # Periodic LC status announce (no payload) | |
| } | |
| # LC → UC heartbeat. Low nibble = IO pin state; a real LC sends 0xA3 | |
| # (IO0/IO1 high). 0xA0 also accepted by the UC. | |
| HEARTBEAT_FRAME = bytes([0x57, 0xAB, 0x82, 0xA3]) | |
| # --------------------------------------------------------------------------- | |
| # HID usage ID names (index = usage ID) | |
| # --------------------------------------------------------------------------- | |
| _HID_USAGE = ( | |
| "NONE ERR_OVF POST_FAIL ERR_UNDEF " | |
| "A B C D E F G H I J K L M N O P Q R S T U V W X Y Z " | |
| "1 2 3 4 5 6 7 8 9 0 " | |
| "ENTER ESC BSPC TAB SPACE - = [ ] \\ # ; ' ` , . / " | |
| "CAPS F1 F2 F3 F4 F5 F6 F7 F8 F9 F10 F11 F12 " | |
| "PRTSC SCRLK PAUSE INS HOME PGUP DEL END PGDN " | |
| "RIGHT LEFT DOWN UP " | |
| "NUMLK KP/ KP* KP- KP+ KPENT KP1 KP2 KP3 KP4 KP5 KP6 KP7 KP8 KP9 KP0 KP." | |
| ).split() | |
| MODS = { | |
| 0x01: "LCTRL", 0x02: "LSHIFT", 0x04: "LALT", 0x08: "LWIN", | |
| 0x10: "RCTRL", 0x20: "RSHIFT", 0x40: "RALT", 0x80: "RWIN", | |
| } | |
| def fmt_mod(b: int) -> str: | |
| return "+".join(n for m, n in MODS.items() if b & m) or "-" | |
| def hid_name(code: int) -> str: | |
| if code < len(_HID_USAGE): | |
| return _HID_USAGE[code] | |
| return f"0x{code:02X}" | |
| def fmt_hid(report: bytes) -> str: | |
| """Format an 8-byte USB HID boot keyboard report.""" | |
| if len(report) < 8: | |
| return f"<short {report.hex()}>" | |
| mod = report[0] | |
| keys = [hid_name(report[i]) for i in range(2, 8) if report[i]] | |
| return f"mod={fmt_mod(mod)} keys={keys}" | |
| # --------------------------------------------------------------------------- | |
| # Frame decoder | |
| # --------------------------------------------------------------------------- | |
| def decode_frame(cmd: int, payload: bytes) -> str: | |
| if cmd == 0x82: | |
| io = payload[0] & 0x0F if payload else "?" | |
| return f"HEARTBEAT io={io}" | |
| if cmd == 0x81: | |
| # 57 AB 81 [ID1:1] [LEN:2 LE] [PAYLOAD] [ID2:2] [CHK:1] | |
| if len(payload) >= 6: | |
| id1 = payload[0] | |
| plen = payload[1] | (payload[2] << 8) | |
| need = 3 + plen + 3 | |
| if len(payload) >= need: | |
| desc = payload[3:3 + plen] | |
| id2 = payload[3 + plen:5 + plen] | |
| chk = payload[5 + plen] | |
| expected = (sum(desc) + sum(id2)) & 0xFF | |
| ok = "OK" if chk == expected else f"ERR(exp {expected:02X})" | |
| return (f"DEVICE_CONNECT id1=0x{id1:02X} len={plen} " | |
| f"id2={id2.hex()} chk={chk:02X}{ok} desc={desc.hex()}") | |
| return f"DEVICE_CONNECT {payload.hex()}" | |
| if cmd in (0x83, 0x88): | |
| tag = "PAIRED" if cmd == 0x83 else "SOLO" | |
| if len(payload) < 4: | |
| return f"KEY_{tag} <short: {payload.hex()}>" | |
| plen = payload[0] | |
| if len(payload) < 1 + plen: | |
| return f"KEY_{tag} <incomplete len={plen} have={len(payload) - 1}: {payload.hex()}>" | |
| ser = payload[1] | |
| typ = payload[2] | |
| # data = bytes between SER and CTR (includes TYPE/MOD/BTN byte where present) | |
| data = payload[3 : 1 + plen - 2] | |
| ctr = payload[1 + plen - 2] | |
| ctr_sum = payload[1 + plen - 1] | |
| expected_sum = (ctr + sum(payload[2 : 1 + plen - 2])) & 0xFF | |
| chk_ok = "OK" if ctr_sum == expected_sum else f"ERR(exp {expected_sum:02X})" | |
| # CH9329 keyboard: LEN=0x0C, TYPE byte=0x01 (HID report ID), then 8-byte boot report | |
| if plen == 0x0C and typ == 0x01: | |
| return (f"KEY_{tag} ser=0x{ser:02X} KB {fmt_hid(data)}" | |
| f" ctr={ctr:02X} sum={ctr_sum:02X}{chk_ok}") | |
| # CH9329 absolute mouse: LEN=0x0A, TYPE byte=0x05 (HID report ID) | |
| if plen == 0x0A and typ == 0x05: | |
| if len(data) >= 6: | |
| btn, xl, xh, yl, yh, whl = data[:6] | |
| x, y = (xh << 8) | xl, (yh << 8) | yl | |
| return (f"KEY_{tag} ser=0x{ser:02X} MOUSE_ABS" | |
| f" btn=0x{btn:02X} x={x} y={y} wheel={whl}" | |
| f" ctr={ctr:02X} sum={ctr_sum:02X}{chk_ok}") | |
| # Boot keyboard (real keyboard, HID boot protocol, no report ID): LEN=0x0B | |
| # No TYPE byte: payload after SER is directly mod rsvd k0..k5. | |
| # 'typ' here is the HID modifier byte; data = [rsvd, k0..k5]. | |
| if plen == 0x0B: | |
| hid = bytes([typ]) + bytes(data) # reconstruct 8-byte boot report | |
| return (f"KEY_{tag} ser=0x{ser:02X} KB_BOOT {fmt_hid(hid)}" | |
| f" ctr={ctr:02X} sum={ctr_sum:02X}{chk_ok}") | |
| # Relative mouse with report ID (LEN=0x08, SER bits 5,4 = mouse): | |
| # TYPE=report-id (e.g. 0x01), then 4-byte boot mouse: btn dx dy wheel. | |
| if plen == 0x08 and len(data) >= 4: | |
| btn = data[0] | |
| dx_raw, dy_raw, whl = data[1], data[2], data[3] | |
| dx = dx_raw - 256 if dx_raw > 127 else dx_raw | |
| dy = dy_raw - 256 if dy_raw > 127 else dy_raw | |
| return (f"KEY_{tag} ser=0x{ser:02X} MOUSE_REL_RID" | |
| f" rid=0x{typ:02X} btn=0x{btn:02X} dx={dx:+d} dy={dy:+d} wheel={whl}" | |
| f" ctr={ctr:02X} sum={ctr_sum:02X}{chk_ok}") | |
| # Boot relative mouse (real mouse, HID boot protocol, no report ID): LEN=0x07 | |
| # No TYPE byte: payload after SER is buttons dx dy wheel. | |
| # 'typ' here is the HID buttons byte; data = [dx, dy, wheel]. | |
| if plen == 0x07 and len(data) >= 3: | |
| btn = typ | |
| dx_raw, dy_raw, whl = data[0], data[1], data[2] | |
| dx = dx_raw - 256 if dx_raw > 127 else dx_raw | |
| dy = dy_raw - 256 if dy_raw > 127 else dy_raw | |
| return (f"KEY_{tag} ser=0x{ser:02X} MOUSE_REL" | |
| f" btn=0x{btn:02X} dx={dx:+d} dy={dy:+d} wheel={whl}" | |
| f" ctr={ctr:02X} sum={ctr_sum:02X}{chk_ok}") | |
| return (f"KEY_{tag} ser=0x{ser:02X} typ=0x{typ:02X} data={data.hex()}" | |
| f" ctr={ctr:02X} sum={ctr_sum:02X}{chk_ok}") | |
| if cmd == 0x01: | |
| return f"KEYBOARD {fmt_hid(payload)}" | |
| if cmd == 0x02: | |
| if len(payload) >= 4: | |
| btn, dx, dy, whl = payload[0], payload[1], payload[2], payload[3] | |
| dx = dx - 256 if dx > 127 else dx | |
| dy = dy - 256 if dy > 127 else dy | |
| return f"MOUSE_REL btn=0x{btn:02X} dx={dx:+d} dy={dy:+d} wheel={whl}" | |
| return f"MOUSE_REL {payload.hex()}" | |
| if cmd == 0x04: | |
| if len(payload) >= 7: | |
| btn, xl, xh, yl, yh, whl = payload[1:7] # payload[0] = report ID | |
| x, y = (xh << 8) | xl, (yh << 8) | yl | |
| return f"MOUSE_ABS btn=0x{btn:02X} x={x} y={y} wheel={whl}" | |
| return f"MOUSE_ABS {payload.hex()}" | |
| if cmd == 0x12: | |
| # UC keep-alive (datasheet §4.10 + observed): 8 bytes payload. | |
| # [P1:2 LE] [P2:2 LE] [LED:1] [STATUS:1] [VERSION:2] | |
| # P1/P2 mirror the PID field of the LC's 0x81 frames once accepted. | |
| # LED bits: 0=NumLk 1=CapsLk 2=ScrLk; 0xFF observed when the target | |
| # host hasn't reported LED state yet (pre-enumeration default). | |
| # STATUS bits (empirical, see CH9350L_PROTO.md): | |
| # bit 0 = port-0 device enumerated on target USB | |
| # bit 1 = port-1 device enumerated on target USB | |
| # bit 2 = UART link healthy / UC alive | |
| # 0x07 = both devices live on target (HID forwarding works). | |
| # 0x04 = UART up but target hasn't enumerated devices yet. | |
| # 0xFF = transient/error during disconnect. | |
| if len(payload) >= 8: | |
| p1 = payload[0:2].hex() | |
| p2 = payload[2:4].hex() | |
| led = payload[4] | |
| status = payload[5] | |
| ver = payload[6:8].hex() | |
| led_flags = [n for b, n in ((0x01, "NumLk"), (0x02, "CapsLk"), | |
| (0x04, "ScrLk")) if led & b] | |
| st_flags = [n for b, n in ((0x01, "p0enum"), (0x02, "p1enum"), | |
| (0x04, "uart")) if status & b] | |
| return (f"UC_KEEPALIVE p1={p1} p2={p2} " | |
| f"led=0x{led:02X}[{','.join(led_flags) or '-'}] " | |
| f"status=0x{status:02X}[{','.join(st_flags) or '-'}] " | |
| f"ver={ver}") | |
| return f"UC_KEEPALIVE {payload.hex()}" | |
| if cmd == 0x87: | |
| return f"GET_VERSION {payload.hex()}" | |
| if cmd == 0x86: | |
| # Attach (followed by 0x80/0x89/0x81) or disconnect (bare). Same opcode. | |
| return "DEVICE_NOTIFY []" | |
| if cmd == 0x89: | |
| return "STATUS_ANNOUNCE []" | |
| if cmd == 0x80: | |
| val = f"0x{payload[0]:02X}" if payload else "?" | |
| return f"STARTUP_STATUS [{val}]" | |
| return f"0x{cmd:02X} [{payload.hex()}]" | |
| # --------------------------------------------------------------------------- | |
| # Sniff mode | |
| # --------------------------------------------------------------------------- | |
| def _ts() -> str: | |
| t = time.time() | |
| us = int((t - int(t)) * 1_000_000) | |
| return f"{time.strftime('%H:%M:%S', time.localtime(t))}.{us:06d}" | |
| def _parse_buf(buf: bytearray) -> tuple[list[tuple[int, bytes]], bytearray]: | |
| """ | |
| Extract complete frames from buf. | |
| Returns (frames, remaining_buf) where frames = [(cmd, payload), ...] | |
| """ | |
| frames = [] | |
| while True: | |
| idx = buf.find(HEADER) | |
| if idx == -1: | |
| # No header: keep the last byte in case it starts a header | |
| buf = bytearray(buf[-1:]) if buf else bytearray() | |
| break | |
| if idx > 0: | |
| # Pre-header bytes — emit as a synthetic (-1, skipped) entry so | |
| # the caller can log them; they're usually framing noise from a | |
| # mid-frame sniff start or a corrupted byte. | |
| skipped = bytes(buf[:idx]) | |
| buf = buf[idx:] | |
| frames.append((-1, skipped)) | |
| if len(buf) < 3: | |
| break # need at least header + cmd | |
| cmd = buf[2] | |
| if cmd in (0x83, 0x88): | |
| # Length-prefixed: payload[0] = LEN, total payload = 1 + LEN bytes | |
| if len(buf) < 4: | |
| break # need at least header + cmd + LEN | |
| plen = buf[3] | |
| needed = 4 + plen # header(2) + cmd(1) + LEN(1) + plen bytes | |
| if len(buf) < needed: | |
| break | |
| payload = bytes(buf[3:needed]) | |
| buf = buf[needed:] | |
| frames.append((cmd, payload)) | |
| continue | |
| if cmd == 0x81: | |
| # Device Connection: 57 AB 81 [ID1] [LEN_LO LEN_HI] [PAYLOAD] [ID2_LO ID2_HI] [CHK] | |
| if len(buf) < 6: | |
| break # need header + cmd + ID1 + 2-byte LEN | |
| plen = buf[4] | (buf[5] << 8) | |
| needed = 3 + 1 + 2 + plen + 2 + 1 # = 9 + plen | |
| if len(buf) < needed: | |
| break | |
| payload = bytes(buf[3:needed]) | |
| buf = buf[needed:] | |
| frames.append((cmd, payload)) | |
| continue | |
| known_len = PAYLOAD_LEN.get(cmd) | |
| if known_len is not None: | |
| needed = 3 + known_len | |
| if len(buf) < needed: | |
| break # wait for more data | |
| payload = bytes(buf[3:needed]) | |
| buf = buf[needed:] | |
| frames.append((cmd, payload)) | |
| else: | |
| # Unknown cmd: scan for next header to bound the frame | |
| next_hdr = buf.find(HEADER, 3) | |
| if next_hdr == -1: | |
| if len(buf) > 128: | |
| payload = bytes(buf[3:]) | |
| frames.append((cmd, payload)) | |
| buf = bytearray() | |
| else: | |
| break | |
| else: | |
| payload = bytes(buf[3:next_hdr]) | |
| frames.append((cmd, payload)) | |
| buf = buf[next_hdr:] | |
| return frames, buf | |
| def sniff(port_name: str, baud: int = 115200, logpath: str | None = None, | |
| tag: str | None = None): | |
| log = open(logpath, "w", encoding="utf-8") if logpath else None | |
| prefix = f"[{tag}] " if tag else "" | |
| def emit(msg: str): | |
| print(msg) | |
| if log: | |
| log.write(msg + "\n") | |
| log.flush() | |
| emit(f"[sniff{f' {tag}' if tag else ''}] Opening {port_name} @ {baud} 8N1" | |
| + (f" log={logpath}" if log else "") + "\n") | |
| try: | |
| with serial.Serial(port_name, baud, timeout=0.05) as ser: | |
| buf = bytearray() | |
| try: | |
| while True: | |
| chunk = ser.read(256) | |
| if chunk: | |
| buf.extend(chunk) | |
| frames, buf = _parse_buf(buf) | |
| for cmd, payload in frames: | |
| if cmd == -1: | |
| emit(f"{_ts()} {prefix}[sync: skipped {len(payload)}B: {payload.hex()}]") | |
| else: | |
| raw = HEADER + bytes([cmd]) + payload | |
| emit(f"{_ts()} {prefix}{raw.hex(' ')} | {decode_frame(cmd, payload)}") | |
| except KeyboardInterrupt: | |
| emit("\n[sniff] Stopped.") | |
| finally: | |
| if log: | |
| log.close() | |
| # --------------------------------------------------------------------------- | |
| # Emulate mode (lower-computer state machine) | |
| # --------------------------------------------------------------------------- | |
| class LowerComputer: | |
| """ | |
| CH9350L lower-computer state machine. | |
| Drives the attach sequence (0x86 → 0x80 ×2 → heartbeats → 0x89 → 0x81 | |
| descriptors), maintains the per-SER frame counters, and gates the | |
| state-0 → state-1 transition on UC acknowledgement of every announced | |
| PID. CMD = 0x88 in state 0, 0x83 in state 1 — payload is identical. | |
| State-2 mode (--state2) is an alternative, handshake-free dipswitch | |
| configuration on the UC side (S0=LOW, S1=HIGH); it sends fixed-length | |
| `57 AB 01 ...` keyboard / `57 AB 02 ...` mouse frames with no SER, no | |
| counter, and no descriptor announce. Documented from the datasheet | |
| only — not verified empirically. | |
| See docs/CH9350L_PROTO.md for full frame formats. | |
| """ | |
| STATE0 = 0 | |
| STATE1 = 1 | |
| # State-0/1 frame SER + report-ID bytes. Each SER must be consistent | |
| # with the HID Report Descriptor announced via 0x81 — RID values must | |
| # match Report ID items in the descriptor. | |
| # | |
| # Default (matches DEFAULT_*_DESC): keyboard report-ID=0x01 boot kbd, | |
| # mouse report-ID=0x01 relative. | |
| KB_SER = 0x13 # keyboard / HID / port 2 | |
| KB_RID = 0x01 # 8-byte boot keyboard report follows | |
| MOU_SER = 0x22 # mouse / HID / port 1 | |
| MOU_RID = 0x01 # 4-byte relative mouse report follows | |
| # Boot-protocol format (no report ID — used when the device's descriptor | |
| # contains no Report ID item; --boot-hid selects this path). | |
| KB_SER_BOOT = 0x11 | |
| MOU_SER_BOOT = 0x20 | |
| def __init__(self, port: serial.Serial, state2_mode: bool = False, | |
| boot_hid: bool = False, | |
| mouse_desc: bytes | None = None, mouse_pid: bytes | None = None, | |
| kbd_desc: bytes | None = None, kbd_pid: bytes | None = None): | |
| self.port = port | |
| self.state2_mode = state2_mode | |
| self.boot_hid = boot_hid | |
| self.mouse_desc = mouse_desc | |
| self.mouse_pid = mouse_pid or b"\x00\x00" | |
| self.kbd_desc = kbd_desc | |
| self.kbd_pid = kbd_pid or b"\x00\x00" | |
| self.state = self.STATE0 | |
| self._stop = threading.Event() | |
| self._last_hb = 0.0 | |
| self._hb_interval = 1.0 | |
| # Frame counters are per-SER (datasheet §4.3 / observed): keyboard | |
| # and mouse increment independently. | |
| self._kbd_counter = 0 | |
| self._mou_counter = 0 | |
| # Sync gates — set by _rx_loop when UC keep-alive is observed and | |
| # when each announced PID is reflected back. Used to defer 0x81 | |
| # send until UC is up, and to drive periodic 0x81 retransmit while | |
| # un-acked. | |
| self._uc_seen = threading.Event() | |
| self._uc_p1 = b"\x00\x00" | |
| self._uc_p2 = b"\x00\x00" | |
| self._uc_status = 0x00 # STATUS byte from latest 0x12 | |
| # Set when rx_loop detects PID-drop (UC re-attached to target); | |
| # the maintenance thread picks this up and replays the full | |
| # attach sequence (0x86 → 0x80 → 0x89 → 0x81). PID-only | |
| # retransmit is insufficient: the UC needs 0x86/0x80 to re- | |
| # present its USB-device side to the target host. | |
| self._reattach_needed = threading.Event() | |
| self._last_mouse_announce = 0.0 | |
| self._last_kbd_announce = 0.0 | |
| self._announce_retry_interval = 2.0 | |
| # Serialise port writes — REPL, rx_loop responses, and the | |
| # maintenance thread all call _send concurrently. | |
| self._tx_lock = threading.Lock() | |
| def _build_device_connect_frame(self, descriptor: bytes, | |
| port_id: int, device_pid: bytes) -> bytes: | |
| """ | |
| Build a 0x81 Device Connection Frame: | |
| 57 AB 81 [PORT:1] [LEN:2 LE] [DESCRIPTOR] [PID:2] [CHK:1] | |
| CHK = (sum(DESCRIPTOR) + sum(PID)) & 0xFF | |
| DESCRIPTOR is the USB HID Report Descriptor of the device the LC has | |
| enumerated. PID is a 2-byte identifier reflected back in the UC's | |
| 0x12 keep-alive once the descriptor has been processed. | |
| """ | |
| if len(device_pid) != 2: | |
| raise ValueError("device_pid must be 2 bytes") | |
| plen = len(descriptor) | |
| chk = (sum(descriptor) + sum(device_pid)) & 0xFF | |
| return (HEADER + bytes([0x81, port_id, plen & 0xFF, (plen >> 8) & 0xFF]) | |
| + descriptor + device_pid + bytes([chk])) | |
| def _send(self, data: bytes): | |
| with self._tx_lock: | |
| self.port.write(data) | |
| print(f"{_ts()} TX {data.hex(' ')}") | |
| def _heartbeat(self): | |
| self._send(HEARTBEAT_FRAME) | |
| self._last_hb = time.time() | |
| def _send_announce(self): | |
| """Send a 0x89 status announce. Real LC emits these ~3 times during | |
| attach (see CH9350L_PROTO.md §Status Announce); the emulator sends | |
| one as part of the attach sequence.""" | |
| self._send(HEADER + bytes([0x89])) | |
| def _announce_descriptors(self): | |
| """Send 0x81 Device Connection Frames for each configured device. | |
| Used both for the initial announce and for retransmits.""" | |
| now = time.time() | |
| if self.mouse_desc: | |
| self._send(self._build_device_connect_frame( | |
| self.mouse_desc, port_id=0x00, device_pid=self.mouse_pid)) | |
| self._last_mouse_announce = now | |
| time.sleep(0.05) | |
| if self.kbd_desc: | |
| self._send(self._build_device_connect_frame( | |
| self.kbd_desc, port_id=0x01, device_pid=self.kbd_pid)) | |
| self._last_kbd_announce = now | |
| time.sleep(0.05) | |
| def _maybe_retransmit_descriptors(self, now: float): | |
| """Retransmit any 0x81 whose PID isn't yet reflected in the UC's | |
| keep-alive. Matches real-LC behaviour (~2 s retry) and recovers | |
| from UC restart or transient UART loss mid-session.""" | |
| if self.mouse_desc and self._uc_p1 != self.mouse_pid: | |
| if now - self._last_mouse_announce >= self._announce_retry_interval: | |
| self._send(self._build_device_connect_frame( | |
| self.mouse_desc, port_id=0x00, device_pid=self.mouse_pid)) | |
| self._last_mouse_announce = now | |
| if self.kbd_desc and self._uc_p2 != self.kbd_pid: | |
| if now - self._last_kbd_announce >= self._announce_retry_interval: | |
| self._send(self._build_device_connect_frame( | |
| self.kbd_desc, port_id=0x01, device_pid=self.kbd_pid)) | |
| self._last_kbd_announce = now | |
| def _build_key_frame_state01(self, modifier: int, keycodes: list[int]) -> bytes: | |
| """ | |
| State-0/1 keyboard frame. | |
| Default (LEN=0x0C, SER=0x13): RID=0x01 prefix + 8-byte HID boot report. | |
| Boot format (--boot-hid, LEN=0x0B, SER=0x11): no RID prefix. | |
| """ | |
| cmd = 0x83 if self.state == self.STATE1 else 0x88 | |
| keys = (keycodes + [0] * 6)[:6] | |
| if self.boot_hid: | |
| hid = bytes([modifier, 0x00] + keys) | |
| ser = self.KB_SER_BOOT | |
| else: | |
| hid = bytes([self.KB_RID, modifier, 0x00] + keys) | |
| ser = self.KB_SER | |
| ctr = self._kbd_counter & 0xFF | |
| self._kbd_counter += 1 | |
| ctr_sum = (ctr + sum(hid)) & 0xFF | |
| plen = 1 + len(hid) + 2 # SER + hid + CTR + CTR_SUM | |
| payload = bytes([plen, ser]) + hid + bytes([ctr, ctr_sum]) | |
| return HEADER + bytes([cmd]) + payload | |
| def _build_key_frame_state2(self, modifier: int, keycodes: list[int]) -> bytes: | |
| """State-2 keyboard frame: 57 AB 01 [8-byte HID boot report]""" | |
| keys = (keycodes + [0] * 6)[:6] | |
| report = bytes([modifier, 0x00] + keys) | |
| return HEADER + bytes([0x01]) + report | |
| def _build_mouse_frame_state01(self, btn: int, dx: int, dy: int, wheel: int) -> bytes: | |
| """ | |
| State-0/1 relative mouse frame. | |
| Default (LEN=0x08, SER=0x22): RID=0x01 prefix + 4-byte boot mouse | |
| [btn dx dy wheel]. Matches DEFAULT_MOUSE_DESC. | |
| Boot format (--boot-hid, LEN=0x07, SER=0x20): no RID prefix. | |
| dx/dy are signed deltas clamped to ±127. | |
| """ | |
| cmd = 0x83 if self.state == self.STATE1 else 0x88 | |
| dx = max(-127, min(127, dx)) & 0xFF | |
| dy = max(-127, min(127, dy)) & 0xFF | |
| if self.boot_hid: | |
| data = bytes([btn, dx, dy, wheel & 0xFF]) | |
| ser = self.MOU_SER_BOOT | |
| else: | |
| data = bytes([self.MOU_RID, btn, dx, dy, wheel & 0xFF]) | |
| ser = self.MOU_SER | |
| ctr = self._mou_counter & 0xFF | |
| self._mou_counter += 1 | |
| ctr_sum = (ctr + sum(data)) & 0xFF | |
| plen = 1 + len(data) + 2 # SER + data + CTR + CTR_SUM | |
| payload = bytes([plen, ser]) + data + bytes([ctr, ctr_sum]) | |
| return HEADER + bytes([cmd]) + payload | |
| def _build_mouse_frame_state2(self, btn: int, dx: int, dy: int, wheel: int) -> bytes: | |
| """State-2 relative mouse frame: 57 AB 02 btn dx dy wheel""" | |
| dx = max(-127, min(127, dx)) & 0xFF | |
| dy = max(-127, min(127, dy)) & 0xFF | |
| return HEADER + bytes([0x02, btn & 0xFF, dx, dy, wheel & 0xFF]) | |
| def send_key(self, modifier: int, keycodes: list[int]): | |
| if self.state2_mode: | |
| frame = self._build_key_frame_state2(modifier, keycodes) | |
| else: | |
| frame = self._build_key_frame_state01(modifier, keycodes) | |
| self._send(frame) | |
| def send_key_release(self): | |
| self.send_key(0, []) | |
| def send_mouse(self, btn: int, dx: int, dy: int, wheel: int = 0): | |
| """Send a relative mouse frame. dx/dy are signed 8-bit deltas.""" | |
| if self.state2_mode: | |
| frame = self._build_mouse_frame_state2(btn, dx, dy, wheel) | |
| else: | |
| frame = self._build_mouse_frame_state01(btn, dx, dy, wheel) | |
| self._send(frame) | |
| def _run_attach_sequence(self, *, wait_for_uc: bool): | |
| """Replay the LC→UC attach sequence: | |
| 0x86 → 0x80 0xFF (×2) → heartbeat → 0x89 → 0x81 ×N | |
| Used both at startup (wait_for_uc=True: defer 0x81 until we | |
| observe a UC keep-alive) and on reconnect (wait_for_uc=False: | |
| UC is already keep-aliving, but its USB device side dropped | |
| and needs 0x86/0x80 to re-present to the target host). | |
| """ | |
| self._send(HEADER + bytes([0x86])) | |
| time.sleep(0.25) | |
| self._send(HEADER + bytes([0x80, 0xFF])) | |
| time.sleep(0.23) | |
| self._send(HEADER + bytes([0x80, 0xFF])) | |
| self._heartbeat() | |
| time.sleep(1.0) | |
| self._send_announce() | |
| if not (self.mouse_desc or self.kbd_desc): | |
| return | |
| if wait_for_uc and not self._uc_seen.is_set(): | |
| print("[emulate] Waiting for UC keep-alive before announcing devices...") | |
| while not self._uc_seen.wait(timeout=self._hb_interval): | |
| self._heartbeat() | |
| self._announce_descriptors() | |
| def _tx_maint_loop(self): | |
| """Background thread: heartbeat, 0x81 retransmit, and full | |
| attach replay on reconnect. Decoupled from the REPL so a blocked | |
| input() prompt doesn't starve the UC of heartbeats or stall | |
| reconnect handling.""" | |
| while not self._stop.is_set(): | |
| if self._reattach_needed.is_set(): | |
| self._reattach_needed.clear() | |
| print(f"{_ts()} ** Reattach: replaying attach sequence **") | |
| self._run_attach_sequence(wait_for_uc=False) | |
| continue | |
| now = time.time() | |
| if now - self._last_hb >= self._hb_interval: | |
| self._heartbeat() | |
| self._maybe_retransmit_descriptors(now) | |
| self._stop.wait(timeout=0.2) | |
| def _rx_loop(self): | |
| """Background thread: read + decode incoming frames from upper computer.""" | |
| buf = bytearray() | |
| while not self._stop.is_set(): | |
| try: | |
| chunk = self.port.read(64) | |
| except serial.SerialException: | |
| break | |
| if chunk: | |
| buf.extend(chunk) | |
| frames, buf = _parse_buf(buf) | |
| for cmd, payload in frames: | |
| if cmd == -1: | |
| print(f"{_ts()} RX [sync skip {len(payload)}B]") | |
| continue | |
| print(f"{_ts()} RX {(HEADER + bytes([cmd]) + payload).hex(' ')} | {decode_frame(cmd, payload)}") | |
| if cmd == 0x12 and len(payload) >= 4: | |
| # UC keep-alive observed → cleared to send 0x81 | |
| # frames. Track current PIDs so the main loop can | |
| # drive retransmits and the state-1 gate. | |
| self._uc_p1 = bytes(payload[0:2]) | |
| self._uc_p2 = bytes(payload[2:4]) | |
| if len(payload) >= 6: | |
| self._uc_status = payload[5] | |
| if not self._uc_seen.is_set(): | |
| print(f"{_ts()} ** UC keep-alive observed — " | |
| f"announce phase cleared **") | |
| self._uc_seen.set() | |
| want_p1 = self.mouse_pid if self.mouse_desc else b"\x00\x00" | |
| want_p2 = self.kbd_pid if self.kbd_desc else b"\x00\x00" | |
| any_announced = self.mouse_desc or self.kbd_desc | |
| all_acked = (self._uc_p1 == want_p1 and self._uc_p2 == want_p2) | |
| if (self.state == self.STATE0 and any_announced | |
| and all_acked): | |
| print(f"{_ts()} ** UC acknowledged all PIDs — " | |
| f"entering state 1 **") | |
| self.state = self.STATE1 | |
| elif self.state == self.STATE1 and not all_acked: | |
| # UC dropped a PID — likely a USB-side | |
| # disconnect/reconnect on the target. Revert | |
| # to state 0 and trigger the maintenance | |
| # thread to replay the full attach sequence | |
| # (0x81 alone won't make the UC re-enumerate | |
| # to the target — STATUS stays at 0x04). | |
| print(f"{_ts()} ** UC PIDs no longer match " | |
| f"(p1={self._uc_p1.hex()} p2={self._uc_p2.hex()}) " | |
| f"— reverting to state 0, triggering reattach **") | |
| self.state = self.STATE0 | |
| self._reattach_needed.set() | |
| def run_interactive(self): | |
| """ | |
| Run the attach sequence and start an interactive REPL on stdin. | |
| Commands: | |
| <hex>[,<hex>...] press one or more keys by HID usage ID | |
| SHIFT+<hex> with left-shift modifier (also CTRL+, ALT+) | |
| m DX DY [B] [W] relative mouse delta, optional button + wheel | |
| r release all keys | |
| q quit | |
| """ | |
| rx = threading.Thread(target=self._rx_loop, daemon=True) | |
| rx.start() | |
| # Initial attach: 0x86 → 0x80 ×2 → 0x89 → wait for UC keep-alive | |
| # → 0x81 descriptors. The 0x81 frames carry HID Report Descriptors | |
| # for each connected device; without them the UC enumerates a | |
| # default device whose report IDs don't match the 0x83/0x88 key | |
| # frames and input is silently dropped on the target. | |
| self._run_attach_sequence(wait_for_uc=True) | |
| if not (self.mouse_desc or self.kbd_desc): | |
| print("[emulate] No 0x81 descriptors configured — UC will not " | |
| "enumerate matching HID devices. Use --mouse-desc / " | |
| "--kbd-desc to supply captured payloads, or rely on the " | |
| "built-in defaults.") | |
| self._heartbeat() | |
| # Hand heartbeat + retransmit off to a daemon thread so a blocked | |
| # input() prompt doesn't starve the UC of 0x82 (which causes it to | |
| # stop sending 0x12 keep-alives), and so reconnect retransmits | |
| # don't have to wait for the user to press Enter. | |
| threading.Thread(target=self._tx_maint_loop, daemon=True).start() | |
| print("\n[emulate] Ready. Type 'q' Enter to quit, see docstring for commands.\n") | |
| try: | |
| while True: | |
| line = input("> ").strip() | |
| if not line: | |
| continue | |
| if line.lower() == "q": | |
| break | |
| if line.lower() == "r": | |
| self.send_key_release() | |
| continue | |
| # Mouse command: m DX DY [btn=0] [wheel=0] | |
| if line.lower().startswith("m "): | |
| parts = line.split() | |
| try: | |
| dx = int(parts[1]) | |
| dy = int(parts[2]) | |
| btn = int(parts[3]) if len(parts) > 3 else 0 | |
| wheel = int(parts[4]) if len(parts) > 4 else 0 | |
| self.send_mouse(btn, dx, dy, wheel) | |
| except (IndexError, ValueError): | |
| print(" Usage: m DX DY [btn=0] [wheel=0]") | |
| continue | |
| # Keyboard command | |
| lower = line.lower() | |
| modifier = 0 | |
| if lower.startswith("shift+"): | |
| modifier = 0x02 | |
| line = line[6:] | |
| elif lower.startswith("ctrl+"): | |
| modifier = 0x01 | |
| line = line[5:] | |
| elif lower.startswith("alt+"): | |
| modifier = 0x04 | |
| line = line[4:] | |
| try: | |
| keycodes = [int(c.strip(), 16) for c in line.split(",")] | |
| self.send_key(modifier, keycodes) | |
| time.sleep(0.05) | |
| self.send_key_release() | |
| except ValueError: | |
| print(f" Bad input — key: hex usage IDs e.g. 04,05 | mouse: m X Y [btn] [wheel]") | |
| except (KeyboardInterrupt, EOFError): | |
| pass | |
| finally: | |
| self._stop.set() | |
| print("\n[emulate] Stopped.") | |
| def emulate(port_name: str, baud: int = 115200, state2: bool = False, | |
| boot_hid: bool = False, | |
| mouse_desc: bytes | None = None, mouse_pid: bytes | None = None, | |
| kbd_desc: bytes | None = None, kbd_pid: bytes | None = None): | |
| parts = ["state-2 (no pairing)" if state2 else "state-0/1 (heartbeat + pairing)"] | |
| if boot_hid: | |
| parts.append("boot-HID (SER=0x11/0x20, no TYPE byte)") | |
| if mouse_desc: | |
| parts.append(f"mouse-desc={len(mouse_desc)}B pid={mouse_pid.hex() if mouse_pid else '0000'}") | |
| if kbd_desc: | |
| parts.append(f"kbd-desc={len(kbd_desc)}B pid={kbd_pid.hex() if kbd_pid else '0000'}") | |
| print(f"[emulate] Opening {port_name} @ {baud} 8N1 mode={', '.join(parts)}") | |
| with serial.Serial(port_name, baud, timeout=0.05) as ser: | |
| lc = LowerComputer(ser, state2_mode=state2, boot_hid=boot_hid, | |
| mouse_desc=mouse_desc, mouse_pid=mouse_pid, | |
| kbd_desc=kbd_desc, kbd_pid=kbd_pid) | |
| lc.run_interactive() | |
| def _hex_arg(s: str | None) -> bytes | None: | |
| return bytes.fromhex(s.replace(" ", "")) if s else None | |
| # --------------------------------------------------------------------------- | |
| # Entry point | |
| # --------------------------------------------------------------------------- | |
| def main(): | |
| ap = argparse.ArgumentParser(description="CH9350L protocol PoC (issue #13)") | |
| sub = ap.add_subparsers(dest="cmd", required=True) | |
| sp = sub.add_parser("sniff", help="Decode incoming CH9350L frames") | |
| sp.add_argument("port", help="Serial port (e.g. COM4)") | |
| sp.add_argument("--baud", type=int, default=115200) | |
| sp.add_argument("--log", metavar="FILE", help="Also write output to FILE (UTF-8)") | |
| sp.add_argument("--tag", metavar="LABEL", | |
| help="Prefix every line with [LABEL] (e.g. LC or UC) for two-port captures") | |
| ep = sub.add_parser("emulate", help="Lower-computer state machine") | |
| ep.add_argument("port", help="Serial port connected to upper computer") | |
| ep.add_argument("--baud", type=int, default=115200) | |
| ep.add_argument("--state2", action="store_true", | |
| help="Use state-2 mode (simpler fixed frames, requires upper computer S0=LOW)") | |
| ep.add_argument("--boot-hid", action="store_true", | |
| help="Use boot-protocol HID frame format (SER=0x11/0x20, no TYPE byte)") | |
| ep.add_argument("--mouse-desc", metavar="HEX", | |
| help="HID Report Descriptor for mouse 0x81 frame (default: captured)") | |
| ep.add_argument("--mouse-pid", metavar="HEX", default="4000", | |
| help="2-byte port-1 PID for mouse 0x81 (default: 4000)") | |
| ep.add_argument("--kbd-desc", metavar="HEX", | |
| help="HID Report Descriptor for keyboard 0x81 frame (default: captured)") | |
| ep.add_argument("--kbd-pid", metavar="HEX", default="0315", | |
| help="2-byte port-2 PID for keyboard 0x81 (default: 0315)") | |
| ep.add_argument("--no-default-desc", action="store_true", | |
| help="Do not send any 0x81 frames unless explicitly supplied") | |
| args = ap.parse_args() | |
| if args.cmd == "sniff": | |
| sniff(args.port, args.baud, args.log, args.tag) | |
| return | |
| # State-2 mode bypasses the descriptor-announce protocol entirely; | |
| # combining it with --mouse-desc / --kbd-desc / --no-default-desc is | |
| # incoherent. | |
| if args.state2 and (args.mouse_desc or args.kbd_desc or args.no_default_desc): | |
| ap.error("--state2 cannot be combined with descriptor flags " | |
| "(--mouse-desc, --kbd-desc, --no-default-desc)") | |
| if args.state2 or args.no_default_desc: | |
| mouse_desc = _hex_arg(args.mouse_desc) | |
| kbd_desc = _hex_arg(args.kbd_desc) | |
| else: | |
| mouse_desc = _hex_arg(args.mouse_desc) or DEFAULT_MOUSE_DESC | |
| kbd_desc = _hex_arg(args.kbd_desc) or DEFAULT_KBD_DESC | |
| emulate(args.port, args.baud, args.state2, args.boot_hid, | |
| mouse_desc=mouse_desc, mouse_pid=_hex_arg(args.mouse_pid), | |
| kbd_desc=kbd_desc, kbd_pid=_hex_arg(args.kbd_pid)) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment