Last active
January 16, 2022 21:09
-
-
Save divi255/e166673c5bc8cb833456a0acf6d951bf to your computer and use it in GitHub Desktop.
simple hyper RPC client/server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#[macro_use] | |
extern crate bma_benchmark; | |
use hyper::{client::connect::HttpConnector, Body, Client, Method, Request, StatusCode}; | |
#[tokio::main] | |
async fn main() { | |
let iters = 1_000_000; | |
//let iters = 10; | |
let workers = 4; | |
let iters_per_worker = iters / workers; | |
let payload = vec![0xee; 10]; | |
let mut futs = Vec::new(); | |
staged_benchmark_start!("send.qos.processed"); | |
for _ in 0..workers { | |
let data = payload.clone(); | |
futs.push(tokio::spawn(async move { | |
let client: Client<HttpConnector> = Client::builder().http2_only(false).build_http(); | |
for _ in 0..iters_per_worker { | |
let req = Request::builder() | |
.method(Method::POST) | |
// a random header, just because HTTPs always have some | |
.header("content-type", "application/json") | |
.uri("http://127.0.0.1:9922/notify") | |
.body(Body::from(data.clone())) | |
.unwrap(); | |
let result = client.request(req).await.unwrap(); | |
assert_eq!(result.status(), StatusCode::NO_CONTENT); | |
} | |
})); | |
} | |
while let Some(f) = futs.pop() { | |
f.await.unwrap(); | |
} | |
staged_benchmark_finish_current!(iters); | |
futs.clear(); | |
staged_benchmark_start!("rpc.call"); | |
for _ in 0..workers { | |
let data = payload.clone(); | |
futs.push(tokio::spawn(async move { | |
let client: Client<HttpConnector> = Client::builder().http2_only(true).build_http(); | |
for _ in 0..iters_per_worker { | |
let req = Request::builder() | |
.method(Method::POST) | |
.header("content-type", "application/json") | |
.uri("http://127.0.0.1:9922/rpc/bmtest") | |
.body(Body::from(data.clone())) | |
.unwrap(); | |
let result = client.request(req).await.unwrap(); | |
assert_eq!(result.status(), StatusCode::OK); | |
assert_eq!(hyper::body::to_bytes(result).await.unwrap(), data); | |
} | |
})); | |
} | |
while let Some(f) = futs.pop() { | |
f.await.unwrap(); | |
} | |
staged_benchmark_finish_current!(iters); | |
staged_benchmark_print!(); | |
futs.clear(); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#[macro_use] | |
extern crate lazy_static; | |
use elbus::broker::{Broker, Client}; | |
use elbus::client::AsyncClient; | |
use elbus::QoS; | |
use hyper::service::{make_service_fn, service_fn}; | |
use hyper::{Body, Method, Request, Response, Server, StatusCode}; | |
use std::convert::Infallible; | |
use std::net::SocketAddr; | |
use tokio::sync::Mutex; | |
/// Expands to `Ok(response)` where `response` carries the given status code
/// and an empty body. The error type of the `Ok` is inferred from the call
/// site, so it can be used directly as a handler's return value.
macro_rules! http_error {
    ($code:expr) => {{
        let empty_body = Body::from(String::new());
        Ok(Response::builder()
            .status($code)
            .body(empty_body)
            .unwrap())
    }};
}
// using zero-copy payload in Hyper is tricky, so keep it small | |
const PAYLOAD_LEN: usize = 10; | |
/// method POST /notify does nothing, just checks the payload len | |
/// method POST /rpc/bmtest returns back the payload as-is | |
/// | |
/// Elbus broker calls can be uncommented, they don not affect results in any serious way | |
async fn handler(req: Request<Body>) -> Result<Response<Body>, Infallible> { | |
let (parts, body) = req.into_parts(); | |
macro_rules! get_body { | |
() => { | |
hyper::body::to_bytes(body).await.unwrap() | |
}; | |
} | |
if parts.method == Method::POST { | |
let path = parts.uri.path(); | |
if path == "/notify" { | |
let entire_body = get_body!(); | |
if entire_body.len() != PAYLOAD_LEN { | |
return http_error!(StatusCode::BAD_REQUEST); | |
} | |
//let _ = CLIENT | |
//.lock() | |
//.await | |
//.as_mut() | |
//.unwrap() | |
//.send("x", entire_body.to_vec().into(), QoS::No) | |
//.await; | |
Ok(Response::builder() | |
.status(StatusCode::NO_CONTENT) | |
.body(Body::from("")) | |
.unwrap()) | |
} else if let Some(method) = path.strip_prefix("/rpc/") { | |
if method == "bmtest" { | |
let entire_body = get_body!(); | |
if entire_body.len() != PAYLOAD_LEN { | |
return http_error!(StatusCode::BAD_REQUEST); | |
} | |
//let _ = CLIENT | |
//.lock() | |
//.await | |
//.as_mut() | |
//.unwrap() | |
//.send("x", entire_body.to_vec().into(), QoS::No) | |
//.await; | |
Ok(Response::builder().body(Body::from(entire_body)).unwrap()) | |
} else { | |
http_error!(StatusCode::METHOD_NOT_ALLOWED) | |
} | |
} else { | |
http_error!(StatusCode::NOT_FOUND) | |
} | |
} else { | |
http_error!(StatusCode::METHOD_NOT_ALLOWED) | |
} | |
} | |
lazy_static! { | |
static ref CLIENT: Mutex<Option<Client>> = Mutex::new(None); | |
} | |
fn main() { | |
let path = "0.0.0.0:9922"; | |
let workers = 4; | |
println!("Serving at {path}, {workers} workers"); | |
let rt = tokio::runtime::Builder::new_multi_thread() | |
.worker_threads(workers) | |
.enable_all() | |
.build() | |
.unwrap(); | |
rt.block_on(async move { | |
let broker = Broker::new(); | |
let client = broker.register_client("me").await.unwrap(); | |
CLIENT.lock().await.replace(client); | |
let addr: SocketAddr = path.parse().unwrap(); | |
let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handler)) }); | |
loop { | |
let server = Server::bind(&addr).serve(make_svc); | |
let _r = server.await; | |
} | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment