Created
September 23, 2023 23:26
-
-
Save comex/fd9748146ad838855d5a9d4b664aa603 to your computer and use it in GitHub Desktop.
Tool to test a shell's input latency
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys, pty, os, time, signal, threading, string, random | |
sub_argv = sys.argv[1:] | |
if not sub_argv: | |
raise Exception("usage: shell-latency-test.py shell-to-test (e.g.: shell-latency-test.py nu)") | |
# Spawn the shell in a pty which we own the controlling end of. | |
child_pid, fd = pty.fork() | |
if child_pid == 0: | |
# This is the child process. Exec the shell. | |
os.execvp(sub_argv[0], sub_argv) | |
raise Exception("execvp failed") | |
# Otherwise, we're the parent. | |
# Come up with a random 'command' to type into the shell (just some random letters). | |
# It's random to ensure that we don't see a false positive due to e.g. nushell's | |
# history completion. | |
random_command = ''.join(random.sample(string.ascii_letters, 10)).encode('ascii') | |
def input_thread(): | |
global wrote_random_command_time | |
# Wait one second to ensure the shell is done initializing. | |
time.sleep(1) | |
# Record the current time... | |
wrote_random_command_time = time.time() | |
# And send the Record the current time... | |
os.write(fd, random_command) | |
def output_thread(): | |
# Log all the data we've ever read, so that we can search for strings | |
# without it breaking if the string is split across multiple read calls. | |
# This is O(n^2) but n is too small for it to matter. | |
ever_read_data = b'' | |
# How many current position requests were seen last time. | |
old_6n_count = 0 | |
# Use raw reads to avoid any buffering. | |
while new_data := os.read(fd, 1048576): | |
# Record the current time and the data read | |
read_time = time.time() | |
ever_read_data += new_data | |
print('read:', repr(new_data)) | |
# Did we see our command echoed back by the shell? | |
if random_command in ever_read_data: | |
# We shouldn't have read it back if we didn't write it yet: | |
assert wrote_random_command_time is not None | |
# All done. | |
print('done. latency:', read_time - wrote_random_command_time) | |
break | |
# Otherwise, is there a current position request? | |
# We don't do a full terminal simulation, but we do need to respond to | |
# these, or else nushell won't start up. | |
new_6n_count = ever_read_data.count(b'\x1b[6n') | |
for _ in range(new_6n_count - old_6n_count): | |
os.write(fd, b'\x1b[1;1R') | |
old_6n_count = new_6n_count | |
threading.Thread(target=input_thread).start() | |
output_thread() | |
# All done. Kill the shell. | |
os.kill(child_pid, signal.SIGTERM) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment