Skip to content

Instantly share code, notes, and snippets.

@prozacchiwawa
Created May 27, 2024 02:35
Show Gist options
  • Save prozacchiwawa/7b56fb606dc95e08b675a31d01057981 to your computer and use it in GitHub Desktop.
Save prozacchiwawa/7b56fb606dc95e08b675a31d01057981 to your computer and use it in GitHub Desktop.
gxemul XTERM= wrapper for connecting to a serial server from qemu (or others)
#!/usr/bin/env python3
# adapted from my xtwrap
import os
import sys
import tty
import time
import uuid
import signal
import socket
import argparse
import threading
import traceback
import subprocess
def copy_fd_to_stream(fdin, stream):
    """Pump bytes from a raw file descriptor into a connected socket.

    Reads up to 128 bytes at a time from ``fdin`` and forwards every chunk to
    ``stream`` until end-of-file is reached on ``fdin`` or an I/O error
    happens on either side.  I/O errors are reported with a traceback and end
    the copy; they are not raised, so the caller's cleanup code always runs.

    Args:
        fdin: Integer file descriptor to read from.
        stream: Connected socket to write to.

    Returns:
        None.
    """
    while True:
        try:
            buf = os.read(fdin, 128)
        except OSError:
            # A failing descriptor (e.g. one that has been closed) ends the
            # copy the same way a failing socket does, instead of raising out
            # of the worker thread.
            traceback.print_exc()
            return
        if not buf:
            # Zero-length read: end-of-file on the descriptor.
            return
        print('to windbg', len(buf))
        try:
            # sendall() retries partial sends until the chunk is delivered.
            stream.sendall(buf)
        except OSError:
            traceback.print_exc()
            return
def copy_stream_to_fd(stream, fdout):
    """Pump bytes from a connected socket into a raw file descriptor.

    Receives up to 128 bytes at a time from ``stream`` and writes each chunk
    in full to ``fdout`` until the peer closes the connection or an I/O error
    happens on either side.  I/O errors are reported with a traceback and end
    the copy; they are not raised, so the caller's cleanup code always runs.

    Args:
        stream: Connected socket to read from.
        fdout: Integer file descriptor to write to.

    Returns:
        None.
    """
    while True:
        try:
            buf = stream.recv(128)
        except OSError:
            traceback.print_exc()
            return
        if not buf:
            # Zero-length recv: the peer closed the connection.
            return
        print('from windbg', len(buf))
        try:
            # os.write() may accept only part of the buffer; loop until the
            # whole chunk has been written.
            while buf:
                written = os.write(fdout, buf)
                buf = buf[written:]
        except OSError:
            # A broken or closed output descriptor ends the copy instead of
            # raising out of the worker thread.
            traceback.print_exc()
            return
def handle_io_in_terminal(args):
    """Bridge this process's stdin/stdout to a Unix-domain socket.

    Connects to the socket path in ``args.inner``, switches stdin to raw
    mode, and installs a SIGINT handler that forwards the interrupt to the
    peer as a single byte with value 3.  Two threads then copy socket data to
    stdout and stdin data to the socket.  The call returns once the
    socket-to-stdout copy has finished and its cleanup has run.

    Args:
        args: Parsed command-line namespace; only ``args.inner`` (the socket
            path) is read here.
    """
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.connect(args.inner)

    try:
        tty.setraw(sys.stdin.fileno())
    except Exception:
        # Best effort: keep going without raw mode.  The pause presumably
        # leaves time to read the message before output starts scrolling.
        print('failed to set tty opts')
        traceback.print_exc()
        time.sleep(10)

    def send_ctrl_c(signum, frame):
        # Forward the interrupt to the peer instead of terminating locally.
        sock.send(bytes([3]))

    try:
        signal.signal(signal.SIGINT, send_ctrl_c)
    except Exception:
        print('failed to do signal handling')
        traceback.print_exc()
        time.sleep(10)

    def write_output_from_socket():
        try:
            copy_stream_to_fd(sock, sys.stdout.fileno())
        finally:
            # Release everything even if the copy raised.
            try:
                sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                # The connection may already be gone (ENOTCONN); closing the
                # socket below is still required.
                pass
            sock.close()
            os.close(sys.stdin.fileno())
            os.unlink(args.inner)

    def read_input_to_socket():
        copy_fd_to_stream(sys.stdin.fileno(), sock)

    output_thread = threading.Thread(target=write_output_from_socket)
    output_thread.start()

    # Daemon thread: a read blocked on stdin must not keep the process alive
    # after the output side has finished.
    input_thread = threading.Thread(target=read_input_to_socket, daemon=True)
    input_thread.start()

    output_thread.join()
def run_terminal_process(args, rest):
    """Relay a pair of inherited file descriptors to a local TCP serial server.

    The descriptor numbers are parsed from ``rest[0]``: the text after its
    first ``S`` is split on commas, giving the input descriptor and then the
    output descriptor.  A TCP connection is opened to 127.0.0.1 port 1235 and
    two threads shuttle data in both directions.  The call returns when the
    descriptor-to-socket direction has ended.

    Args:
        args: Parsed command-line namespace (not read by this function).
        rest: Unrecognised command-line arguments; ``rest[0]`` carries the
            descriptor pair passed in by gxemul.
    """
    # Descriptor pair intended for the child process, as handed over by gxemul.
    fd_fields = rest[0].split('S')[1].split(',')
    fdin, fdout = int(fd_fields[0]), int(fd_fields[1])

    # TCP connection to the serial server.
    conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print('connecting to serial port 1235', flush=True)
    conn.connect(('127.0.0.1', 1235))

    def copy_in_fd():
        # Once the input descriptor is exhausted, tear the connection down.
        copy_fd_to_stream(fdin, conn)
        conn.shutdown(socket.SHUT_RDWR)
        conn.close()
        os.close(fdin)

    def copy_out_fd():
        copy_stream_to_fd(conn, fdout)
        os.close(fdout)

    inbound = threading.Thread(target=copy_in_fd)
    inbound.start()

    # Daemon thread, so it does not keep the process alive after the inbound
    # direction has finished.
    outbound = threading.Thread(target=copy_out_fd, daemon=True)
    outbound.start()

    inbound.join()
def main():
    """Parse the command line and dispatch to the matching mode.

    When ``-inner PATH`` is supplied, the terminal side is run via
    ``handle_io_in_terminal``.  Otherwise the serial relay is run via
    ``run_terminal_process``, which also receives the arguments the parser
    did not recognise.
    """
    parser = argparse.ArgumentParser(
        description='allow gxemul serial to serve qemu connections')
    # -title, -geometry and -e are accepted but not used by the code shown
    # here; presumably they mirror the xterm options gxemul passes.
    parser.add_argument('-title', type=str)
    parser.add_argument('-geometry', type=str)
    parser.add_argument('-e', action='store_true')
    # False (rather than None) is the "not supplied" sentinel tested below.
    parser.add_argument('-inner', type=str, default=False)
    parser.add_argument('exec', nargs='+')

    args, rest = parser.parse_known_args()

    if args.inner is False:
        run_terminal_process(args, rest)
        return
    handle_io_in_terminal(args)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment