Created
June 27, 2025 06:34
-
-
Save yunfan/0533ee3a5d07a5df2f1eb93a73a2b6da to your computer and use it in GitHub Desktop.
a simple terminal multiplexer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import subprocess | |
import select | |
import signal | |
import struct | |
import time | |
import fcntl | |
import termios | |
import sys | |
from ptyprocess import PtyProcessUnicode | |
""" | |
you need vanilla installed python and install a ptyprocess package | |
""" | |
class NonBlockingInput(object): | |
def __enter__(self): | |
# canonical mode, no echo | |
self.old = termios.tcgetattr(sys.stdin) | |
new = termios.tcgetattr(sys.stdin) | |
new[3] = new[3] & ~(termios.ICANON | termios.ECHO) | |
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new) | |
# set for non-blocking io | |
self.orig_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL) | |
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, self.orig_fl | os.O_NONBLOCK) | |
def __exit__(self, *args): | |
# restore terminal to previous state | |
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old) | |
# restore original | |
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, self.orig_fl) | |
def set_size(fd, cols, lines): | |
fcntl.ioctl(fd, termios.TIOCSWINSZ, struct.pack("hhhh", cols, lines, 0, 0)) | |
size = os.get_terminal_size() | |
def create_pty_and_run_shell2(cmd): | |
env = dict(os.environ) | |
process = PtyProcessUnicode.spawn( | |
cmd, | |
env=env, | |
echo=False, | |
dimensions=(size.lines, size.columns), | |
) | |
# set for non-blocking io | |
theFd = process.fd | |
oldFlags = fcntl.fcntl(theFd, fcntl.F_GETFL) | |
fcntl.fcntl(theFd, fcntl.F_SETFL, oldFlags | os.O_NONBLOCK) | |
return theFd, process | |
G = { | |
"cmd": False, | |
"target": -1, | |
"fds": [], | |
"processes": {}, | |
} | |
# Main loop for reading and writing to the PTY | |
poller = select.poll() | |
poller.register(0, select.POLLIN) # Standard input (user's keyboard) | |
# In a real multiplexer, you'd manage multiple such pairs | |
for cmd in [ | |
["/usr/bin/htop"], | |
["/usr/bin/vim"], | |
["/usr/bin/watch", "-n1", "ps", "-ef"], | |
]: | |
master_fd, process = create_pty_and_run_shell2(cmd) | |
poller.register(master_fd, select.POLLIN) | |
G["fds"].append(master_fd) | |
G["processes"][master_fd] = process | |
G["target"] += 1 | |
keepgo = True | |
getTargetFd = lambda: G["fds"][G["target"]] | |
G["target"] = 0 | |
readBuf = 256 | |
with NonBlockingInput(): | |
while keepgo: | |
events = poller.poll() | |
for fd, event in events: | |
currFd = getTargetFd() | |
if fd != 0 and (event & select.POLLIN): | |
theP = G["processes"][fd] | |
output = theP.read(readBuf) | |
if fd == currFd: | |
# only if the current choosing is this one | |
# Read from the shell's output and print to the main terminal | |
while True: | |
try: | |
os.write(1, output.encode()) # Write to standard output | |
except: | |
time.sleep(0.000001) | |
else: | |
break | |
else: | |
# drop the output | |
pass | |
if fd == 0 and (event & select.POLLIN): | |
# Read user input and send it to the shell | |
# clearCmd = "\x1b[2J" | |
clearCmd = "\f" | |
key = os.read(0, 1) | |
if G["cmd"]: | |
if key == b"h": | |
G["target"] -= 1 | |
if G["target"] == -1: | |
G["target"] = len(G["fds"]) - 1 | |
theFd = getTargetFd() | |
theP = G["processes"][theFd] | |
theP.setwinsize(size.lines, size.columns) | |
theP.kill(signal.SIGWINCH) | |
theP.write(clearCmd) | |
G["cmd"] = False | |
elif key == b"l": | |
G["target"] += 1 | |
if G["target"] >= len(G["fds"]): | |
G["target"] = 0 | |
theFd = getTargetFd() | |
theP = G["processes"][theFd] | |
theP.setwinsize(size.lines, size.columns) | |
theP.kill(signal.SIGWINCH) | |
theP.write(clearCmd) | |
G["cmd"] = False | |
elif key == b"f": | |
theP = G["processes"][currFd] | |
theP.write(clearCmd) | |
while True: | |
try: | |
os.write(1, clearCmd.encode()) | |
except: | |
time.sleep(0.000001) | |
else: | |
break | |
G["cmd"] = False | |
elif key == b"q": | |
keepgo = False | |
break | |
else: | |
G["cmd"] = False | |
else: | |
if key == b"\x01": # C-a | |
G["cmd"] = True | |
else: | |
# direct send to current process | |
currP = G["processes"][currFd] | |
currP._writeb(key) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment