Last active
June 30, 2023 14:08
-
-
Save alaniwi/5203f96fc8ab86b327415df72d83bc7c to your computer and use it in GitHub Desktop.
Script to run commands in different "panes" of a terminal window
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# example usage: | |
# | |
# this runs the following commands simultaneously, each in different areas of the screen: | |
# 1) echo hello | |
# 2) cal | |
# 3) for i in 1 2 3 4 5 ; do date; sleep 1; done | |
# | |
# it waits for 2 seconds after they have all finished, before clearing the screen | |
# | |
./run_in_panes.py -s 2 'echo hello' 'cal' 'for i in 1 2 3 4 5 ; do date; sleep 1; done' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
import curses | |
import os | |
import select | |
import signal | |
import subprocess | |
import time | |
class Panes: | |
""" | |
curses-based app that divides the screen into a number of scrollable | |
panes and lets the caller write text into them | |
""" | |
def start(self, num_panes): | |
"set up the panes and initialise the app" | |
# curses init | |
self.num = num_panes | |
self.stdscr = curses.initscr() | |
curses.noecho() | |
curses.cbreak() | |
# split the screen into number of panes stacked vertically, | |
# drawing some horizontal separator lines | |
scr_height, scr_width = self.stdscr.getmaxyx() | |
div_ys = [scr_height * i // self.num for i in range(1, self.num)] | |
for y in div_ys: | |
self.stdscr.addstr(y, 0, '-' * scr_width) | |
self.stdscr.refresh() | |
# 'boundaries' contains y coords of separator lines including notional | |
# separator lines above and below everything, and then the panes | |
# occupy the spaces between these | |
boundaries = [-1] + div_ys + [scr_height] | |
self.panes = [] | |
for i in range(self.num): | |
top = boundaries[i] + 1 | |
bottom = boundaries[i + 1] - 1 | |
height = bottom - top + 1 | |
width = scr_width | |
# create a scrollable pad for this pane, of height at least | |
# 'height' (could be more to retain some scrollback history) | |
pad = curses.newpad(height, width) | |
pad.scrollok(True) | |
self.panes.append({'pad': pad, | |
'coords': [top, 0, bottom, width], | |
'height': height}) | |
def write(self, pane_num, text): | |
"write text to the specified pane number (from 0 to num_panes-1)" | |
pane = self.panes[pane_num] | |
pad = pane['pad'] | |
y, x = pad.getyx() | |
pad.addstr(y, x, text) | |
y, x = pad.getyx() | |
view_top = max(y - pane['height'], 0) | |
pad.refresh(view_top, 0, *pane['coords']) | |
def end(self): | |
"restore the original terminal behaviour" | |
curses.nocbreak() | |
self.stdscr.keypad(0) | |
curses.echo() | |
curses.endwin() | |
def watch_fds_in_panes(fds_by_pane, sleep_at_end=0): | |
""" | |
Use panes to watch output from a number of fds that are writing data. | |
fds_by_pane contains a list of lists of fds to watch in each pane. | |
""" | |
panes = Panes() | |
npane = len(fds_by_pane) | |
panes.start(npane) | |
pane_num_for_fd = {} | |
active_fds = [] | |
data_tmpl = {} | |
for pane_num, pane_fds in enumerate(fds_by_pane): | |
for fd in pane_fds: | |
active_fds.append(fd) | |
pane_num_for_fd[fd] = pane_num | |
data_tmpl[fd] = bytes() | |
try: | |
while active_fds: | |
all_data = data_tmpl.copy() | |
timeout = None | |
while True: | |
fds_read, _, _ = select.select(active_fds, [], [], timeout) | |
timeout = 0 | |
if fds_read: | |
for fd in fds_read: | |
data = os.read(fd, 1) | |
if data: | |
all_data[fd] += data | |
else: | |
active_fds.remove(fd) # saw EOF | |
else: | |
# no more data ready to read | |
break | |
for fd, data in all_data.items(): | |
if data: | |
strng = data.decode('utf-8') | |
panes.write(pane_num_for_fd[fd], strng) | |
time.sleep(sleep_at_end) | |
except KeyboardInterrupt: | |
panes.end() | |
raise | |
panes.end() | |
def parse_args(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument("-s", "--sleep-at-end", type=float, metavar="seconds", | |
help="time to sleep for at end before clearing screen", | |
default=0.) | |
parser.add_argument("commands", nargs="+", metavar="command", | |
help=("command to run in each pane " | |
"(if the command takes arguments, then quotation marks " | |
"will be needed around a command and its " | |
"arguments if invoking this from a shell)") | |
) | |
return parser.parse_args() | |
def main(): | |
opts = parse_args() | |
num_panes = len(opts.commands) | |
procs = [subprocess.Popen(command, | |
shell=True, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE) | |
for command in opts.commands] | |
try: | |
watch_fds_in_panes([[proc.stdout.fileno(), proc.stderr.fileno()] | |
for proc in procs], | |
sleep_at_end=opts.sleep_at_end) | |
except KeyboardInterrupt: | |
print("interrupted") | |
for proc in procs: | |
proc.send_signal(signal.SIGINT) | |
time.sleep(1) | |
for proc in procs: | |
proc.send_signal(signal.SIGKILL) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment