Skip to content

Instantly share code, notes, and snippets.

@divi255
Last active January 18, 2022 01:17
Show Gist options
  • Save divi255/683ae8e9c0c63e34f49e058aeb536349 to your computer and use it in GitHub Desktop.
Save divi255/683ae8e9c0c63e34f49e058aeb536349 to your computer and use it in GitHub Desktop.
nats-elbus
/* [dependencies]
tokio = { version = "1.15.0", features = ["full"] }
bma-benchmark = "0.0.18"
async-nats = "0.10.1"
elbus = { version = "", features = ["full"] }
*/
#[macro_use]
extern crate bma_benchmark;
use elbus::client::AsyncClient;
use elbus::ipc::{Client, Config};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tokio::time::sleep;
use elbus::QoS;
static COUNTER: AtomicUsize = AtomicUsize::new(0);
#[tokio::main]
async fn main() {
let nats_path = "localhost:4222";
let elbus_path = "localhost:9924";
//let elbus_path = "/tmp/elbus.sock";
let iters = 1_000_000;
let workers = 2;
let iters_per_worker = iters / workers;
let payload = vec![0xee; 100];
let mut subs = Vec::new();
let mut pubs = Vec::new();
COUNTER.store(0, Ordering::SeqCst);
for i in 0..workers {
let data = payload.clone();
subs.push(tokio::spawn(async move {
let topic = format!("test{}", i);
let nc = async_nats::connect(nats_path).await.unwrap();
let sub = nc.subscribe(&topic).await.unwrap();
for _ in 0..iters_per_worker {
let msg = sub.next().await.unwrap();
assert_eq!(msg.data, data);
COUNTER.fetch_add(1, Ordering::SeqCst);
}
}));
}
sleep(Duration::from_secs(1)).await;
staged_benchmark_start!("nats.pubsub");
for i in 0..workers {
let data = payload.clone();
pubs.push(tokio::spawn(async move {
let topic = format!("test{}", i);
let nc = async_nats::connect(nats_path).await.unwrap();
loop {
let _ = nc.publish(&topic, &data).await;
}
}));
}
while COUNTER.load(Ordering::SeqCst) < iters {
sleep(Duration::from_millis(1)).await;
}
while let Some(f) = subs.pop() {
f.abort();
}
while let Some(f) = pubs.pop() {
f.abort();
}
staged_benchmark_finish_current!(iters as u32);
subs.clear();
pubs.clear();
COUNTER.store(0, Ordering::SeqCst);
for i in 0..workers {
let data = payload.clone();
subs.push(tokio::spawn(async move {
let topic = format!("test{}", i);
let name = format!("sub{}", i);
let mut c = Client::connect(&Config::new(elbus_path, &name))
.await
.unwrap();
c.subscribe(&topic, QoS::No).await.unwrap();
let rx = c.take_event_channel().unwrap();
loop {
let frame = rx.recv().await.unwrap();
assert_eq!(frame.payload(), data);
COUNTER.fetch_add(1, Ordering::SeqCst);
}
}));
}
sleep(Duration::from_secs(1)).await;
staged_benchmark_start!("elbus.pubsub");
for i in 0..workers {
let data = payload.clone();
pubs.push(tokio::spawn(async move {
let topic = format!("test{}", i);
let name = format!("pub{}", i);
let mut c = Client::connect(&Config::new(elbus_path, &name))
.await
.unwrap();
for _ in 0..iters_per_worker {
let _ = c.publish(&topic, data.as_slice().into(), QoS::No).await;
}
}));
}
while COUNTER.load(Ordering::SeqCst) < iters {
sleep(Duration::from_millis(1)).await;
}
while let Some(f) = subs.pop() {
f.abort();
}
while let Some(f) = pubs.pop() {
f.abort();
}
staged_benchmark_finish_current!(iters as u32);
staged_benchmark_print_for!("nats.pubsub");
subs.clear();
pubs.clear();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment