Last active
January 18, 2022 01:17
-
-
Save divi255/683ae8e9c0c63e34f49e058aeb536349 to your computer and use it in GitHub Desktop.
nats-elbus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* [dependencies]
tokio = { version = "1.15.0", features = ["full"] }
bma-benchmark = "0.0.18"
async-nats = "0.10.1"
# NOTE: the elbus version was left blank in the original; fill in the release under test.
elbus = { version = "", features = ["full"] }
*/
#[macro_use] | |
extern crate bma_benchmark; | |
use elbus::client::AsyncClient; | |
use elbus::ipc::{Client, Config}; | |
use std::sync::atomic::{AtomicUsize, Ordering}; | |
use std::time::Duration; | |
use tokio::time::sleep; | |
use elbus::QoS; | |
static COUNTER: AtomicUsize = AtomicUsize::new(0); | |
#[tokio::main] | |
async fn main() { | |
let nats_path = "localhost:4222"; | |
let elbus_path = "localhost:9924"; | |
//let elbus_path = "/tmp/elbus.sock"; | |
let iters = 1_000_000; | |
let workers = 2; | |
let iters_per_worker = iters / workers; | |
let payload = vec![0xee; 100]; | |
let mut subs = Vec::new(); | |
let mut pubs = Vec::new(); | |
COUNTER.store(0, Ordering::SeqCst); | |
for i in 0..workers { | |
let data = payload.clone(); | |
subs.push(tokio::spawn(async move { | |
let topic = format!("test{}", i); | |
let nc = async_nats::connect(nats_path).await.unwrap(); | |
let sub = nc.subscribe(&topic).await.unwrap(); | |
for _ in 0..iters_per_worker { | |
let msg = sub.next().await.unwrap(); | |
assert_eq!(msg.data, data); | |
COUNTER.fetch_add(1, Ordering::SeqCst); | |
} | |
})); | |
} | |
sleep(Duration::from_secs(1)).await; | |
staged_benchmark_start!("nats.pubsub"); | |
for i in 0..workers { | |
let data = payload.clone(); | |
pubs.push(tokio::spawn(async move { | |
let topic = format!("test{}", i); | |
let nc = async_nats::connect(nats_path).await.unwrap(); | |
loop { | |
let _ = nc.publish(&topic, &data).await; | |
} | |
})); | |
} | |
while COUNTER.load(Ordering::SeqCst) < iters { | |
sleep(Duration::from_millis(1)).await; | |
} | |
while let Some(f) = subs.pop() { | |
f.abort(); | |
} | |
while let Some(f) = pubs.pop() { | |
f.abort(); | |
} | |
staged_benchmark_finish_current!(iters as u32); | |
subs.clear(); | |
pubs.clear(); | |
COUNTER.store(0, Ordering::SeqCst); | |
for i in 0..workers { | |
let data = payload.clone(); | |
subs.push(tokio::spawn(async move { | |
let topic = format!("test{}", i); | |
let name = format!("sub{}", i); | |
let mut c = Client::connect(&Config::new(elbus_path, &name)) | |
.await | |
.unwrap(); | |
c.subscribe(&topic, QoS::No).await.unwrap(); | |
let rx = c.take_event_channel().unwrap(); | |
loop { | |
let frame = rx.recv().await.unwrap(); | |
assert_eq!(frame.payload(), data); | |
COUNTER.fetch_add(1, Ordering::SeqCst); | |
} | |
})); | |
} | |
sleep(Duration::from_secs(1)).await; | |
staged_benchmark_start!("elbus.pubsub"); | |
for i in 0..workers { | |
let data = payload.clone(); | |
pubs.push(tokio::spawn(async move { | |
let topic = format!("test{}", i); | |
let name = format!("pub{}", i); | |
let mut c = Client::connect(&Config::new(elbus_path, &name)) | |
.await | |
.unwrap(); | |
for _ in 0..iters_per_worker { | |
let _ = c.publish(&topic, data.as_slice().into(), QoS::No).await; | |
} | |
})); | |
} | |
while COUNTER.load(Ordering::SeqCst) < iters { | |
sleep(Duration::from_millis(1)).await; | |
} | |
while let Some(f) = subs.pop() { | |
f.abort(); | |
} | |
while let Some(f) = pubs.pop() { | |
f.abort(); | |
} | |
staged_benchmark_finish_current!(iters as u32); | |
staged_benchmark_print_for!("nats.pubsub"); | |
subs.clear(); | |
pubs.clear(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment