Skip to content

Instantly share code, notes, and snippets.

@optman
Last active June 25, 2020 12:23
Show Gist options
  • Save optman/baa402707275cb0e948af99a2e75bd3f to your computer and use it in GitHub Desktop.
Save optman/baa402707275cb0e948af99a2e75bd3f to your computer and use it in GitHub Desktop.
port forwarding with yamux (Rust)
use futures::future;
use futures::io::{self, AsyncReadExt, AsyncWriteExt};
use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*;
use async_std::task;
use std::sync::Arc;
use yamux;
use clap::{Arg, App};
#[async_std::main]
async fn main() -> io::Result<()>{
let matches = App::new("ytun")
.version("1.0")
.arg(Arg::with_name("local-addr")
.short("l")
.value_name("LOCAL-ADDR")
.required(true)
.help("local listen addr"))
.arg(Arg::with_name("remote-addr")
.short("r")
.value_name("REMOTE-ADDR")
.help("remote server addr"))
.arg(Arg::with_name("forward-addr")
.short("f")
.value_name("FORWARD-ADDR")
.help("forward to addr"))
.get_matches();
let local_addr = matches.value_of("local-addr").unwrap();
if let Some(remote_addr) = matches.value_of("remote-addr"){
client(local_addr, remote_addr).await?;
}else{
if let Some(forward_addr) = matches.value_of("forward-addr"){
server(local_addr, forward_addr).await?;
}else{
println!("no remote/forward addr specified!");
}
}
Ok(())
}
async fn client(local_addr : &str, remote_addr : &str) -> io::Result<()>{
println!("local: {:}, remote: {:}", local_addr, remote_addr);
let ln = TcpListener::bind(local_addr).await?;
let mut incoming = ln.incoming();
let mut muxer : Option<yamux::Control> = None;
while let Some(stream) = incoming.next().await{
if let Ok(src) = stream {
if muxer.is_none(){
if let Ok(m) = open_muxer(&remote_addr).await{
muxer.replace(m);
}else{
println!("open muxer fail");
continue;
}
}
if let Ok(dst) = muxer.as_mut().unwrap().open_stream().await{
task::spawn(concat_stream(src, dst));
}else{
println!("open stream fail");
muxer = None;
}
}
}
Ok(())
}
#[derive(Clone)]
struct Config{
local_addr: String,
forward_addr: String,
}
async fn server(local_addr : &str, forward_addr : &str) -> io::Result<()>{
println!("local: {:}, forward: {:}", local_addr, forward_addr);
let config = Arc::new(Config{ local_addr: local_addr.into(), forward_addr : forward_addr.into()});
listen(config, |config, conn| async move{
let conn = yamux::Connection::new(conn, Default::default(), yamux::Mode::Server);
yamux::into_stream(conn).for_each(|src| {
let config = config.clone();
task::spawn( async move {
if let Ok(src) = src{
match TcpStream::connect(config.forward_addr.clone()).await{
Ok(dst) => concat_stream(dst, src).await,
Err(err) => println!("forward fail {:}", err),
};
};
});
}).await;
}).await?;
Ok(())
}
async fn listen<F, T>(config :Arc<Config>, f : impl Fn(Arc<Config>, TcpStream) -> F) ->io::Result<()>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static
{
let ln = TcpListener::bind(config.local_addr.clone()).await?;
let mut incoming = ln.incoming();
while let Some(stream) = incoming.next().await{
if let Ok(src) = stream {
task::spawn(f(config.clone(), src));
}
}
Ok(())
}
async fn concat_stream(x : TcpStream, y : yamux::Stream){
let (mut src_reader, mut src_writer) = x.split();
let (mut dst_reader, mut dst_writer) = y.split();
let a= async {
let _ = io::copy(&mut src_reader, &mut dst_writer).await;
dst_writer.close().await
};
let b= async {
let _ = io::copy(&mut dst_reader, &mut src_writer).await;
src_writer.close().await
};
let _ = future::join(a, b).await;
}
async fn open_muxer(addr : &str) -> io::Result<yamux::Control>{
match TcpStream::connect(addr).await{
Ok(stream) =>{
let conn = yamux::Connection::new(stream, Default::default(), yamux::Mode::Client);
let control = conn.control();
task::spawn(yamux::into_stream(conn).for_each(|_| ()));
Ok(control)
},
Err(err) => {println!("connect remote server fail {:}", err); Err(err)}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment