Skip to content

Instantly share code, notes, and snippets.

@ben-cohen
Last active November 4, 2024 20:15
Show Gist options
  • Save ben-cohen/db7332cf335baa93e0e129e44fe44ca7 to your computer and use it in GitHub Desktop.
Save ben-cohen/db7332cf335baa93e0e129e44fe44ca7 to your computer and use it in GitHub Desktop.
Forward a terminal with window size changes across a unix domain socket
#!/usr/bin/python3
#
# pipewinch: Forward a terminal with window size changes across a unix
# domain socket.
#
# Usage:
# pipewinch <uds-action> <uds-path> exec ARGS...
# pipewinch <uds-action> <uds-path> term
# where uds-action is "listen" or "connect"
#
# A pair of `pipewinch` instances, one invoked with `exec` and the other
# with `term`, forwards the input and output of a program across a unix
# domain socket to a terminal, including window size changes. This can be
# used to interact with programs running in containers or as other users.
#
# The `exec` instance runs the command line `ARGS...` in a pseudo-terminal
# (pty). It sends the program's output to the socket and forwards input
# from the socket to the program. It also resizes the pty when it
# receives a window size changed message from the socket.
#
# The `term` instance acts as a terminal across the socket, displaying the
# output and forwarding the input. If the terminal's window size changes,
# it sends a window size changed message.
#
# If the uds-action `listen` is used then `pipewinch` will act as a
# server, creating and listening on a unix domain socket at
# the location <uds-path>; or if `connect` is used then `pipewinch` will
# act as a client, connecting to the socket at <uds-path>.
#
# It does not matter whether the `exec` or the `term` instance does the
# `listen` or `connect`, but the instance doing `listen` must start first
# and then the other must do `connect`.
#
# Example:
#
# pipewinch listen /tmp/uds exec /bin/bash -l
#
# pipewinch connect /tmp/uds term
#
# Background: `pipewinch` can be used as an alternative to `socat`, for
# example the commands below which forward the terminal but not the window
# size changed information. This means that programs such as `vi` and
# `less` (or even command-line editing in the shell) do not display
# correctly after you resize the window.
#
# rm -f /tmp/uds ; \
# socat EXEC:"/bin/bash -l",pty,setsid,setpgid,stderr,ctty,echo=0,sane \
# UNIX-LISTEN:/tmp/uds
#
# socat UNIX-CONNECT:/tmp/uds STDIO,raw,echo=0
#
# Instead, `pipewinch` in the second session sends the window size changed
# information encoded as an in-band message, and `pipewinch` in the first
# session decodes it and sets the terminal size.
#
# (Similarly, `telnet` sends window size changed information as an in-band
# message. `ssh` is able to send an out-of-band message because its protocol
# has multiple channels. A script similar to `pipewinch` called `pty` from
# https://github.com/wheelcomplex/misc uses a separate connection to forward
# the information. It would be useful if `socat` could also do forward window
# size changed information - presumably in-band - but it currently does not.)
#
# Security: Any user who can access `uds_path` will be able to attach to
# the program. The socket is created using `umask()` such that only the
# current user can access it. (It would be possible to allow other users
# to access the socket if it, or its containing directory, is created with
# the appropriate group and permissions; but this is not implemented here.)
#
#
# For more information, see:
# https://unix.stackexchange.com/questions/207782/
# https://stackoverflow.com/questions/63230001/
# http://www.dest-unreach.org/socat/
# https://stackoverflow.com/questions/28776502/unix-domain-socket-securing-receiver
# https://unix.stackexchange.com/questions/117981/what-are-the-responsibilities-of-each-pseudo-terminal-pty-component-software
# https://utcc.utoronto.ca/~cks/space/blog/unix/WindowSizeIoctlAndSignal
#
# Ben Cohen, August 2022.
#
import argparse
import fcntl
import os
import pty
import select
import signal
import socket
import struct
import subprocess
import sys
import termios
import threading
# The window size changed message has the format
# "\xFFWS=" + [8 byte TIOCGWINSZ result converted to network byte order]
# + "\xFF"
# If the stream contains the bytes "\xFFWS" then they must be escaped using
# "\xFFWS\xFF"
# It is an error for the stream to contain the bytes "\xFFWS" and the following
# character to be anything other than '=' or '\xFF'.
MAGIC_0 = b"\xFF"
MAGIC_1 = b"W"
MAGIC_2 = b"S"
MAGIC_3 = b"="
def winch_handler(signum, stack_frame):
global window_size_changed
window_size_changed = True
def get_window_size_string():
arg = struct.pack('HHHH', 0, 0, 0, 0)
winszle = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, arg)
# Convert to network byte order
return struct.pack('!HHHH', *struct.unpack('HHHH', winszle))
def set_window_size(msg):
assert(len(msg) == 8)
# Convert from network byte order
winsz = struct.pack('!HHHH', *struct.unpack('HHHH', msg))
fcntl.ioctl(mfd, termios.TIOCSWINSZ, winsz)
def check_winch(input_buffer, copy_behaviour):
# COPY_TO_FD_SEND_WINCH: escape "\xFFWS".
# COPY_FROM_FD_CHECK_WINCH: handle escape for "\xFFWS" and set window size.
global check_winch_state, check_winch_buffer
assert(type(input_buffer) == bytes)
result = b""
def reset_state():
global check_winch_state, check_winch_buffer
check_winch_state = 0
check_winch_buffer = b""
def advance_state():
global check_winch_state, check_winch_buffer
check_winch_state += 1
check_winch_buffer += nextchar
for nextchar in input_buffer:
nextchar = bytes([nextchar])
if check_winch_state == 0: # '\xFF'
if nextchar == MAGIC_0:
advance_state()
else:
result += nextchar
elif check_winch_state == 1: # 'W'
if nextchar == MAGIC_1:
advance_state()
else:
result += check_winch_buffer + nextchar
reset_state()
elif check_winch_state == 2: # 'S'
if nextchar == MAGIC_2:
if copy_behaviour == COPY_TO_FD_SEND_WINCH:
result += check_winch_buffer + nextchar + MAGIC_0
reset_state()
else:
advance_state()
else:
result += check_winch_buffer + nextchar
reset_state()
elif check_winch_state == 3: # '=' or '\xFF'
assert(copy_behaviour != COPY_TO_FD_SEND_WINCH)
if nextchar == MAGIC_0:
result += check_winch_buffer
reset_state()
elif nextchar == MAGIC_3:
advance_state()
else:
assert(False)
elif 4 <= check_winch_state <= 11: # TIOCGWINSZ result in network byte order
assert(copy_behaviour != COPY_TO_FD_SEND_WINCH)
advance_state()
elif check_winch_state == 12: # '\xFF'
assert(copy_behaviour != COPY_TO_FD_SEND_WINCH)
assert(nextchar == MAGIC_0)
set_window_size(check_winch_buffer[4:12])
reset_state()
else:
assert(False)
assert(type(result) == bytes)
return result
COPY_NOOP = 0
COPY_FROM_FD_CHECK_WINCH = 1
COPY_TO_FD_SEND_WINCH = 2
def do_copy(from_fd, to_fd, copy_behaviour = COPY_NOOP):
global running, window_size_changed
while True:
# TODO: Improve error handling!
try:
rfds, wfds, xfds = select.select([from_fd], [], [], 0.001)
if len(rfds) == 0 and not running:
# No more input and we got EOF on another stream so give up
break
if from_fd in rfds:
data = os.read(from_fd, 1024)
if len(data) > 0:
if copy_behaviour != COPY_NOOP:
data = check_winch(data, copy_behaviour)
os.write(to_fd, data)
else:
# EOF
running = False
break
if copy_behaviour == COPY_TO_FD_SEND_WINCH and window_size_changed:
os.write(to_fd, MAGIC_0 + MAGIC_1 + MAGIC_2 + MAGIC_3 + get_window_size_string() + MAGIC_0)
window_size_changed = False
except:
running = False
break
def usage(message = None):
if message is not None:
print(f"{sys.argv[0]}: error: " + message)
print()
print("Usage:")
print(f" {sys.argv[0]} <uds-action> <uds-path> exec [ARGS...]")
print(f" {sys.argv[0]} <uds-action> <uds-path> term ")
print('where uds-action is "listen" or "connect"')
sys.exit(1)
if len(sys.argv) < 4:
usage("too few arguments")
uds_action = sys.argv[1]
uds_path = sys.argv[2]
main_action = sys.argv[3]
subprocessargs = sys.argv[4:]
check_winch_state = 0
check_winch_buffer = b""
running = True
window_size_changed = True
uds_conn = None
signal.signal(signal.SIGWINCH, winch_handler)
if main_action not in ['exec', 'term']:
usage(f"unknown mode '{main_action}'")
if uds_action == 'listen':
# UDS server
if os.path.exists(uds_path):
os.remove(uds_path)
old_umask = os.umask(0o077)
uds_server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
uds_server.bind(uds_path)
os.umask(old_umask)
uds_server.listen()
(uds_conn, addr) = uds_server.accept()
os.remove(uds_path)
uds_server.close()
elif uds_action == 'connect':
# UDS client
if not os.path.exists(uds_path):
print(f"Path {uds_path} does not exist")
sys.exit(1)
uds_conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
uds_conn.connect(uds_path)
else:
usage(f"unknown socket action '{uds_action}'")
if main_action == 'exec':
if len(subprocessargs) == 0:
subprocessargs = ["/bin/sh"]
# Set up pty and terminal. (See "man 7 pty".)
mfd, sfd = pty.openpty()
# Start subprocess...
child_pid = os.fork()
if child_pid == 0:
# ... Child continues here
for fd in [0, 1, 2]:
if fd != sfd:
os.close(fd)
os.dup2(sfd, fd)
os.close(mfd)
os.close(sfd)
os.setsid()
fcntl.ioctl(sys.stdin, termios.TIOCSCTTY, 0)
# Set up terminal
os.system("stty sane")
os.execvpe(subprocessargs[0], subprocessargs, os.environ)
# ... Parent continues here
assert(child_pid)
os.close(sfd)
# Start threads to copy data
stdinthr = threading.Thread(target=do_copy, args=(uds_conn.fileno(), mfd, COPY_FROM_FD_CHECK_WINCH))
stdoutthr = threading.Thread(target=do_copy, args=(mfd, uds_conn.fileno()))
stdinthr.start()
stdoutthr.start()
# Wait for everything to finish
rc = os.waitpid(child_pid, 0) # Could send `rc` to the other end at this point
uds_conn.shutdown(socket.SHUT_RDWR)
uds_conn.close()
os.close(mfd)
stdinthr.join()
stdoutthr.join()
elif main_action == 'term':
# Set up terminal
if (os.isatty(sys.stdin.fileno())):
old_attrs = termios.tcgetattr(sys.stdin)
os.system("stty raw -echo")
# Start threads to copy data
stdinthr = threading.Thread(target=do_copy, args=(sys.stdin.fileno(), uds_conn.fileno(), COPY_TO_FD_SEND_WINCH))
stdoutthr = threading.Thread(target=do_copy, args=(uds_conn.fileno(), sys.stdout.fileno()))
stdinthr.start()
stdoutthr.start()
# Wait for everything to finish
stdinthr.join()
stdoutthr.join()
uds_conn.shutdown(socket.SHUT_RDWR)
uds_conn.close()
# Reset terminal
if (os.isatty(sys.stdin.fileno())):
termios.tcsetattr(sys.stdin, termios.TCSANOW, old_attrs)
else:
assert(False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment