Skip to content

Instantly share code, notes, and snippets.

@yunfan
Created June 27, 2025 06:34
Show Gist options
  • Save yunfan/0533ee3a5d07a5df2f1eb93a73a2b6da to your computer and use it in GitHub Desktop.
Save yunfan/0533ee3a5d07a5df2f1eb93a73a2b6da to your computer and use it in GitHub Desktop.
a simple terminal multiplexer
import os
import subprocess
import select
import signal
import struct
import time
import fcntl
import termios
import sys
from ptyprocess import PtyProcessUnicode
"""
you need vanilla installed python and install a ptyprocess package
"""
class NonBlockingInput(object):
def __enter__(self):
# canonical mode, no echo
self.old = termios.tcgetattr(sys.stdin)
new = termios.tcgetattr(sys.stdin)
new[3] = new[3] & ~(termios.ICANON | termios.ECHO)
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new)
# set for non-blocking io
self.orig_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, self.orig_fl | os.O_NONBLOCK)
def __exit__(self, *args):
# restore terminal to previous state
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old)
# restore original
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, self.orig_fl)
def set_size(fd, cols, lines):
fcntl.ioctl(fd, termios.TIOCSWINSZ, struct.pack("hhhh", cols, lines, 0, 0))
size = os.get_terminal_size()
def create_pty_and_run_shell2(cmd):
env = dict(os.environ)
process = PtyProcessUnicode.spawn(
cmd,
env=env,
echo=False,
dimensions=(size.lines, size.columns),
)
# set for non-blocking io
theFd = process.fd
oldFlags = fcntl.fcntl(theFd, fcntl.F_GETFL)
fcntl.fcntl(theFd, fcntl.F_SETFL, oldFlags | os.O_NONBLOCK)
return theFd, process
G = {
"cmd": False,
"target": -1,
"fds": [],
"processes": {},
}
# Main loop for reading and writing to the PTY
poller = select.poll()
poller.register(0, select.POLLIN) # Standard input (user's keyboard)
# In a real multiplexer, you'd manage multiple such pairs
for cmd in [
["/usr/bin/htop"],
["/usr/bin/vim"],
["/usr/bin/watch", "-n1", "ps", "-ef"],
]:
master_fd, process = create_pty_and_run_shell2(cmd)
poller.register(master_fd, select.POLLIN)
G["fds"].append(master_fd)
G["processes"][master_fd] = process
G["target"] += 1
keepgo = True
getTargetFd = lambda: G["fds"][G["target"]]
G["target"] = 0
readBuf = 256
with NonBlockingInput():
while keepgo:
events = poller.poll()
for fd, event in events:
currFd = getTargetFd()
if fd != 0 and (event & select.POLLIN):
theP = G["processes"][fd]
output = theP.read(readBuf)
if fd == currFd:
# only if the current choosing is this one
# Read from the shell's output and print to the main terminal
while True:
try:
os.write(1, output.encode()) # Write to standard output
except:
time.sleep(0.000001)
else:
break
else:
# drop the output
pass
if fd == 0 and (event & select.POLLIN):
# Read user input and send it to the shell
# clearCmd = "\x1b[2J"
clearCmd = "\f"
key = os.read(0, 1)
if G["cmd"]:
if key == b"h":
G["target"] -= 1
if G["target"] == -1:
G["target"] = len(G["fds"]) - 1
theFd = getTargetFd()
theP = G["processes"][theFd]
theP.setwinsize(size.lines, size.columns)
theP.kill(signal.SIGWINCH)
theP.write(clearCmd)
G["cmd"] = False
elif key == b"l":
G["target"] += 1
if G["target"] >= len(G["fds"]):
G["target"] = 0
theFd = getTargetFd()
theP = G["processes"][theFd]
theP.setwinsize(size.lines, size.columns)
theP.kill(signal.SIGWINCH)
theP.write(clearCmd)
G["cmd"] = False
elif key == b"f":
theP = G["processes"][currFd]
theP.write(clearCmd)
while True:
try:
os.write(1, clearCmd.encode())
except:
time.sleep(0.000001)
else:
break
G["cmd"] = False
elif key == b"q":
keepgo = False
break
else:
G["cmd"] = False
else:
if key == b"\x01": # C-a
G["cmd"] = True
else:
# direct send to current process
currP = G["processes"][currFd]
currP._writeb(key)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment