Created
March 20, 2024 12:30
-
-
Save jquast/ebd0b93b0db5557a618a06089f3ebf26 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import blessed | |
import os, glob | |
import math | |
# TODO: | |
# - fast in-memory string compression of output | |
# - run at ~some fps, index into frames by rtt /2 use terminal query | |
import asyncio, telnetlib3 | |
class MyTelnetServer(telnetlib3.TelnetServer): | |
def on_naws(self, rows, cols): | |
""" | |
Callback receives NAWS response, :rfc:`1073`. | |
:param int rows: screen size, by number of cells in height. | |
:param int cols: screen size, by number of cells in width. | |
""" | |
self.writer.width, self.writer.height = cols, rows | |
FONT_RATIO = 119/425 | |
term = blessed.Terminal(kind='xterm-256color', force_styling=True) | |
import time | |
async def shell(reader, writer): | |
stime = time.time() | |
if not hasattr(writer, 'width'): | |
writer.width, writer.height = 0, 0 | |
match_height, match_width, last_width, last_height = -1, -1, 0, 0 | |
frame_no = 0 | |
files = [] | |
writer.write(term.home + term.clear_eos + term.hide_cursor) | |
while True: | |
t = time.time() | |
if len(files) == 0 or last_width != writer.width or last_height != writer.height: | |
writer.write(term.home + term.clear) | |
last_width, last_height = writer.width, writer.height | |
match_width = 10 | |
for width in range(10, 400, 10): | |
if width > writer.width: | |
break | |
if width > 150: | |
break | |
height = int(math.ceil(width * FONT_RATIO)) | |
if height > writer.height: | |
break | |
if not os.path.exists(f'max/{width}/frame-0000.ans'): | |
continue | |
match_width = width | |
match_height = int(math.ceil(match_width * FONT_RATIO)) | |
dirty = True | |
# a width of 290 is 230MB !? | |
files = sorted(glob.glob(f'max/{match_width}/*.ans')) | |
print(f'load {match_width}... for {reader}, {writer}, {writer._transport}, {writer.protocol}') | |
redo=False | |
output = [] | |
# for f in files: | |
# if last_width != writer.width or last_height != writer.height: | |
# redo=True | |
# break | |
# output.append(open(f, 'r').read()) | |
# if redo: | |
# continue | |
print(f'loaded {match_width} width in {time.time()-t:0.2f}s') | |
margin_top = int(max(0, (writer.height - match_height) / 2)) | |
margin_left = int(max(0, (writer.width - match_width) / 2)) | |
# continue | |
if frame_no == len(files) - 2: | |
frame_no = int(len(files)*.74) | |
output = open(files[frame_no], 'r').read() | |
# if dirty: | |
# writer.write(term.home + term.clear) | |
# dirty = False | |
writer.write(term.move_y(margin_top)) | |
for y, line in enumerate(output.splitlines()): | |
writer.write(term.move_yx(margin_top + y, margin_left) + line) | |
frame_no += 1 | |
await writer.drain() | |
await asyncio.sleep(max(0, 0.1-(time.time()-t))) | |
if time.time() - stime > 300: | |
writer.write(term.reset + "\r\n\r\nThat's it! Have a day.\r\n") | |
break | |
loop = asyncio.get_event_loop() | |
#TelnetWritertelnetlib3/client.py | |
# use rows=cols=0 until negotiated, hard way or easy way | |
coro = telnetlib3.create_server(port=23, cols=0, rows=0, shell=shell, force_binary=True, | |
encoding='utf8', protocol_factory=MyTelnetServer) | |
server = loop.run_until_complete(coro) | |
loop.run_until_complete(server.wait_closed()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment