Skip to content

Instantly share code, notes, and snippets.

@snuffyDev
Created March 21, 2022 22:28
Show Gist options
  • Save snuffyDev/404dbf6c2fa437cbf3d828a095af3928 to your computer and use it in GitHub Desktop.
Save snuffyDev/404dbf6c2fa437cbf3d828a095af3928 to your computer and use it in GitHub Desktop.
pub mod terminal;
use futures_util::{SinkExt, StreamExt};
use std::net::SocketAddr;
use std::process::Stdio;
use std::thread::sleep;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use tokio::process::{Child, Command};
use tokio::sync::mpsc::{channel, error::TryRecvError, Receiver, Sender};
// use tokio::sync::oneshot::{channel, Receiver, Sender};
use tokio_tungstenite::tungstenite::Result;
use tokio_tungstenite::{accept_async, tungstenite::Error, tungstenite::Message};
async fn process_pty(child: &mut Child, sender: Sender<String>, mut receiver: Receiver<String>) {
let mut stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
tokio::spawn(async move {
let mut f = BufReader::new(stdout);
loop {
match receiver.try_recv() {
Ok(line) => {
let input = stdin.write_all(&mut line.as_bytes()).await.unwrap();
let flushed_input = stdin.flush().await.unwrap();
drop(input);
}
Err(TryRecvError::Empty) => {
sleep(std::time::Duration::from_secs(1));
continue;
}
Err(e) => {
println!("Error! {:?}", e);
}
}
let mut buf = [0; 1024];
let mut buffer = String::new();
match f.read(&mut buf).await {
Ok(n) => {
let mut socket_part = String::new();
for val in buf {
socket_part.push(val as char);
}
socket_part.truncate(n);
buffer.push_str(&socket_part.as_str());
println!("socket-part: {:?}", socket_part);
sender.send(socket_part.to_string()).await.unwrap();
continue;
}
Err(e) => {
println!("Send Error! {:?}", e);
break;
}
};
}
});
}
async fn start_pty(
sender: Sender<String>,
mut receiver: Receiver<String>,
) -> tokio::process::Child {
// let pty = Pty::new().await;
let mut pty = Command::new("pwsh")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.expect("Error spawning process");
process_pty(&mut pty, sender, receiver).await;
println!("PTY Started: {:?}", pty.id());
pty
}
async fn accept_connection(peer: SocketAddr, stream: TcpStream) {
if let Err(e) = handle_connection(peer, stream).await {
match e {
Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => (),
err => println!("Error processing connection: {}", err),
}
}
}
async fn handle_connection(peer: SocketAddr, stream: TcpStream) -> Result<()> {
let (tx1, mut rx1): (Sender<String>, Receiver<String>) = channel(1);
let (tx2, rx2): (Sender<String>, Receiver<String>) = channel(1);
let mut child = start_pty(tx1, rx2).await;
let mut ws_stream = accept_async(stream).await.expect("Failed to accept");
println!("New WebSocket connection: {}", peer);
while let Some(msg) = ws_stream.next().await {
let msg = msg?;
println!("message: {}", &msg);
tx2.send(msg.clone().to_string()).await.unwrap();
if msg.is_text() || msg.is_binary() {
while let Ok(line) = rx1.try_recv() {
println!("Terminal Message: {}", line);
ws_stream
.send(Message::Text(line.to_string()))
.await
.expect("uh oh");
}
}
}
Ok(())
}
#[tokio::main]
async fn main() {
let addr = "127.0.0.1:9002";
let listener = TcpListener::bind(&addr)
.await
.expect("Can't listen to provided address");
println!("Listening on: {}", addr);
while let Ok((stream, _)) = listener.accept().await {
let peer = stream
.peer_addr()
.expect("connected streams should have a peer address");
println!("Peer address: {}", peer);
tokio::spawn(accept_connection(peer, stream));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment