Created
March 31, 2022 12:08
-
-
Save snuffyDev/6c208d25b4ae5708ddfa72aa986970df to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use bytes::BytesMut; | |
| use env_logger; | |
| use futures::{SinkExt, StreamExt}; | |
| use futures_util::stream::{SplitSink, SplitStream}; | |
| use log::{debug, error}; | |
| use serde_derive::Deserialize; | |
| use std::{collections::HashMap, net::SocketAddr, process::Stdio}; | |
| use tokio::{ | |
| io::{AsyncReadExt, AsyncWriteExt}, | |
| net::{TcpListener, TcpStream}, | |
| process::{Child, ChildStdin, ChildStdout, Command}, | |
| sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, | |
| }; | |
| use tokio_tungstenite::{accept_async, WebSocketStream}; | |
| use tungstenite::Message; | |
| pub async fn start_terminal(path: std::path::PathBuf) { | |
| let _ = ws_server(&path) | |
| .await | |
| .map_err(|e| debug!("ws server exit with error: {:?}", e)); | |
| } | |
| #[derive(Deserialize, Debug)] | |
| struct WindowSize { | |
| cols: u16, | |
| rows: u16, | |
| } | |
| async fn handle_websocket_incoming( | |
| mut incoming: SplitStream<WebSocketStream<TcpStream>>, | |
| mut child_writer: ChildStdin, | |
| websocket_sender: UnboundedSender<Message>, | |
| stop_sender: UnboundedSender<()>, | |
| ) -> Result<(), anyhow::Error> { | |
| while let Some(Ok(msg)) = incoming.next().await { | |
| match msg { | |
| Message::Binary(data) => match data[0] { | |
| 0 => { | |
| if data.len().gt(&0) { | |
| child_writer.write_all(&data[1..]).await?; | |
| } | |
| } | |
| 1 => { | |
| let resize_msg: WindowSize = serde_json::from_slice(&data[1..])?; | |
| } | |
| 2 => { | |
| websocket_sender.send(Message::Binary(vec![1u8]))?; | |
| } | |
| _ => (), | |
| }, | |
| Message::Close(_) => { | |
| debug!("Closing!"); | |
| stop_sender.send(()).map_err(|e| debug!("stopping")); | |
| } | |
| Message::Ping(data) => websocket_sender.send(Message::Pong(data))?, | |
| _ => (), | |
| }; | |
| } | |
| let _ = stop_sender | |
| .send(()) | |
| .map_err(|e| debug!("failed to send stop signal: {:?}", e)); | |
| Ok(()) | |
| } | |
| async fn handle_pty_incoming( | |
| mut pty_shell_reader: ChildStdout, | |
| websocket_sender: UnboundedSender<Message>, | |
| ) -> Result<(), anyhow::Error> { | |
| let fut = async move { | |
| let mut buffer = BytesMut::with_capacity(1024); | |
| buffer.resize(1024, 0u8); | |
| loop { | |
| buffer[0] = 0u8; | |
| let mut tail = &mut buffer[1..]; | |
| let n = pty_shell_reader.read_buf(&mut tail).await?; | |
| if n == 0 { | |
| break; | |
| } | |
| match websocket_sender.send(Message::Binary(buffer[..n + 1].to_vec())) { | |
| Ok(_) => (), | |
| Err(e) => anyhow::bail!("failed to send msg to client: {:?}", e), | |
| } | |
| } | |
| Ok::<(), anyhow::Error>(()) | |
| }; | |
| fut.await.map_err(|e| { | |
| log::error!("handle pty incoming error: {:?}", &e); | |
| e | |
| }) | |
| } | |
| async fn write_to_websocket( | |
| mut outgoing: SplitSink<WebSocketStream<TcpStream>, Message>, | |
| mut receiver: UnboundedReceiver<Message>, | |
| ) -> Result<(), anyhow::Error> { | |
| while let Some(msg) = receiver.recv().await { | |
| outgoing.send(msg).await?; | |
| } | |
| Ok(()) | |
| } | |
| async fn kill_pty( | |
| mut child: Child, | |
| mut stop_recv: UnboundedReceiver<()>, | |
| ) -> Result<(), anyhow::Error> { | |
| while let Some(signal) = stop_recv.recv().await { | |
| child.kill().await?; | |
| } | |
| Ok(()) | |
| } | |
| async fn handle_connection( | |
| stream: TcpStream, | |
| path: &std::path::PathBuf, | |
| ) -> Result<(), anyhow::Error> { | |
| let ws_stream = accept_async(stream).await?; | |
| let (ws_outgoing, ws_incoming) = ws_stream.split(); | |
| let (sender, receiver) = unbounded_channel(); | |
| let ws_sender = sender.clone(); | |
| let mut child = Command::new("pwsh") | |
| .arg("-WindowStyle") | |
| .arg("Hidden") | |
| .current_dir(path) | |
| .stdin(Stdio::piped()) | |
| .stdout(Stdio::piped()) | |
| .spawn() | |
| .expect("Error spawning process"); | |
| println!("PTY Started: {:?}", child.id()); | |
| let mut stdin = child.stdin.take().unwrap(); | |
| let stdout = child.stdout.take().unwrap(); | |
| let (stop_sender, stop_receiver) = unbounded_channel(); | |
| let res = tokio::select! { | |
| res = handle_websocket_incoming(ws_incoming, stdin, sender, stop_sender) => res, | |
| res = handle_pty_incoming(stdout, ws_sender) => res, | |
| res = write_to_websocket(ws_outgoing, receiver) => res, | |
| res = kill_pty(child, stop_receiver) => res, | |
| }; | |
| log::debug!("res = {:?}", res); | |
| Ok(()) | |
| } | |
| async fn ws_server(path: &std::path::PathBuf) -> Result<(), anyhow::Error> { | |
| let addr: SocketAddr = "127.0.0.1:9022".parse().unwrap(); | |
| match TcpListener::bind(addr).await { | |
| Ok(listener) => { | |
| while let Ok((stream, peer)) = listener.accept().await { | |
| log::debug!("handling request from {:?}", peer); | |
| let path = path.clone(); | |
| let fut = async move { | |
| let _ = handle_connection(stream, &path) | |
| .await | |
| .map_err(|e| error!("handle connection error: {:?}", e)); | |
| }; | |
| tokio::spawn(fut); | |
| } | |
| } | |
| Err(e) => return Err(anyhow::anyhow!("failed to listen: {:?}", e)), | |
| } | |
| Ok(()) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment