Created
November 13, 2023 08:40
-
-
Save TonioGela/32064e0d76069b66bf7e9ee60a72c765 to your computer and use it in GitHub Desktop.
Fancy JLine-inspired way of handling single stdin keypresses on tty-based terminals
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//> using dep co.fs2::fs2-io::3.9.3 | |
import fs2.* | |
import fs2.io.process.* | |
import cats.effect.* | |
import cats.syntax.all.* | |
import sun.misc.* | |
import cats.effect.std.Dispatcher | |
/** Demo application that reads single key presses from stdin.
  *
  * The controlling terminal is switched to raw mode (no line buffering, no echo) and to the
  * alternate screen buffer for the lifetime of the program. Every byte received is printed
  * until CTRL+C (byte 3) or end of input, after which the previous terminal state is restored.
  */
object Foo extends IOApp.Simple:

  /** Byte delivered for CTRL+C. In raw mode the terminal does not turn it into SIGINT, so it
    * has to be handled EXPLICITLY by the program.
    */
  private val CtrlC: Byte = 3

  /** Program entry point: sets up the terminal, installs the resize handler and consumes stdin. */
  val run: IO[Unit] =
    Dispatcher
      .sequential[IO]
      .use: dispatcher =>
        rawTerminal.surround(
          registerHandler(dispatcher) >>
            fs2.io
              .stdin[IO](1024)
              .through(handleKeyPresses)
              .compile
              .drain
        )

  /** Runs `/bin/stty` against `/dev/tty` with the given extra arguments and returns its stdout.
    *
    * NOTE(review): `-f` is the BSD/macOS spelling of the device flag; GNU stty spells it `-F`.
    * Confirm the target platforms before relying on this elsewhere.
    */
  private def stty(args: String*): IO[String] =
    runWithOutput("/bin/stty", ("-f" +: "/dev/tty" +: args)*)

  /** Terminal set-up and tear-down expressed as a `Resource`.
    *
    * The current stty mode is captured first and registered for restoration, and only then is
    * the terminal modified. Because the finalizer is owned by the resource rather than by the
    * stdin stream, the terminal is restored even if a later set-up step fails or is cancelled.
    */
  private def rawTerminal: Resource[IO, Unit] =
    // `stty -g` terminates its output with a newline; trim it so the saved mode can be passed
    // back verbatim as a single argument.
    val saveMode: IO[String] = stty("-g").map(_.trim)

    def restore(savedMode: String): IO[Unit] =
      val leaveScreen =
        IO.print("\u001B[?;47;l") >> // restores normal buffer mode
          IO.print("\u001B[?;25;h") >> // shows cursor
          IO.print("\u001b[2J") // clears whole screen
      // The stty mode must be restored even when writing the escape sequences fails.
      leaveScreen.guarantee(stty(savedMode).void) // sets the console to the previous stty mode

    // NOTE(review): the `;`-padded private-mode sequences rely on the terminal treating empty
    // parameters as no-ops; the canonical spellings are ESC[?47h / ESC[?47l / ESC[?25h.
    val enterScreen: IO[Unit] =
      stty("raw", "-echo") >> // sets the console to raw mode
        IO.print("\u001b[?;47;h") >> // alternate buffer mode
        IO.print("\u001b[?25l") >> // hides cursor
        IO.print("\u001b[2J") // clears whole screen

    Resource.make(saveMode)(restore).evalMap(_ => enterScreen)

  /** Installs a handler for SIGWINCH, the signal sent to the process each time the window is
    * resized. The handler is invoked outside of `IO`, so the message is printed through the
    * supplied dispatcher.
    *
    * @param dispatcher used to run the `IO` that reports the resize
    */
  def registerHandler(dispatcher: Dispatcher[IO]): IO[Unit] = IO.delay(
    Signal.handle(
      new Signal("WINCH"),
      new SignalHandler():
        // ESC[G moves the cursor to the beginning of the line before printing.
        def handle(signal: Signal): Unit =
          dispatcher.unsafeRunAndForget(IO.println("\u001B[GYou resized the window"))
    )
  )

  /** Runs an external command, discarding its standard output.
    *
    * Fails with an `Exception` when the command exits with a non-zero code.
    */
  def runCommand(command: String, args: String*): IO[Unit] =
    runWithOutput(command, args*).void

  /** Runs an external command and returns everything it wrote to stdout, decoded as UTF-8.
    *
    * @param command executable to spawn
    * @param args    arguments passed to the executable
    * @return the decoded standard output when the exit code is 0; otherwise the returned `IO`
    *         fails with an `Exception` naming the command and its exit code
    */
  def runWithOutput(command: String, args: String*): IO[String] =
    ProcessBuilder(command, args.toList)
      .spawn[IO]
      .use: process =>
        for
          // Drain stdout BEFORE waiting for the exit code: a child that fills its output pipe
          // blocks until somebody reads it, so waiting first could hang forever.
          stdout   <- process.stdout.through(fs2.text.utf8.decode).compile.foldMonoid
          exitCode <- process.exitValue
          result <-
            if exitCode == 0 then stdout.pure[IO]
            else
              IO.raiseError(
                new Exception(s"$command ${args.mkString(" ")} terminated with error $exitCode")
              )
        yield result

  /** Prints every byte received from `stream`, one message per byte, and stops at CTRL+C or at
    * the end of the input. The resulting stream emits no elements; it is run for its effects.
    */
  def handleKeyPresses(stream: Stream[IO, Byte]): Stream[IO, Unit] =
    def loop(remaining: Stream[IO, Byte]): Pull[IO, Nothing, Unit] =
      remaining.pull.uncons1.flatMap:
        case None             => Pull.done
        case Some((CtrlC, _)) => Pull.done
        case Some((byte, tail)) =>
          // ESC[G is "Beginning of the line"
          Pull.eval(IO.println("\u001B[GI received byte: %04d".format(byte))) >> loop(tail)

    loop(stream).stream
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment