Skip to content

Instantly share code, notes, and snippets.

@snuffyDev
Created March 21, 2022 09:41
Show Gist options
  • Save snuffyDev/4692e4652fa4ef1298e7dc3b2eb33187 to your computer and use it in GitHub Desktop.
Save snuffyDev/4692e4652fa4ef1298e7dc3b2eb33187 to your computer and use it in GitHub Desktop.
pub mod terminal;
use futures_util::{SinkExt, StreamExt};
use std::net::SocketAddr;
use std::process::Stdio;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::sync::Arc;
use std::thread::{self, sleep};
use terminal::{Pty, PtyProcess};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
// use tokio::sync::oneshot::{channel, Receiver, Sender};
use tokio::sync::Mutex;
use tokio_tungstenite::tungstenite::Result;
use tokio_tungstenite::{accept_async, tungstenite::Error, tungstenite::Message};
async fn process_pty(child: &mut Child, sender: Sender<String>, receiver: Receiver<String>) {
let mut stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
thread::spawn(move || {
let mut f = BufReader::new(stdout);
loop {
match receiver.try_recv() {
Ok(line) => {
let result = stdin.write_all(&mut line.as_bytes());
}
Err(TryRecvError::Empty) => {
sleep(std::time::Duration::from_secs(1));
continue;
}
Err(e) => {
println!("Error! {:?}", e);
}
}
let mut buf = String::new();
match f.read_line(&mut buf) {
Ok(_) => {
sender.send(buf).unwrap();
continue;
}
Err(e) => {
println!("Send Error! {:?}", e);
break;
}
}
}
});
}
async fn start_pty(sender: Sender<String>, receiver: Receiver<String>) {
// let pty = Pty::new().await;
let pty = Command::new("pwsh")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.expect("Error spawning process");
// let process = pty..await;
tokio::spawn(async move {
let recv = receiver;
let mut reader = BufReader::new(pty.stdout.unwrap());
let mut stdin = pty.stdin.unwrap();
for line in recv {
stdin.write_all(line.as_bytes()).await.unwrap();
let mut buf = String::new();
match reader.read_line(&mut buf).await {
Ok(_) => {
sender.send(buf).unwrap();
continue;
}
Err(e) => {
println!("Error reading line: ${:?}", e);
break;
}
}
}
});
}
fn send_cmd(cmd: String, mutex: Mutex<Sender<String>>) {
thread::spawn(move || {
let sender = mutex.lock();
let sender = async { sender.await };
sender.send(cmd).unwrap();
});
}
async fn accept_connection(peer: SocketAddr, stream: TcpStream) {
if let Err(e) = handle_connection(peer, stream).await {
match e {
Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => (),
err => println!("Error processing connection: {}", err),
}
}
}
async fn handle_connection(peer: SocketAddr, stream: TcpStream) -> Result<()> {
let (tx1, rx1): (Sender<String>, Receiver<String>) = channel();
let (tx2, rx2): (Sender<String>, Receiver<String>) = channel();
start_pty(tx1, rx2).await;
let mut ws_stream = accept_async(stream).await.expect("Failed to accept");
println!("New WebSocket connection: {}", peer);
while let Some(msg) = ws_stream.next().await {
let msg = msg?;
println!("message: {}", &msg);
// #### HERE
if msg.is_text() || msg.is_binary() {
// let stdout = term.stdout.take().expect("Failed to open stdout");
send_cmd(msg.to_string(), Mutex::new(tx2.clone())).await;
for line in &rx1 {
println!("Terminal Message: {}", line);
}
ws_stream
.send(Message::Text(msg.to_string()))
.await
.expect("uh oh");
}
}
Ok(())
}
#[tokio::main]
async fn main() {
// env_logger::init();
let addr = "127.0.0.1:9002";
let listener = TcpListener::bind(&addr)
.await
.expect("Can't listen to provided address");
println!("Listening on: {}", addr);
while let Ok((stream, _)) = listener.accept().await {
let peer = stream
.peer_addr()
.expect("connected streams should have a peer address");
println!("Peer address: {}", peer);
tokio::spawn(accept_connection(peer, stream));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment