Skip to content

Instantly share code, notes, and snippets.

@snuffyDev
Created March 31, 2022 12:08
Show Gist options
  • Select an option

  • Save snuffyDev/6c208d25b4ae5708ddfa72aa986970df to your computer and use it in GitHub Desktop.

Select an option

Save snuffyDev/6c208d25b4ae5708ddfa72aa986970df to your computer and use it in GitHub Desktop.
use bytes::BytesMut;
use env_logger;
use futures::{SinkExt, StreamExt};
use futures_util::stream::{SplitSink, SplitStream};
use log::{debug, error};
use serde_derive::Deserialize;
use std::{collections::HashMap, net::SocketAddr, process::Stdio};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
process::{Child, ChildStdin, ChildStdout, Command},
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
};
use tokio_tungstenite::{accept_async, WebSocketStream};
use tungstenite::Message;
pub async fn start_terminal(path: std::path::PathBuf) {
let _ = ws_server(&path)
.await
.map_err(|e| debug!("ws server exit with error: {:?}", e));
}
#[derive(Deserialize, Debug)]
struct WindowSize {
cols: u16,
rows: u16,
}
async fn handle_websocket_incoming(
mut incoming: SplitStream<WebSocketStream<TcpStream>>,
mut child_writer: ChildStdin,
websocket_sender: UnboundedSender<Message>,
stop_sender: UnboundedSender<()>,
) -> Result<(), anyhow::Error> {
while let Some(Ok(msg)) = incoming.next().await {
match msg {
Message::Binary(data) => match data[0] {
0 => {
if data.len().gt(&0) {
child_writer.write_all(&data[1..]).await?;
}
}
1 => {
let resize_msg: WindowSize = serde_json::from_slice(&data[1..])?;
}
2 => {
websocket_sender.send(Message::Binary(vec![1u8]))?;
}
_ => (),
},
Message::Close(_) => {
debug!("Closing!");
stop_sender.send(()).map_err(|e| debug!("stopping"));
}
Message::Ping(data) => websocket_sender.send(Message::Pong(data))?,
_ => (),
};
}
let _ = stop_sender
.send(())
.map_err(|e| debug!("failed to send stop signal: {:?}", e));
Ok(())
}
async fn handle_pty_incoming(
mut pty_shell_reader: ChildStdout,
websocket_sender: UnboundedSender<Message>,
) -> Result<(), anyhow::Error> {
let fut = async move {
let mut buffer = BytesMut::with_capacity(1024);
buffer.resize(1024, 0u8);
loop {
buffer[0] = 0u8;
let mut tail = &mut buffer[1..];
let n = pty_shell_reader.read_buf(&mut tail).await?;
if n == 0 {
break;
}
match websocket_sender.send(Message::Binary(buffer[..n + 1].to_vec())) {
Ok(_) => (),
Err(e) => anyhow::bail!("failed to send msg to client: {:?}", e),
}
}
Ok::<(), anyhow::Error>(())
};
fut.await.map_err(|e| {
log::error!("handle pty incoming error: {:?}", &e);
e
})
}
async fn write_to_websocket(
mut outgoing: SplitSink<WebSocketStream<TcpStream>, Message>,
mut receiver: UnboundedReceiver<Message>,
) -> Result<(), anyhow::Error> {
while let Some(msg) = receiver.recv().await {
outgoing.send(msg).await?;
}
Ok(())
}
async fn kill_pty(
mut child: Child,
mut stop_recv: UnboundedReceiver<()>,
) -> Result<(), anyhow::Error> {
while let Some(signal) = stop_recv.recv().await {
child.kill().await?;
}
Ok(())
}
async fn handle_connection(
stream: TcpStream,
path: &std::path::PathBuf,
) -> Result<(), anyhow::Error> {
let ws_stream = accept_async(stream).await?;
let (ws_outgoing, ws_incoming) = ws_stream.split();
let (sender, receiver) = unbounded_channel();
let ws_sender = sender.clone();
let mut child = Command::new("pwsh")
.arg("-WindowStyle")
.arg("Hidden")
.current_dir(path)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.expect("Error spawning process");
println!("PTY Started: {:?}", child.id());
let mut stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let (stop_sender, stop_receiver) = unbounded_channel();
let res = tokio::select! {
res = handle_websocket_incoming(ws_incoming, stdin, sender, stop_sender) => res,
res = handle_pty_incoming(stdout, ws_sender) => res,
res = write_to_websocket(ws_outgoing, receiver) => res,
res = kill_pty(child, stop_receiver) => res,
};
log::debug!("res = {:?}", res);
Ok(())
}
async fn ws_server(path: &std::path::PathBuf) -> Result<(), anyhow::Error> {
let addr: SocketAddr = "127.0.0.1:9022".parse().unwrap();
match TcpListener::bind(addr).await {
Ok(listener) => {
while let Ok((stream, peer)) = listener.accept().await {
log::debug!("handling request from {:?}", peer);
let path = path.clone();
let fut = async move {
let _ = handle_connection(stream, &path)
.await
.map_err(|e| error!("handle connection error: {:?}", e));
};
tokio::spawn(fut);
}
}
Err(e) => return Err(anyhow::anyhow!("failed to listen: {:?}", e)),
}
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment