Skip to content

Instantly share code, notes, and snippets.

@neonphog
Created February 11, 2020 17:44
Show Gist options
  • Save neonphog/127d573685e102ad552520926f4e75ae to your computer and use it in GitHub Desktop.
Save neonphog/127d573685e102ad552520926f4e75ae to your computer and use it in GitHub Desktop.
tokio-quinn-poc.rs
// [dependencies]
// env_logger = "0.7"
// log = "0.4"
// num_cpus ="1"
// quinn-proto = "0.5"
// quinn = "0.5"
// rcgen = "0.7"
// rustls = { version = "0.16", features = ["dangerous_configuration"] }
// tokio = { version = "0.2", features = ["full"] }
// webpki = "0.21"
//
// Running this should give something like:
//
//
// [2020-02-11T17:42:55Z INFO tokio_quic] server got data: 18 bytes - Hello Test Message
// [2020-02-11T17:42:55Z INFO tokio_quic] client got data: 24 bytes - echo: Hello Test Message
struct OpenVerifier;
impl rustls::ServerCertVerifier for OpenVerifier {
fn verify_server_cert(&self,
_roots: &rustls::RootCertStore,
_presented_certs: &[rustls::Certificate],
_dns_name: webpki::DNSNameRef,
_ocsp_response: &[u8],
) -> Result<rustls:: ServerCertVerified, rustls::TLSError> {
Ok(rustls::ServerCertVerified::assertion())
}
}
use log::*;
use std::sync::Arc;
use std::net::ToSocketAddrs;
use tokio::stream::StreamExt;
const ALPN_QUIC_HTTP: &[&[u8]] = &[b"hq-24"];
async fn async_main() {
let mut transport_config = quinn::TransportConfig::default();
transport_config.stream_window_uni = 0;
let mut server_config = quinn::ServerConfig::default();
server_config.transport = Arc::new(transport_config);
let mut server_config = quinn::ServerConfigBuilder::new(server_config);
server_config.protocols(ALPN_QUIC_HTTP);
server_config.enable_keylog();
server_config.use_stateless_retry(true);
let cert = rcgen::generate_simple_self_signed(vec![
"localhost".into(),
]).unwrap();
let key = cert.serialize_private_key_der();
let cert = cert.serialize_der().unwrap();
let key = quinn::PrivateKey::from_der(&key).unwrap();
let cert = quinn::Certificate::from_der(&cert).unwrap();
server_config.certificate(quinn::CertificateChain::from_certs(vec![
cert.clone(),
]), key).unwrap();
let mut endpoint = quinn::Endpoint::builder();
endpoint.listen(server_config.build());
let listen = "localhost:0".to_socket_addrs().unwrap().next().unwrap();
let (incoming, bound_addr) = {
let (driver, endpoint, incoming) = endpoint.bind(&listen).unwrap();
tokio::task::spawn(driver);
let bound_addr = endpoint.local_addr().unwrap();
(incoming, bound_addr)
};
info!("listening at {}", bound_addr);
tokio::task::spawn(handle_listen(incoming));
let mut endpoint = quinn::Endpoint::builder();
let mut client_config = quinn::ClientConfigBuilder::default();
client_config.protocols(ALPN_QUIC_HTTP);
client_config.enable_keylog();
//only needed if we keep the default ServerCertVerifier
//client_config.add_certificate_authority(cert).unwrap();
let mut client_config = client_config.build();
Arc::make_mut(&mut client_config.crypto)
.dangerous()
.set_certificate_verifier(Arc::new(OpenVerifier));
endpoint.default_client_config(client_config);
let endpoint = {
let (driver, endpoint, _) = endpoint.bind(&listen).unwrap();
tokio::task::spawn(driver);
endpoint
};
let quinn::NewConnection {
driver,
connection: con,
..
} = endpoint.connect(&bound_addr, "localhost").unwrap().await.unwrap();
tokio::task::spawn(driver);
let (mut send, mut recv) = con.open_bi().await.unwrap();
send.write_all(b"Hello Test Message").await.unwrap();
let mut buffer: [u8; 4096] = [0_u8; 4096];
let size = recv.read(&mut buffer).await.unwrap().unwrap();
let data = String::from_utf8_lossy(&buffer[..size]);
info!("client got data: {} bytes - {}", size, data);
con.close(0_u32.into(), b"done");
}
async fn handle_listen(mut incoming: quinn::Incoming) {
while let Some(con) = incoming.next().await {
tokio::task::spawn(handle_connecting(con));
}
}
async fn handle_connecting(connecting: quinn::Connecting) {
let quinn::NewConnection {
driver,
connection: con,
mut bi_streams,
..
} = connecting.await.unwrap();
tokio::task::spawn(driver);
info!("received connection from {}", con.remote_address());
while let Some(stream) = bi_streams.next().await {
match stream {
Ok(s) => {
tokio::task::spawn(handle_stream(s));
}
e @ _ => panic!(e),
}
}
}
async fn handle_stream(
(mut send, mut recv): (quinn::SendStream, quinn::RecvStream),
) {
loop {
let mut buffer: [u8; 4096] = [0_u8; 4096];
let size = recv.read(&mut buffer).await.unwrap().unwrap();
let data = String::from_utf8_lossy(&buffer[..size]);
info!("server got data: {} bytes - {}", size, data);
let resp = format!("echo: {}", data);
send.write_all(resp.as_bytes()).await.unwrap();
}
}
fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "trace");
}
let _ = env_logger::builder().try_init();
tokio::runtime::Builder::new()
.threaded_scheduler()
.core_threads(num_cpus::get())
.thread_name("tokio-quinn-worker-thread")
.enable_all()
.build()
.unwrap()
.block_on(async_main());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment