Skip to content

Instantly share code, notes, and snippets.

@ArcaneNibble
Last active March 30, 2026 06:21
Show Gist options
  • Select an option

  • Save ArcaneNibble/bbe0f92ba01b1f9b9284937f16946bf2 to your computer and use it in GitHub Desktop.

Select an option

Save ArcaneNibble/bbe0f92ba01b1f9b9284937f16946bf2 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
from collections import namedtuple
import socket
import struct
# Record layouts for the messages exchanged with the USB/IP client. Field
# order matters: instances are splatted straight into struct.pack() and
# built from struct.unpack() results.

# 8-byte header of the user-space (pre-import) operation messages.
usbip_user_op_common = namedtuple(
    'usbip_user_op_common',
    ['version', 'code', 'status'],
)

# Device description sent back in the import reply.
usbip_usb_device = namedtuple(
    'usbip_usb_device',
    [
        'path',
        'busid',
        'busnum',
        'devnum',
        'speed',
        'idVendor',
        'idProduct',
        'bcdDevice',
        'bDeviceClass',
        'bDeviceSubClass',
        'bDeviceProtocol',
        'bConfigurationValue',
        'bNumConfigurations',
        'bNumInterfaces',
    ],
)

# First 20 bytes of every 48-byte URB-level header.
usbip_header_basic = namedtuple(
    'usbip_header_basic',
    ['command', 'seqnum', 'devid', 'direction', 'ep'],
)

# Command-specific part of a submit request (follows the basic header).
usbip_header_cmd_submit = namedtuple(
    'usbip_header_cmd_submit',
    [
        'transfer_flags',
        'transfer_buffer_length',
        'start_frame',
        'number_of_packets',
        'interval',
        'setup',
    ],
)

# Command-specific part of a submit reply (follows the basic header).
usbip_header_ret_submit = namedtuple(
    'usbip_header_ret_submit',
    ['status', 'actual_length', 'start_frame', 'number_of_packets', 'error_count'],
)

# One 16-byte isochronous packet descriptor.
usbip_iso_packet_descriptor = namedtuple(
    'usbip_iso_packet_descriptor',
    ['offset', 'length', 'actual_length', 'status'],
)

# The 8-byte USB control SETUP packet carried in a submit request.
setup_packet = namedtuple(
    'setup_packet',
    ['bmRequestType', 'bRequest', 'wValue', 'wIndex', 'wLength'],
)

# errno value, negated and reported as the status of unsupported requests.
EPIPE = 32
# Protocol version expected in (and echoed back in) the op_common header.
VERSION = 0x111
def recv_or_panic(conn, len_):
    """Read exactly ``len_`` bytes from ``conn``.

    A single ``recv()`` on a stream socket may return fewer bytes than
    requested even though more are on the way, so this keeps reading until
    the full amount has arrived.

    Args:
        conn: Object with a socket-style ``recv(bufsize)`` method.
        len_: Number of bytes to read; ``0`` returns ``b''`` immediately.

    Returns:
        A ``bytes`` object of length ``len_``.

    Raises:
        ValueError: If ``len_`` is negative.
        Exception: If the peer closes the connection before ``len_`` bytes
            were received; the partial data is printed first.
    """
    if len_ < 0:
        raise ValueError("negative read length")
    chunks = []
    remaining = len_
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            # An empty read means EOF: the rest will never arrive.
            print(b"".join(chunks))
            raise Exception("failed to read")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)
def _device_descriptor():
    """Return the 18-byte USB device descriptor of the emulated device."""
    return struct.pack(
        "<BBHBBBBHHHBBBB",
        18,      # bLength
        1,       # bDescriptorType (device)
        0x0210,  # bcdUSB
        0,       # bDeviceClass
        0,       # bDeviceSubClass
        0,       # bDeviceProtocol
        64,      # bMaxPacketSize0
        0xF055,  # idVendor
        0x0000,  # idProduct
        0,       # bcdDevice
        0,       # iManufacturer
        0,       # iProduct
        0,       # iSerialNumber
        1,       # bNumConfigurations
    )


def _configuration_descriptor():
    """Return the full configuration descriptor set of the emulated device.

    The set consists of the configuration header followed by two
    interfaces: interface 0 with class-specific AC/terminal descriptors,
    and interface 1 with a zero-endpoint alternate setting 0 and an
    alternate setting 1 carrying one isochronous IN endpoint (0x81).
    """
    parts = [
        # Interface 0 descriptor (no EPs)
        struct.pack("<BBBBBBBBB", 9, 4, 0, 0, 0, 1, 1, 0, 0),
        # AC Interface (0x1e = total length of the AC + terminal descriptors)
        struct.pack("<BBBHHBB", 9, 0x24, 1, 0x0100, 0x1e, 1, 1),
        # Input terminal
        struct.pack("<BBBBHBBHBB", 0xc, 0x24, 2, 1, 0x0201, 0, 1, 0, 0, 0),
        # Output terminal
        struct.pack("<BBBBHBBB", 9, 0x24, 3, 2, 0x0101, 0, 1, 0),
        # Interface 1 descriptor (no EPs)
        struct.pack("<BBBBBBBBB", 9, 4, 1, 0, 0, 1, 2, 0, 0),
        # Interface descriptor (1 isoc EP)
        struct.pack("<BBBBBBBBB", 9, 4, 1, 1, 1, 1, 2, 0, 0),
        # AS Interface
        struct.pack("<BBBBBH", 7, 0x24, 1, 2, 1, 1),
        # Type I Format
        struct.pack("<BBBBBBBBBBB", 0xb, 0x24, 2, 1, 1, 2, 16, 1, 0x40, 0x1f, 0x00),
        # EP1 IN
        struct.pack("<BBBBHB", 7, 5, 0x81, 1, 16, 1),
        # Audio EP
        struct.pack("<BBBBBH", 0x7, 0x25, 1, 0, 0, 0),
    ]
    body = b"".join(parts)
    header = struct.pack(
        "<BBHBBBBB",
        9,               # bLength
        2,               # bDescriptorType (configuration)
        9 + len(body),   # wTotalLength: this header plus everything after it
        2,               # bNumInterfaces
        1,               # bConfigurationValue
        0,               # iConfiguration
        0b10000000,      # bmAttributes
        250,             # bMaxPower
    )
    return header + body


def _control_in_reply(setup_pkt, active_configuration, intf_alt_setting, stash_data):
    """Return the data stage for a control IN request, or None if unsupported.

    Args:
        setup_pkt: Parsed ``setup_packet`` of the request.
        active_configuration: Currently selected configuration value.
        intf_alt_setting: Current alternate setting of interface 1.
        stash_data: 32-bit value returned by the vendor test request 0xAA.

    Returns:
        The reply payload as ``bytes``, or ``None`` when the request is not
        handled (the caller then answers with a -EPIPE status).
    """
    request = setup_pkt.bRequest
    if request == 0:
        masked_type = setup_pkt.bmRequestType & 0x7f
        if masked_type == 0:
            print("GET_STATUS (device)")
            return b'\x00\x00'
        if masked_type == 1:
            print("GET_STATUS (interface)")
            return b'\x00\x00'
        return None
    if request == 6:
        # GET_DESCRIPTOR
        desc_type = setup_pkt.wValue >> 8
        desc_index = setup_pkt.wValue & 0xFF
        print("GET_DESCRIPTOR {} {}".format(desc_type, desc_index))
        if desc_type == 1:
            return _device_descriptor()
        if desc_type == 2:
            return _configuration_descriptor()
        return None
    if request == 8:
        print("GET CONFIGURATION")
        return bytes([active_configuration])
    if request == 10:
        iface = setup_pkt.wIndex & 0xff
        print(f"GET INTERFACE {iface}")
        if iface == 0:
            return b'\x00'
        if iface == 1:
            return bytes([intf_alt_setting])
        return None
    if request == 0xAA:
        print("SPECIAL TEST REQUEST IN")
        return struct.pack("<I", stash_data)
    return None


def _ret_submit_header(seqnum, ret_submit):
    """Pack a 48-byte submit-reply header for the request numbered ``seqnum``.

    Layout: basic header (command 3, other fields zero), the five
    ``usbip_header_ret_submit`` fields, then 8 zero padding bytes.
    """
    basic = usbip_header_basic(3, seqnum, 0, 0, 0)
    return (
        struct.pack(">IIIII", *basic)
        + struct.pack(">iiiii", *ret_submit)
        + b'\x00' * 8
    )


def _serve_connection(conn):
    """Run the USB/IP protocol loop on an accepted connection.

    Starts in the user-space phase, where only the import request (code
    0x8003) is accepted, and then switches to the URB phase in which
    48-byte submit/unlink headers are processed. The loop never returns
    normally; it ends when an exception is raised (read failure, bad
    version, unsupported code or command).
    """
    active_configuration = 0
    intf_alt_setting = 0
    stash_data = 12345
    is_kernel_mode = False

    while True:
        if not is_kernel_mode:
            op_common_bytes = recv_or_panic(conn, 8)
            op_common = usbip_user_op_common._make(struct.unpack(">HHI", op_common_bytes))
            print(op_common)
            if op_common.version != VERSION:
                raise Exception("invalid version")
            if op_common.code != 0x8003:
                raise Exception("unsupported code")

            # OP_REQ_IMPORT
            import_busid = recv_or_panic(conn, 32)
            print("importing busid {}".format(import_busid))
            reply_op_common = usbip_user_op_common(VERSION, 0x03, 0)
            reply_op_common_bytes = struct.pack(">HHI", *reply_op_common)
            reply_usbd = usbip_usb_device(
                b'',
                import_busid,
                0, 0, 3,  # high speed
                0, 0, 0,
                0, 0, 0, 0, 0, 0)
            reply_usbd_bytes = struct.pack(">256s32sIIIHHHBBBBBB", *reply_usbd)
            # sendall: a plain send() may transmit only part of the buffer.
            conn.sendall(reply_op_common_bytes + reply_usbd_bytes)
            is_kernel_mode = True
            print("Connecting to kernel mode!")
            continue

        header_bytes = recv_or_panic(conn, 48)
        header_basic = usbip_header_basic._make(struct.unpack(">IIIII", header_bytes[:20]))

        if header_basic.command == 2:
            # USBIP_CMD_UNLINK
            seqnum = struct.unpack(">I", header_bytes[20:24])[0]
            print(seqnum)
            # TODO? no reply is sent for unlink requests
            continue
        if header_basic.command != 1:
            raise Exception("unsupported command")

        # USBIP_CMD_SUBMIT
        header_cmd_submit = usbip_header_cmd_submit._make(
            struct.unpack(">Iiiii8s", header_bytes[20:48]))
        print(header_cmd_submit)

        ##### DO STUFF HERE
        # reply_data stays None for anything unsupported -> -EPIPE reply.
        reply_data = None
        isoc_reply_descriptors = None
        if header_basic.direction == 1:
            # IN
            if header_basic.ep == 0:
                # control
                setup_pkt = setup_packet._make(struct.unpack("<BBHHH", header_cmd_submit.setup))
                print(setup_pkt)
                reply_data = _control_in_reply(
                    setup_pkt, active_configuration, intf_alt_setting, stash_data)
            elif header_basic.ep == 1:
                # 0x81 IN iso
                iso_packet_descs = [
                    usbip_iso_packet_descriptor._make(
                        struct.unpack(">IIII", recv_or_panic(conn, 16)))
                    for _i in range(header_cmd_submit.number_of_packets)
                ]
                print(iso_packet_descs)
                # HAX SPECIAL SENDING
                # NOTE(review): the offsets/lengths below look deliberately
                # bogus, presumably to exercise the client's validation of
                # isochronous replies -- confirm before relying on it.
                print("HAX HAX HAX HAX HAX")
                conn.sendall(_ret_submit_header(
                    header_basic.seqnum,
                    usbip_header_ret_submit(0, 2, 0, 2, 0)))
                conn.sendall(bytes([0xaa, 0xbb]))
                iso_pkts = b''
                iso_pkts += struct.pack(">IIII", *usbip_iso_packet_descriptor(0xcafebabe, 0xdeadbeef, 1, 0))
                iso_pkts += struct.pack(">IIII", *usbip_iso_packet_descriptor(0xfeedface, 0xdeadbeef, 1, 0))
                conn.sendall(iso_pkts)
                continue
                # Disabled well-formed isochronous reply, kept for reference:
                # reply_data = b''
                # isoc_reply_descriptors = []
                # for i in range(len(iso_packet_descs)):
                #     offs = len(reply_data)
                #     reply_data += bytes([(i + 1) & 0xff]) * iso_packet_descs[i].length
                #     isoc_reply_descriptors.append(usbip_iso_packet_descriptor(offs, iso_packet_descs[i].length, iso_packet_descs[i].length, 0))
                # print(isoc_reply_descriptors)
        else:
            # OUT: the payload follows the header on the wire.
            request_data = recv_or_panic(conn, header_cmd_submit.transfer_buffer_length)
            print(request_data)
            if header_basic.ep == 0:
                # control
                setup_pkt = setup_packet._make(struct.unpack("<BBHHH", header_cmd_submit.setup))
                print(setup_pkt)
                if setup_pkt.bRequest == 9:
                    print("SET_CONFIGURATION {}".format(setup_pkt.wValue))
                    if setup_pkt.wValue in [0, 1]:
                        reply_data = b''
                        active_configuration = setup_pkt.wValue
                        intf_alt_setting = 0
                elif setup_pkt.bRequest == 11:
                    iface = setup_pkt.wIndex & 0xff
                    print("SET_INTERFACE {} = {}".format(iface, setup_pkt.wValue))
                    if iface == 1 and setup_pkt.wValue in [0, 1]:
                        reply_data = b''
                        intf_alt_setting = setup_pkt.wValue
                elif setup_pkt.bRequest == 0xAB:
                    print("SPECIAL TEST REQUEST OUT")
                    if len(request_data) < 4:
                        print("ERROR: TOO SHORT")
                    else:
                        stash_data = struct.unpack("<I", request_data[:4])[0]
                        reply_data = b''

        ##### SEND REPLY
        if reply_data is None:
            reply_header_submit = usbip_header_ret_submit(-EPIPE, 0, 0, 0, 0)
        elif isoc_reply_descriptors is None:
            # Never return more than the client asked for.
            if len(reply_data) > header_cmd_submit.transfer_buffer_length:
                reply_data = reply_data[:header_cmd_submit.transfer_buffer_length]
            reply_header_submit = usbip_header_ret_submit(0, len(reply_data), 0, 0, 0)
        else:
            reply_header_submit = usbip_header_ret_submit(
                0, len(reply_data), 0, len(isoc_reply_descriptors), 0)
        conn.sendall(_ret_submit_header(header_basic.seqnum, reply_header_submit))
        if reply_data is not None:
            conn.sendall(reply_data)
        if isoc_reply_descriptors is not None:
            conn.sendall(b"".join(
                struct.pack(">IIII", *isoc_desc)
                for isoc_desc in isoc_reply_descriptors))


def main():
    """Serve a single USB/IP client on TCP port 3240.

    Binds to all interfaces, accepts exactly one connection and runs the
    protocol loop on it. Both sockets are closed when the loop ends, which
    only happens by an exception propagating out (for example when the
    client disconnects).
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(("", 3240))
        s.listen()
        conn, addr = s.accept()
        with conn:
            print("Connected to {}".format(addr))
            _serve_connection(conn)
# Start the server only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment