Skip to content

Instantly share code, notes, and snippets.

@divi255
Last active January 16, 2022 21:09
Show Gist options
  • Save divi255/e166673c5bc8cb833456a0acf6d951bf to your computer and use it in GitHub Desktop.
Save divi255/e166673c5bc8cb833456a0acf6d951bf to your computer and use it in GitHub Desktop.
simple hyper RPC client/server
#[macro_use]
extern crate bma_benchmark;
use hyper::{client::connect::HttpConnector, Body, Client, Method, Request, StatusCode};
/// Benchmarks the HTTP server at 127.0.0.1:9922 in two stages.
///
/// * `send.qos.processed` - POSTs the payload to `/notify` (HTTP/2-only mode
///   disabled) and expects `204 No Content`
/// * `rpc.call` - POSTs the payload to `/rpc/bmtest` with an HTTP/2-only client
///   and expects `200 OK` with the payload echoed back
///
/// Each stage splits the requests evenly between `workers` spawned tasks, each
/// task owning its own client, and the results are printed after both stages.
#[tokio::main]
async fn main() {
    // Swap in a small value (e.g. 10) for a quick smoke run.
    let iters = 1_000_000;
    let workers = 4;
    let iters_per_worker = iters / workers;
    let payload = vec![0xee; 10];

    staged_benchmark_start!("send.qos.processed");
    let notify_tasks: Vec<_> = (0..workers)
        .map(|_| {
            let data = payload.clone();
            tokio::spawn(async move {
                let client: Client<HttpConnector> =
                    Client::builder().http2_only(false).build_http();
                for _ in 0..iters_per_worker {
                    let request = Request::builder()
                        .method(Method::POST)
                        // an arbitrary header, since real HTTP requests always carry some
                        .header("content-type", "application/json")
                        .uri("http://127.0.0.1:9922/notify")
                        .body(Body::from(data.clone()))
                        .unwrap();
                    let response = client.request(request).await.unwrap();
                    assert_eq!(response.status(), StatusCode::NO_CONTENT);
                }
            })
        })
        .collect();
    // Join the most recently spawned task first.
    for task in notify_tasks.into_iter().rev() {
        task.await.unwrap();
    }
    staged_benchmark_finish_current!(iters);

    staged_benchmark_start!("rpc.call");
    let rpc_tasks: Vec<_> = (0..workers)
        .map(|_| {
            let data = payload.clone();
            tokio::spawn(async move {
                let client: Client<HttpConnector> =
                    Client::builder().http2_only(true).build_http();
                for _ in 0..iters_per_worker {
                    let request = Request::builder()
                        .method(Method::POST)
                        .header("content-type", "application/json")
                        .uri("http://127.0.0.1:9922/rpc/bmtest")
                        .body(Body::from(data.clone()))
                        .unwrap();
                    let response = client.request(request).await.unwrap();
                    assert_eq!(response.status(), StatusCode::OK);
                    // The RPC endpoint must echo the payload back unchanged.
                    assert_eq!(hyper::body::to_bytes(response).await.unwrap(), data);
                }
            })
        })
        .collect();
    for task in rpc_tasks.into_iter().rev() {
        task.await.unwrap();
    }
    staged_benchmark_finish_current!(iters);
    staged_benchmark_print!();
}
#[macro_use]
extern crate lazy_static;
use elbus::broker::{Broker, Client};
use elbus::client::AsyncClient;
use elbus::QoS;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use std::convert::Infallible;
use std::net::SocketAddr;
use tokio::sync::Mutex;
/// Expands to `Ok(response)`, where the response carries the given status
/// code and an empty body.
macro_rules! http_error {
    ($code:expr) => {{
        let empty_reply = Response::builder()
            .status($code)
            .body(Body::from(String::new()))
            .unwrap();
        Ok(empty_reply)
    }};
}
/// Exact request body length, in bytes, that `handler` accepts; a body of any
/// other length is answered with `400 Bad Request`.
// using zero-copy payload in Hyper is tricky, so keep it small
const PAYLOAD_LEN: usize = 10;
/// Request handler of the benchmark server.
///
/// * `POST /notify` checks the payload length and replies `204 No Content`
/// * `POST /rpc/bmtest` checks the payload length and returns the payload as-is
///
/// Every other outcome is answered with an empty body:
/// * `400 Bad Request` - the body could not be read or is not `PAYLOAD_LEN` bytes long
/// * `404 Not Found` - a POST to any other path
/// * `405 Method Not Allowed` - a non-POST request or an unknown `/rpc/` method
///
/// Elbus broker calls can be uncommented, they do not affect results in any serious way
async fn handler(req: Request<Body>) -> Result<Response<Body>, Infallible> {
    let (parts, body) = req.into_parts();
    // Collects the whole request body. A failed read is answered with
    // 400 Bad Request instead of panicking inside the handler.
    macro_rules! get_body {
        () => {
            match hyper::body::to_bytes(body).await {
                Ok(bytes) => bytes,
                Err(_) => return http_error!(StatusCode::BAD_REQUEST),
            }
        };
    }
    if parts.method != Method::POST {
        return http_error!(StatusCode::METHOD_NOT_ALLOWED);
    }
    let path = parts.uri.path();
    if path == "/notify" {
        let entire_body = get_body!();
        if entire_body.len() != PAYLOAD_LEN {
            return http_error!(StatusCode::BAD_REQUEST);
        }
        //let _ = CLIENT
        //.lock()
        //.await
        //.as_mut()
        //.unwrap()
        //.send("x", entire_body.to_vec().into(), QoS::No)
        //.await;
        Ok(Response::builder()
            .status(StatusCode::NO_CONTENT)
            .body(Body::from(""))
            .unwrap())
    } else if let Some(method) = path.strip_prefix("/rpc/") {
        if method == "bmtest" {
            let entire_body = get_body!();
            if entire_body.len() != PAYLOAD_LEN {
                return http_error!(StatusCode::BAD_REQUEST);
            }
            //let _ = CLIENT
            //.lock()
            //.await
            //.as_mut()
            //.unwrap()
            //.send("x", entire_body.to_vec().into(), QoS::No)
            //.await;
            // The builder's default status is used here; the payload is echoed back.
            Ok(Response::builder().body(Body::from(entire_body)).unwrap())
        } else {
            // Unknown RPC method name.
            http_error!(StatusCode::METHOD_NOT_ALLOWED)
        }
    } else {
        http_error!(StatusCode::NOT_FOUND)
    }
}
// Process-wide slot for the Elbus broker client. It starts out empty and is
// filled in by `main` once the client has been registered with the broker.
// The tokio `Mutex` lets async code take the lock with `.lock().await`.
lazy_static! {
    static ref CLIENT: Mutex<Option<Client>> = Mutex::new(None);
}
/// Starts the benchmark HTTP server on 0.0.0.0:9922.
///
/// Builds a multi-threaded tokio runtime with a fixed number of worker
/// threads, registers an Elbus broker client and stores it in `CLIENT`, then
/// serves `handler` forever. If the server future ever completes with an
/// error, the error is reported on stderr and the server is started again.
///
/// # Panics
///
/// Panics if the runtime cannot be built, the broker client cannot be
/// registered, or the listen address cannot be parsed. Hyper's `Server::bind`
/// may also panic when binding fails.
fn main() {
    let path = "0.0.0.0:9922";
    let workers = 4;
    println!("Serving at {path}, {workers} workers");
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(workers)
        .enable_all()
        .build()
        .expect("failed to build the tokio runtime");
    rt.block_on(async move {
        let broker = Broker::new();
        let client = broker
            .register_client("me")
            .await
            .expect("failed to register the broker client");
        CLIENT.lock().await.replace(client);
        let addr: SocketAddr = path
            .parse()
            .expect("hard-coded listen address must be a valid socket address");
        let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handler)) });
        loop {
            // Report why the server stopped instead of silently restarting it.
            if let Err(e) = Server::bind(&addr).serve(make_svc).await {
                eprintln!("server error: {e}, restarting");
            }
        }
    });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment