Last active
February 9, 2024 19:10
-
-
Save tompng/5a3653b01424f36afcf6417c71ba3f40 to your computer and use it in GitHub Desktop.
Minimal terminal emulator that only supports Reline's escape sequences
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'pty'
require 'io/console'

# Without a command to run there is nothing to visualize: print the usage
# banner and quit.
if ARGV.empty?
  puts <<~EOS
    Reline Visualizer (Minimal terminal emulator that only supports Reline's escape sequences)
    Usage: ruby #{__FILE__} <command>
    ruby #{__FILE__} irb
    ruby #{__FILE__} bash
    ruby #{__FILE__} zsh
    ruby #{__FILE__} ruby -I path/to/reline/lib -I path/to/irb/lib path/to/irb/exe/irb
  EOS
  exit
end

# Command line to run inside the pseudo terminal, one element per argument.
command = ARGV
# Minimal terminal emulator that mirrors a child process's output onto the
# real terminal while briefly highlighting ("flashing") every freshly written
# cell, so that redraws become visible.
#
# State kept by an instance:
# * @screen_lines - one array per row; each cell is [grapheme, color_seq] or nil
# * @flashed      - one hash per row mapping column => remaining restore passes
# * @x, @y        - zero-based cursor position as tracked by this emulator
# * @color_seq    - SGR parameters currently in effect for newly written cells
#
# Only the escape sequences handled in #print are understood; any other
# sequence raises.
class Visualizer
  # SGR parameters used while a cell is highlighted (reset, bold, reverse video).
  FLASH_SEQ = [0, 1, 7].freeze
  # Number of #restore passes after which a highlighted cell is redrawn normally.
  FLASH_COUNT = 4
  TAB_WIDTH = 8

  # @param pty_output [IO] writable end of the PTY; receives keyboard input and
  #   is resized to match the real terminal.
  def initialize(pty_output)
    @pty_output = pty_output
    # Ask the real terminal for the cursor position; the reply has the form
    # "\e[<row>;<col>R" with 1-based coordinates.
    @y, @x = STDIN.raw do
      STDOUT.print "\e[6n"
      STDIN.readpartial(1024)[/\e\[\d+;\d+R/].scan(/\d+/).map { _1.to_i - 1 }
    end
    @height, @width = STDIN.winsize
    pty_output.winsize = [@height, @width]
    @flashed = Array.new(@height) { {} }
    @screen_lines = Array.new(@height) { [] }
    @color_seq = []
    @backup = nil
  end

  # Moves the real cursor to the given zero-based position (defaults to the
  # tracked position). The column is capped at the last column because @x may
  # temporarily equal @width after writing into the last cell.
  def move_cursor(x: @x, y: @y)
    STDOUT.write "\e[#{y + 1};#{[x + 1, @width].min}H"
  end

  # Moves the cursor down +n+ rows, scrolling both the real terminal and the
  # model when the bottom of the screen is passed.
  def scroll_down(n)
    @y += n
    if @y < @height
      move_cursor
    else
      scroll = @y - @height + 1
      @y = @height - 1
      move_cursor
      STDOUT.write "\n" * scroll
      scroll.times do
        @screen_lines.shift
        @screen_lines << []
        @flashed.shift
        @flashed << {}
      end
    end
  end

  # Records grapheme +c+ at the cursor with the current colors and draws it
  # highlighted. Does not advance @x; callers do that.
  def flash(c)
    if @x == @width
      @x = 0
      scroll_down(1)
    end
    @screen_lines[@y][@x] = [c, @color_seq]
    @flashed[@y][@x] = FLASH_COUNT
    draw(c, FLASH_SEQ)
  end

  # Runs one restore pass: redraws with their real colors the cells whose
  # highlight counter is about to expire (or every highlighted cell when
  # +force+ is true), then decrements all counters and drops expired ones.
  # The tracked cursor position is preserved.
  def restore(force: false)
    backup = @y, @x
    @flashed.each_with_index do |cols, y|
      next if cols.empty?

      @y = y
      cols.keys.sort.each do |x|
        next if cols[x] != 1 && !force

        c, color_seq = @screen_lines[y][x]
        # The cell may have been erased since it was flashed.
        next unless c

        @x = x
        move_cursor
        draw(c, color_seq)
      end
      cols.transform_values! { _1 - 1 }.delete_if { _2 == 0 }
    end
    @y, @x = backup
    move_cursor
  end

  # Writes +c+ at the real cursor using SGR parameters +seq+, resetting
  # attributes before and after.
  def draw(c, seq)
    STDOUT.print "\e[0;#{seq.join(';')}m#{c}\e[0m"
  end

  # Interprets a chunk of child output: plain text is flashed onto the screen,
  # escape sequences update the model and are forwarded or translated.
  #
  # @param output [String] UTF-8 text; it is not modified.
  # @raise [RuntimeError] on an escape sequence this emulator does not know.
  def print(output)
    # OSC sequences (e.g. window title updates) are dropped. A copy is used so
    # the caller's string is left untouched.
    output = output.gsub(/\e\][^\a]*\a/, '')
    # The capture group makes split keep the escape sequences: even indexes
    # hold plain text, odd indexes hold one escape sequence each.
    sequences = output.split(/(\e\[[\x30-\x3f]*[\x20-\x2f]*[@a-zA-Z~]|\e[^\[])/)
    # STDOUT.print "\e]2;#{output.inspect}\a" # debug
    sequences.each_with_index do |seq, i|
      if i.even?
        print_text(seq)
      else
        handle_escape(seq, output)
      end
    end
  end

  # Handles keyboard/terminal input: a cursor position report from the real
  # terminal resynchronizes the tracked cursor; everything is passed on to the
  # child process.
  def input(s)
    s[/\e\[\d+;\d+R/]&.tap do |cursor_seq|
      @y, @x = cursor_seq.scan(/\d+/).map { _1.to_i - 1 }
    end
    @pty_output.write s
  end

  # Periodic tick: runs one restore pass. Window resizing is not handled.
  def timer(winch)
    puts 'WINCH not supported yet' if winch
    restore
  end

  # Removes every highlight from the screen and terminates the process.
  def exit
    restore(force: true)
    Kernel.exit
  end

  private

  # Processes a run of plain text one grapheme cluster at a time.
  def print_text(text)
    text.grapheme_clusters.each do |c|
      case c
      when "\b"
        if @x > 0
          @x -= 1
          move_cursor
        end
      when "\a"
        STDOUT.write c
      when "\r\n" # CRLF forms a single grapheme cluster
        @x = 0
        scroll_down(1)
        move_cursor
      when "\r"
        @x = 0
        move_cursor
      when "\n"
        scroll_down(1)
      else
        put_char(c)
      end
    end
  end

  # Flashes one printable grapheme (or the blanks a tab expands to) at the
  # cursor, wrapping to the next row when it does not fit.
  def put_char(c)
    w = 1 # Reline::Unicode.calculate_width(c)
    is_tab = c == "\t"
    # Distance to the next tab stop.
    w = @x / TAB_WIDTH * TAB_WIDTH + TAB_WIDTH - @x if is_tab
    if @x + w > @width
      @x = 0
      scroll_down(1)
    end
    if is_tab
      w = TAB_WIDTH if @x == 0
      w.times do
        flash(' ')
        @x += 1
      end
    else
      flash(c)
      @x += w
    end
    move_cursor
  end

  # Applies one escape sequence +seq+ to the model and the real terminal.
  # +output+ is only used for the error message.
  def handle_escape(seq, output)
    # The dispatch key is the final byte, prefixed by "[" plus an optional
    # private marker for CSI sequences (e.g. "[A", "[?h", "M").
    type = seq[-1]
    if seq =~ /\A\e(\[[?>=]?)/
      type = "#{$1}#{type}"
    end
    args = seq.scan(/\d+/).map(&:to_i)
    case type
    when 'M' # reverse index: cursor up, scrolling content down at the top row
      if @y == 0
        @screen_lines.pop
        @screen_lines.unshift([])
        @flashed.pop
        @flashed.unshift({})
        STDOUT.write seq
      else
        @y -= 1
        move_cursor
      end
    when '[A'
      @y = [@y - (args[0] || 1), 0].max
      move_cursor
    when '[B'
      @y = [@y + (args[0] || 1), @height - 1].min
      move_cursor
    when '[C'
      @x = [@x + (args[0] || 1), @width - 1].min
      move_cursor
    when '[D'
      @x = [@x - (args[0] || 1), 0].max
      move_cursor
    when '[G'
      @x = ((args[0] || 1) - 1).clamp(0, @width - 1)
      move_cursor
    when '[K'
      erase_in_line(args[0] || 0)
      STDOUT.write seq
    when '[J'
      erase_in_display(args[0] || 0)
      STDOUT.write seq
    when '[H'
      # Clamp like '[G' does so that later row/column lookups stay in range.
      @y = ((args[0] || 1) - 1).clamp(0, @height - 1)
      @x = ((args[1] || 1) - 1).clamp(0, @width - 1)
      move_cursor
    when '[P' # delete n characters at the cursor, shifting the rest left
      n = args[0] || 1
      @screen_lines[@y].slice!(@x, n)
      # Highlights right of the cursor follow their cell n columns to the
      # left, but never past the cursor column.
      @flashed[@y].transform_keys! { _1 <= @x ? _1 : [@x, _1 - n].max }
      STDOUT.write seq
    when '[@' # insert n blanks at the cursor, shifting the rest right
      n = args[0] || 1
      @screen_lines[@y][@x, 0] = [[' ', @color_seq]] * n
      @screen_lines[@y].slice!(@width..)
      # The cell under the cursor moves too, so its highlight moves with it.
      @flashed[@y].transform_keys! { _1 < @x ? _1 : _1 + n }.delete_if { |k,| k >= @width }
      STDOUT.write seq
    when '[d'
      @y = ((args[0] || 1) - 1).clamp(0, @height - 1)
      move_cursor
    when '[m'
      if args.empty?
        @color_seq = []
      else
        # Stored cells reference the current array, so extend a copy.
        @color_seq = @color_seq.dup
        args.each do |arg|
          arg == 0 ? @color_seq = [] : @color_seq << arg
        end
      end
    when '[n'
      # Status query: forwarded to the real terminal, whose reply arrives via #input.
      STDOUT.write seq
    when '[?h'
      if args[0] == 1049 # enter the alternate screen with a blank model
        @backup = @screen_lines, @flashed, @x, @y
        @screen_lines = Array.new(@height) { [] }
        @flashed = Array.new(@height) { {} }
        @x, @y = [0, 0]
      end
      STDOUT.write seq
    when '[?l'
      # Leave the alternate screen; ignored for the model when it was never
      # entered, since there is no state to go back to.
      if args[0] == 1049 && @backup
        @screen_lines, @flashed, @x, @y = @backup
        @backup = nil
      end
      STDOUT.write seq
    when '=', '>'
      # ignore
    when '[>c', '[>m', '[r', '[t', 'P', '[?m', '[M', '[L'
      # unimplemented, used in vi and emacs
      STDOUT.write "\e]2;#{seq.inspect}\a"
      STDOUT.write seq
    else
      raise "Unimplemented escape sequence: #{seq.inspect} #{output.inspect}"
    end
  end

  # Erases part of the cursor row in the model.
  # mode 1: start of line through the cursor, mode 2: whole line,
  # anything else (including the default 0): cursor through end of line.
  def erase_in_line(mode)
    line = @screen_lines[@y]
    case mode
    when 1
      # Blank cells in place so the remaining cells keep their columns.
      line.fill(nil, 0, [@x + 1, line.size].min)
    when 2
      line.clear
    else
      line.slice!(@x..)
    end
  end

  # Erases part of the screen in the model.
  # mode 1: top of screen through the cursor, mode 2/3: everything,
  # anything else (including the default 0): cursor through end of screen.
  def erase_in_display(mode)
    case mode
    when 1
      @screen_lines.first(@y).each(&:clear)
      erase_in_line(1)
    when 2, 3
      @screen_lines.each(&:clear)
    else
      erase_in_line(0)
      (@y + 1...@height).each { |y| @screen_lines[y].clear }
    end
  end
end
# Run the command on a pseudo terminal and shuttle events between it, the
# keyboard and the visualizer. Three producer threads feed a queue that the
# main thread drains, so the visualizer is only ever used from one thread.
#
# The argument vector is passed as is, so arguments that contain spaces or
# shell metacharacters reach the child unchanged.
PTY.spawn(*command) do |input, output, pid|
  visualizer = Visualizer.new(output)
  winch = false
  Signal.trap(:WINCH) { winch = true }
  Signal.trap(:INT) { Process.kill(:INT, pid) }
  queue = Queue.new

  # Child output reader.
  Thread.new do
    buf = String.new(encoding: Encoding::UTF_8)
    loop do
      next unless input.wait_readable(0.2)

      # Tag the chunk as UTF-8 so that the validity check below can detect a
      # multibyte character that was split across two reads.
      buf << input.readpartial(1024).force_encoding(Encoding::UTF_8)
      next unless buf.valid_encoding?

      # Hold back a trailing escape sequence that has not been terminated yet.
      if (pos = buf =~ /\e[^\ea-zA-Z]*\z/)
        data = buf[0, pos]
        buf = buf[pos..]
      else
        data = buf.dup
        buf.clear
      end
      queue << [:print, data]
    end
  rescue
    # Reading fails once the child is gone; ask the main loop to shut down.
    queue << :exit
  end

  # Periodic tick that lets the visualizer remove expired highlights.
  Thread.new do
    loop do
      queue << [:timer, winch]
      winch = false
      sleep 0.05
    end
  end

  # Keyboard reader; raw mode delivers keystrokes unprocessed.
  Thread.new do
    STDIN.raw do
      loop do
        queue << [:input, STDIN.readpartial(1024)] if STDIN.wait_readable(0.2)
      end
    end
  end

  # Dispatch each event to the visualizer method of the same name.
  loop do
    event, *args = queue.deq
    visualizer.public_send(event, *args)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment