Created
April 30, 2020 19:09
-
-
Save AstraLuma/638d130119a7ee28f1050d396d0bd339 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Launches and manages uMTP-Responder with better cleanup. | |
""" | |
import contextlib | |
import os | |
from pathlib import Path | |
import subprocess | |
import tempfile | |
import time | |
@contextlib.contextmanager
def mount(src, dst, *params):
    """
    Context manager that mounts ``src`` on ``dst`` and unmounts it on exit.

    Any extra positional ``params`` are appended verbatim to the ``mount``
    command line (for example ``'-t', 'configfs'``).  The mount point is
    yielded as a ``Path``.  Both commands run with ``check=True``, so a
    non-zero exit status raises ``subprocess.CalledProcessError``.
    """
    # Build both command lines up front so setup and teardown read as a pair.
    mount_cmd = ['mount', src, dst]
    mount_cmd.extend(params)
    umount_cmd = ['umount', dst]

    print(f"mount {dst}")
    subprocess.run(mount_cmd, check=True)
    try:
        yield Path(dst)
    finally:
        print(f"umount {dst}")
        subprocess.run(umount_cmd, check=True)
# NOTE: these operations are specifically tailored for working under ConfigFS,
# because it alters some of the fine details that affect the algorithms for
# cleaning up.
@contextlib.contextmanager
def mkdir(dirpath, **params):
    """
    Context manager that creates ``dirpath`` on entry and removes it on exit.

    ``dirpath`` must offer ``mkdir()`` and ``rmdir()`` methods (such as a
    ``pathlib.Path``); keyword ``params`` are forwarded to ``mkdir()``.
    The same object is yielded back to the caller.
    """
    def _remove():
        # rmdir (not a recursive delete): the directory is expected to be
        # removable as-is when the block ends.
        print(f"rmdir {dirpath}")
        dirpath.rmdir()

    print(f"mkdir {dirpath}")
    dirpath.mkdir(**params)
    try:
        yield dirpath
    finally:
        _remove()
@contextlib.contextmanager
def symlink(src, dst):
    """
    Makes (and cleans up) a symlink at dst pointing to src.

    Both ``src`` and ``dst`` may be strings or path-like objects.  ``dst`` is
    yielded exactly as it was passed in, and the link is removed again when
    the block exits.
    """
    print(f"ln {dst} {src}")
    os.symlink(src, dst)
    try:
        yield dst
    finally:
        print(f"unlink {dst}")
        # os.unlink() accepts str and path-like alike; calling dst.unlink()
        # would raise AttributeError for a plain string and leave the link
        # behind.
        os.unlink(dst)
@contextlib.contextmanager
def popen(cmd, **params):
    """
    Runs ``cmd`` as a child process for the duration of the block.

    ``cmd`` and keyword ``params`` are passed straight to
    ``subprocess.Popen``; the resulting ``Popen`` object is yielded.  On
    exit the child is asked to terminate and is then reaped, so it is no
    longer running when later cleanup steps execute.
    """
    daemon = subprocess.Popen(cmd, **params)
    try:
        yield daemon
    finally:
        daemon.terminate()
        # terminate() only sends the signal; wait so the child is actually
        # gone (and reaped) before the caller continues tearing things down.
        try:
            daemon.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # The child ignored the polite request; force it.
            daemon.kill()
            daemon.wait()
# Make sure the composite gadget framework is loaded before touching ConfigFS.
subprocess.run(['modprobe', 'libcomposite'], check=True)

# Every resource below is registered on the ExitStack, so teardown happens in
# exact reverse order of setup, whether the daemon exits cleanly or not.
with contextlib.ExitStack() as context_stack:
    ctx = context_stack.enter_context
    working = Path(ctx(tempfile.TemporaryDirectory()))
    cfg = ctx(mkdir(working / 'cfg'))
    # NOTE: on raspbian this is already mounted at /sys/kernel/config
    # but idk how common this is
    ctx(mount('none', cfg, '-t', 'configfs'))
    # Has usb_gadget
    g1 = ctx(mkdir(cfg / 'usb_gadget' / 'g1'))
    # Pre-populated with:
    # * ./g1/os_desc
    # * ./g1/os_desc/qw_sign
    # * ./g1/os_desc/b_vendor_code
    # * ./g1/os_desc/use
    # * ./g1/strings
    # * ./g1/configs
    # * ./g1/functions
    # * ./g1/UDC
    # * ./g1/bcdUSB
    # * ./g1/bcdDevice
    # * ./g1/idProduct
    # * ./g1/idVendor
    # * ./g1/bMaxPacketSize0
    # * ./g1/bDeviceProtocol
    # * ./g1/bDeviceSubClass
    # * ./g1/bDeviceClass
    c1 = ctx(mkdir(g1 / 'configs' / 'c.1'))
    # Creates:
    # * configs/c.1/strings
    # * configs/c.1/bmAttributes
    # * configs/c.1/MaxPower
    ctx(mkdir(g1 / 'functions' / 'ffs.mtp'))
    # Creates nothing additional
    strings = ctx(mkdir(g1 / 'strings' / '0x409'))
    # Creates:
    # * strings/0x409/serialnumber
    # * strings/0x409/product
    # * strings/0x409/manufacturer
    confstrs = ctx(mkdir(c1 / 'strings' / '0x409'))
    # TODO

    # Fill in the gadget's descriptors.
    (g1 / 'idProduct').write_text('0x0100')
    (g1 / 'idVendor').write_text('0x1D6B')
    (strings / 'serialnumber').write_text("01234567")
    (strings / 'manufacturer').write_text("PursuedPyBear")
    (strings / 'product').write_text("GameBear")
    (confstrs / 'configuration').write_text('Conf 1\n')
    (c1 / 'MaxPower').write_text('120')

    # Attach the FunctionFS function to the configuration.
    ctx(symlink(
        g1 / 'functions' / 'ffs.mtp',
        c1 / 'ffs.mtp',
    ))

    # Expose the FunctionFS instance named "mtp" at /dev/ffs-mtp.
    ffs = ctx(mkdir(Path('/dev/ffs-mtp')))
    ctx(mount('mtp', ffs, '-t', 'functionfs'))

    print("start umtprd")
    daemon = ctx(popen(['umtprd'], cwd=g1))
    # Give the daemon a moment to start before the gadget is bound.
    time.sleep(1)

    print("# build udc")
    # A gadget binds to exactly one controller, so write a single UDC name
    # rather than every name joined together, and fail loudly when there is
    # no controller instead of writing an empty string and waiting forever.
    udc_names = sorted(
        item.name
        for item in Path('/sys/class/udc').iterdir()
    )
    if not udc_names:
        raise RuntimeError("no USB device controller found in /sys/class/udc")
    (g1 / 'UDC').write_text(udc_names[0])

    # Block until the daemon exits; a non-zero status is reported as an error
    # (cleanup still runs as the ExitStack unwinds).
    retcode = daemon.wait()
    if retcode:
        raise Exception(f"mtprd failed with retcode={retcode}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment