Created
May 3, 2022 13:36
-
-
Save mentha/c368bb4d1c1a969765775ba439f3239e to your computer and use it in GitHub Desktop.
assign a usb port to libvirt guest
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from argparse import ArgumentParser | |
from traceback import print_exc | |
import libvirt | |
import os | |
import re | |
import subprocess as sp | |
class PortForward: | |
USBSYS = '/sys/bus/usb/devices' | |
def parse_args(self): | |
a = ArgumentParser(description='attach usb port to guests') | |
a.add_argument('guest', help='guest name') | |
a.add_argument('port', nargs='+', help='usb port in format of "BUS-PORT[.PORT]*"') | |
a = a.parse_args() | |
self.guest_name = a.guest | |
self.ports = set(a.port) | |
def main(self): | |
self.parse_args() | |
self.conn = libvirt.open() | |
self.guest = self.conn.lookupByName(self.guest_name) | |
mon = sp.Popen(['udevadm', 'monitor', '-s', 'usb', '-k'], stdin=sp.DEVNULL, stdout=sp.PIPE, universal_newlines=True) | |
re_devm = re.compile(r'\s+(add|remove)\s+\S+/(\d+-\d+(\.\d+)*)\s') | |
try: | |
while True: | |
l = mon.stdout.readline() | |
if not l: | |
break | |
m = re_devm.search(l) | |
if m is None: | |
continue | |
print(l.strip()) | |
self.devchange(m[1], m[2]) | |
finally: | |
mon.terminate() | |
def readall(self, *a, **ka): | |
with open(*a, **ka) as f: | |
return f.read() | |
def usbprop(self, dev, prop, *a, **ka): | |
return self.readall(os.path.join(self.USBSYS, dev, prop), *a, **ka).strip() | |
def devchange(self, action, path): | |
try: | |
if int(self.usbprop(path, 'bDeviceClass'), 16) == 0x09: | |
return # is a hub | |
match = False | |
for p in self.ports: | |
if path == p or path.startswith(p + '.'): | |
match = True | |
break | |
if not match: | |
return | |
if action == 'add': | |
self.devadd(path) | |
elif action == 'remove': | |
self.devdel(path) | |
else: | |
print(f'unhandled action {action}') | |
except FileNotFoundError: | |
pass | |
except: | |
print_exc() | |
def devxml(self, path): | |
bus, port = path.split('-', 1) | |
return ('<hostdev mode="subsystem" type="usb" managed="yes">' | |
'<source>' | |
f'<vendor id="0x{self.usbprop(path, "idVendor")}"/>' | |
f'<product id="0x{self.usbprop(path, "idProduct")}"/>' | |
#f'<address type="usb" bus="0x{bus}" port="{port}"/>' | |
'</source>' | |
'</hostdev>') | |
def devadd(self, path): | |
print(f'attaching {path}') | |
self.guest.attachDevice(self.devxml(path)) | |
def devdel(self, path): | |
print(f'detaching {path}') | |
self.guest.detachDevice(self.devxml(path)) | |
if __name__ == '__main__': | |
PortForward().main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment