Last active
May 2, 2022 06:26
-
-
Save RavuAlHemio/ca0f7afc753f63895b69cc22dc9881b8 to your computer and use it in GitHub Desktop.
Automates plugging a USB drive backed by a VHDX image into a running QEMU VM and unplugging it again.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Launches the QEMU i386 guest that the companion Python script controls.
#
# The -qmp option exposes a QMP control socket on tcp:127.0.0.1:6969; the
# Python script connects to that same address to hot-plug and unplug the USB
# drive.  The usb-ehci device is presumably the USB controller the script's
# usb-storage device attaches to (NOTE(review): confirm against the QEMU docs).
# Relative paths such as w2k_de.qcow2 are resolved against the current
# directory, so run this from the directory that holds the disk images.
& "C:\Program Files\qemu\qemu-system-i386.exe" `
-cpu pentium3 `
-m 1024 `
-vga cirrus `
-drive "if=ide,bus=0,unit=0,media=disk,file=w2k_de.qcow2" `
-drive "if=ide,bus=1,unit=0,media=cdrom" `
-device sb16 `
-device "usb-ehci" `
-device "usb-tablet" `
-qmp "tcp:127.0.0.1:6969,server,wait=off" `
-nic none `
-rtc "base=localtime"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# | |
# automates plugging a USB drive backed by a VHDX image into a running QEMU VM and unplugging it again
# | |
import argparse | |
import json | |
import socket | |
from typing import Any | |
def recv_json(sock: socket.socket) -> Any:
    """Read exactly one JSON document from *sock* and return it decoded.

    The stream carries JSON documents back to back without a length prefix,
    so the end of a document is found by reading one byte at a time and
    tracking the nesting of arrays, objects and strings.  Reading stops at
    the byte that closes the outermost value; nothing belonging to the next
    document is consumed.

    Only documents whose top-level value is an object, an array or a string
    are framed correctly; a top-level number or literal is cut off after its
    first byte.

    Args:
        sock: connected socket to read from.

    Returns:
        The decoded JSON value.

    Raises:
        ConnectionError: the peer closed the connection before a complete
            document was received.
        ValueError: a closing bracket/brace does not match the innermost
            open container (this includes ``json.JSONDecodeError`` raised
            while decoding the collected bytes).
    """
    bs_list: list[bytes] = []
    # stack of currently open constructs: '[', '{' or '"' (inside a string)
    op_stack: list[str] = []
    # number of upcoming bytes that belong to a backslash escape
    escape_level = 0
    anything_read = False

    while True:
        b = sock.recv(1)
        if not b:
            # recv() returns b"" once the peer has closed the connection;
            # without this check the loop would spin forever
            raise ConnectionError(
                "connection closed before a complete JSON document was received"
            )
        bs_list.append(b)

        if not anything_read and b.isspace():
            # skip whitespace in front of the document
            continue
        anything_read = True

        if escape_level > 0:
            if escape_level == 1 and b == b"u":
                # Unicode escape; four more characters follow
                escape_level = 4
            else:
                escape_level -= 1
            continue

        if op_stack and op_stack[-1] == '"':
            # reading a string
            if b == b"\\":
                # escape starts
                escape_level = 1
                continue
            if b != b'"':
                # anything else is part of the string
                continue
            # end of string; fall through so that a top-level string
            # terminates the document
            op_stack.pop()
        elif b == b'"':
            # string start: brackets and braces inside it must not count
            op_stack.append('"')
        elif b == b"[":
            # list start
            op_stack.append("[")
        elif b == b"{":
            # object start
            op_stack.append("{")
        elif b == b"]":
            # list end
            if not op_stack:
                raise ValueError("closing ']' without a matching opener")
            op = op_stack.pop()
            if op != "[":
                raise ValueError(f"closing '[' with {op!r}")
        elif b == b"}":
            # object end
            if not op_stack:
                raise ValueError("closing '}' without a matching opener")
            op = op_stack.pop()
            if op != "{":
                raise ValueError(f"closing '{{' with {op!r}")

        if not op_stack:
            # nothing left on the stack: this is the end of the JSON document
            break

    # collect bytes, decode as UTF-8, load as JSON
    json_bs = b"".join(bs_list)
    json_str = json_bs.decode("utf-8")
    return json.loads(json_str)
def send_json(sock: socket.socket, val: Any):
    """Serialize *val* as JSON and write it to *sock* as UTF-8.

    Args:
        sock: connected socket to write to.
        val: any value accepted by ``json.dumps``.
    """
    # sendall() keeps writing until the whole buffer is out; plain send()
    # may transmit only part of it and would silently truncate the command
    sock.sendall(json.dumps(val).encode("utf-8"))
def query_cmds(sock: socket.socket) -> Any:
    """Send the ``query-commands`` request and return the next JSON document.

    Args:
        sock: socket connected to the QMP server.

    Returns:
        The decoded JSON document read from *sock* after the request.
    """
    # sendall() guarantees the complete request is written; send() may not
    sock.sendall(b'{"execute":"query-commands"}')
    return recv_json(sock)
def unplug_usb(sock: socket.socket, name: str):
    """Remove the USB device with id *name* and wait until it is gone.

    Sends ``device_del`` and then reads messages until both the command
    reply and the ``DEVICE_DELETED`` event for *name* have been seen.  Every
    message received is printed.  A ``DeviceNotFound`` error is treated as
    "already unplugged".

    Args:
        sock: socket connected to the QMP server, capabilities negotiated.
        name: id of the USB device to remove.

    Raises:
        RuntimeError: the server rejected ``device_del`` with an error other
            than ``DeviceNotFound``; previously this case waited forever for
            an event that would never arrive.
    """
    # remove USB storage device
    request = {
        "execute": "device_del",
        "arguments": {
            "id": name,
        },
    }
    send_json(sock, request)

    replied = False
    deleted = False
    # Events are asynchronous and may be delivered before the command reply,
    # so both are collected in one loop instead of assuming the first
    # message is the reply.
    while not (replied and deleted):
        message = recv_json(sock)
        print(message)
        if "error" in message:
            if message["error"]["class"] == "DeviceNotFound":
                # oh well, nothing to unplug
                return
            raise RuntimeError(f"device_del failed for {name!r}: {message['error']!r}")
        if "return" in message:
            replied = True
        elif message.get("event", "") == "DEVICE_DELETED":
            if message.get("data", {}).get("device", "") == name:
                # perfect
                deleted = True
def unplug_block(sock: socket.socket, name: str):
    """Delete the block node called *name* and print the server's reply.

    Args:
        sock: socket connected to the QMP server, capabilities negotiated.
        name: node-name of the block device to delete.
    """
    request = {"execute": "blockdev-del", "arguments": {"node-name": name}}
    send_json(sock, request)
    # block devices are unplugged immediately, so only the reply is read
    reply = recv_json(sock)
    print(reply)
def main():
    """Command-line entry point.

    Takes one positional argument, the mode:

    * ``cmds``   - print the sorted names of the commands the server offers
    * ``plug``   - add the VHDX-backed block node and attach it as USB storage
    * ``unplug`` - remove the USB storage device, then the block node

    Connects to the QMP server on 127.0.0.1:6969 and closes the connection
    on exit, including when an exception is raised.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(dest='mode', choices=("plug", "unplug", "cmds"))
    args = parser.parse_args()

    # the context manager closes the socket even if a command fails
    with socket.create_connection(("127.0.0.1", 6969)) as sock:
        # obtain greeting
        recv_json(sock)

        # negotiate capabilities
        send_json(sock, {"execute": "qmp_capabilities"})
        recv_json(sock)

        if args.mode == "cmds":
            # get commands
            send_json(sock, {"execute": "query-commands"})
            cmds_resp = recv_json(sock)
            for cmd in sorted(c["name"] for c in cmds_resp["return"]):
                print(cmd)
        elif args.mode == "plug":
            # add block device
            block_add = {
                "execute": "blockdev-add",
                "arguments": {
                    "driver": "vhdx",
                    "node-name": "stic2k-blk",
                    "file": {
                        "driver": "file",
                        "filename": "stic2k.vhdx",
                    },
                },
            }
            send_json(sock, block_add)
            print(recv_json(sock))

            # add USB storage device on top of the block node
            usb_stor_add = {
                "execute": "device_add",
                "arguments": {
                    "driver": "usb-storage",
                    "drive": "stic2k-blk",
                    "id": "stic2k-usb",
                },
            }
            send_json(sock, usb_stor_add)
            print(recv_json(sock))
        else:
            # argparse's choices leave only this mode
            assert args.mode == "unplug"
            # the USB device uses the block node, so it has to go first
            unplug_usb(sock, "stic2k-usb")
            unplug_block(sock, "stic2k-blk")
# run main() only when executed as a script, not when imported as a module
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment