Last active
June 25, 2020 12:23
-
-
Save optman/baa402707275cb0e948af99a2e75bd3f to your computer and use it in GitHub Desktop.
port forwarding with yamux (Rust)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use futures::future; | |
use futures::io::{self, AsyncReadExt, AsyncWriteExt}; | |
use async_std::net::{TcpListener, TcpStream}; | |
use async_std::prelude::*; | |
use async_std::task; | |
use std::sync::Arc; | |
use yamux; | |
use clap::{Arg, App}; | |
#[async_std::main] | |
async fn main() -> io::Result<()>{ | |
let matches = App::new("ytun") | |
.version("1.0") | |
.arg(Arg::with_name("local-addr") | |
.short("l") | |
.value_name("LOCAL-ADDR") | |
.required(true) | |
.help("local listen addr")) | |
.arg(Arg::with_name("remote-addr") | |
.short("r") | |
.value_name("REMOTE-ADDR") | |
.help("remote server addr")) | |
.arg(Arg::with_name("forward-addr") | |
.short("f") | |
.value_name("FORWARD-ADDR") | |
.help("forward to addr")) | |
.get_matches(); | |
let local_addr = matches.value_of("local-addr").unwrap(); | |
if let Some(remote_addr) = matches.value_of("remote-addr"){ | |
client(local_addr, remote_addr).await?; | |
}else{ | |
if let Some(forward_addr) = matches.value_of("forward-addr"){ | |
server(local_addr, forward_addr).await?; | |
}else{ | |
println!("no remote/forward addr specified!"); | |
} | |
} | |
Ok(()) | |
} | |
async fn client(local_addr : &str, remote_addr : &str) -> io::Result<()>{ | |
println!("local: {:}, remote: {:}", local_addr, remote_addr); | |
let ln = TcpListener::bind(local_addr).await?; | |
let mut incoming = ln.incoming(); | |
let mut muxer : Option<yamux::Control> = None; | |
while let Some(stream) = incoming.next().await{ | |
if let Ok(src) = stream { | |
if muxer.is_none(){ | |
if let Ok(m) = open_muxer(&remote_addr).await{ | |
muxer.replace(m); | |
}else{ | |
println!("open muxer fail"); | |
continue; | |
} | |
} | |
if let Ok(dst) = muxer.as_mut().unwrap().open_stream().await{ | |
task::spawn(concat_stream(src, dst)); | |
}else{ | |
println!("open stream fail"); | |
muxer = None; | |
} | |
} | |
} | |
Ok(()) | |
} | |
/// Settings shared between the server's accept loop and its connection handlers.
#[derive(Clone)]
struct Config {
    /// Address the server binds its TCP listener to.
    local_addr: String,
    /// Address each tunnelled stream is forwarded to.
    forward_addr: String,
}
async fn server(local_addr : &str, forward_addr : &str) -> io::Result<()>{ | |
println!("local: {:}, forward: {:}", local_addr, forward_addr); | |
let config = Arc::new(Config{ local_addr: local_addr.into(), forward_addr : forward_addr.into()}); | |
listen(config, |config, conn| async move{ | |
let conn = yamux::Connection::new(conn, Default::default(), yamux::Mode::Server); | |
yamux::into_stream(conn).for_each(|src| { | |
let config = config.clone(); | |
task::spawn( async move { | |
if let Ok(src) = src{ | |
match TcpStream::connect(config.forward_addr.clone()).await{ | |
Ok(dst) => concat_stream(dst, src).await, | |
Err(err) => println!("forward fail {:}", err), | |
}; | |
}; | |
}); | |
}).await; | |
}).await?; | |
Ok(()) | |
} | |
async fn listen<F, T>(config :Arc<Config>, f : impl Fn(Arc<Config>, TcpStream) -> F) ->io::Result<()> | |
where | |
F: Future<Output = T> + Send + 'static, | |
T: Send + 'static | |
{ | |
let ln = TcpListener::bind(config.local_addr.clone()).await?; | |
let mut incoming = ln.incoming(); | |
while let Some(stream) = incoming.next().await{ | |
if let Ok(src) = stream { | |
task::spawn(f(config.clone(), src)); | |
} | |
} | |
Ok(()) | |
} | |
async fn concat_stream(x : TcpStream, y : yamux::Stream){ | |
let (mut src_reader, mut src_writer) = x.split(); | |
let (mut dst_reader, mut dst_writer) = y.split(); | |
let a= async { | |
let _ = io::copy(&mut src_reader, &mut dst_writer).await; | |
dst_writer.close().await | |
}; | |
let b= async { | |
let _ = io::copy(&mut dst_reader, &mut src_writer).await; | |
src_writer.close().await | |
}; | |
let _ = future::join(a, b).await; | |
} | |
async fn open_muxer(addr : &str) -> io::Result<yamux::Control>{ | |
match TcpStream::connect(addr).await{ | |
Ok(stream) =>{ | |
let conn = yamux::Connection::new(stream, Default::default(), yamux::Mode::Client); | |
let control = conn.control(); | |
task::spawn(yamux::into_stream(conn).for_each(|_| ())); | |
Ok(control) | |
}, | |
Err(err) => {println!("connect remote server fail {:}", err); Err(err)} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment