#!/usr/bin/env python3
import sys, pty, os, time, threading, queue, argparse
parser = argparse.ArgumentParser(
    description='Tests the input latency of a shell',
    usage='shell-latency-test-infinite.py [--sleep] [--] argv-to-execute...',
)
parser.add_argument('--sleep',
    action='store_true',
    help='''only type once every 1s instead of constantly - results in higher
    observed latency, probably due to OS overhead''')
parser.add_argument('argv_to_execute', nargs='+', help='shell command to execute')
args = parser.parse_args()
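
# Example invocations (the shells and flags below are just illustrations; pass
# whatever interactive shell you want to measure after the `--` separator):
#   ./shell-latency-test-infinite.py -- bash --norc -i
#   ./shell-latency-test-infinite.py --sleep -- zsh -f -i
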
# Spawn the shell in a pty which we own the controlling end of.
child_pid, fd = pty.fork()
if child_pid == 0:
    # This is the child process. Exec the shell.
    os.execvp(args.argv_to_execute[0], args.argv_to_execute)
    raise Exception("execvp failed")
# Otherwise, we're the parent.
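# `fd` is the master side of the pty: anything the shell writes to its
# terminal can be read from it, and anything written to it reaches the shell
# as if it had been typed at the keyboard.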
# Fan the shell's output out to two consumers: the main measurement loop and
# the cursor-position-request handler.
data_queue_main = queue.SimpleQueue()
data_queue_position_requests = queue.SimpleQueue()
data_queues = [data_queue_main, data_queue_position_requests]
def read_thread():
    '''Constantly read input from the shell and send it to each of the queues.'''
    while new_data := os.read(fd, 1048576):
        for q in data_queues:
            q.put(new_data)
threading.Thread(target=read_thread, daemon=True).start()
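
# Some shells and prompts query the terminal for the cursor position with the
# DSR escape sequence ESC [ 6 n and wait for the reply. Answer every such
# query with a cursor-position report claiming row 1, column 1 (ESC [ 1 ; 1 R)
# so the shell never stalls waiting for a real terminal.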
def position_request_thread():
    '''Handle position requests.'''
    buf = b''
    while True:
        new_data = data_queue_position_requests.get()
        # Keep the last 3 bytes so a request split across reads is still detected.
        buf = buf[-3:] + new_data
        for _ in range(buf.count(b'\x1b[6n')):
            os.write(fd, b'\x1b[1;1R')
threading.Thread(target=position_request_thread, daemon=True).start()
def join_queue(q):
    '''Remove all bytes objects currently in the queue and return them concatenated.'''
    ret = b''
    while True:
        try:
            ret += q.get_nowait()
        except queue.Empty:
            break
    return ret
# Wait one second to ensure the shell is done initializing.
time.sleep(1)
startup_data = join_queue(data_queue_main)
print('startup:', startup_data)
durations = []
durations_clear_time = time.time()
i = 0
while True:
    # Did we get any extra data, suggesting the below assumption is violated?
    junk = join_queue(data_queue_main)
    if junk:
        print('junk:', junk)
    # Alternately input 'a' and input a backspace to erase the 'a'.
    os.write(fd, b'\b' if i & 1 else b'a')
    # The timing isn't exact (e.g. there might be a delay between the write
    # and calculating the time here), but any error is tens of microseconds
    # at most.
    time_pre = time.time()
    # Assume the first read contains the full response to the input and the
    # shell won't be sending anything more. If we didn't assume this, we
    # couldn't immediately continue with more keystrokes.
    new_data = data_queue_main.get()
    duration_us = (time.time() - time_pre) * 1_000_000
    durations.append(duration_us)
    if args.sleep:
        time.sleep(1)
    total_duration = time.time() - durations_clear_time
    if total_duration >= 1.0 or args.sleep:
        average_duration_us = sum(durations) / len(durations)
        print(f'{len(durations)} rounds in {total_duration:2.1f}s, average latency {average_duration_us:5,.0f}us')
        durations = []
        durations_clear_time = time.time()
    i += 1
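
# The loop above never exits on its own (hence "infinite" in the script name);
# interrupt it with Ctrl-C once enough per-second averages have been printed.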