Skip to content

Instantly share code, notes, and snippets.

@zerosign
Last active January 17, 2020 12:07
Show Gist options
  • Select an option

  • Save zerosign/531e4e2bcf38cb394866946b75c8db99 to your computer and use it in GitHub Desktop.

Select an option

Save zerosign/531e4e2bcf38cb394866946b75c8db99 to your computer and use it in GitHub Desktop.
use hello_world::{sample_client::SampleClient, SeqNum, User};
use futures::{
future::{join_all, FutureExt},
stream::{FuturesUnordered, StreamExt},
};
use tokio::{
prelude::*,
runtime::{Builder, Handle, Runtime},
sync::Mutex,
task,
time::{timeout, Duration},
};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
pub mod hello_world {
tonic::include_proto!("hello_world");
}
static COUNTER: AtomicUsize = AtomicUsize::new(0);
static SUCCESS: AtomicUsize = AtomicUsize::new(0);
static ERROR: AtomicUsize = AtomicUsize::new(0);
fn main() {
let mut timer = Builder::new()
.enable_time()
.basic_scheduler()
.thread_name("timer-thread")
.build()
.expect("can't create timer runtime");
let mut runtime = Builder::new()
.threaded_scheduler()
.enable_io()
.enable_time()
.core_threads(8)
.thread_name("grpc-threads")
.build()
.expect("can't create grpc runtime");
println!("grpc-runtime: {:?}", runtime);
println!("timer-runtime: {:?}", timer);
let client = runtime
.block_on(SampleClient::connect("http://[::1]:8080"))
.map(move |c| Arc::new(Mutex::new(c)))
.expect("can't pre-allocate grpc client");
let task = timer.block_on(timeout(Duration::from_secs(10), async move {
let handle = runtime.handle();
loop {
let client = client.clone();
let _ = handle.spawn(async move {
let mut client = client.lock().await;
let request = tonic::Request::new(SeqNum {
counter: COUNTER.load(Ordering::SeqCst) as u64,
max_counter: 10,
});
match client.check(request).await {
Ok(r) => {
let sequence = r.into_inner();
if sequence.counter < sequence.max_counter {
COUNTER.fetch_add(1, Ordering::SeqCst);
} else {
// reset
COUNTER.compare_and_swap(
(sequence.counter as usize - 1 as usize),
0,
Ordering::SeqCst,
);
}
SUCCESS.fetch_add(1, Ordering::SeqCst);
}
_ => {
ERROR.fetch_add(1, Ordering::SeqCst);
}
}
});
}
}));
println!(
"task: {:?}, counter: {}, error: {}, success: {}",
task,
COUNTER.load(Ordering::SeqCst),
ERROR.load(Ordering::SeqCst),
SUCCESS.load(Ordering::SeqCst),
);
}
rpc-runtime: Runtime { kind: ThreadPool(ThreadPool), handle: Handle { spawner: ThreadPool(Spawner), io_handle: Some(Handle), time_handle: Some(Handle), clock: Clock, blocking_spawner: blocking::Spawner }, blocking_pool: BlockingPool }
timer-runtime: Runtime { kind: Basic(BasicScheduler { scheduler: Scheduler { queues: MpscQueues { owned_tasks: UnsafeCell, remote_queue: Mutex { data: RemoteQueue { queue: [], open: true } }, local_queue: UnsafeCell } }, local: LocalState { tick: 0, park: Driver { inner: Inner, wheel: Wheel { elapsed: 0, levels: [Level { occupied: 0 }, Level { occupied: 0 }, Level { occupied: 0 }, Level { occupied: 0 }, Level { occupied: 0 }, Level { occupied: 0 }] }, park: ParkThread { inner: Inner { state: 0, mutex: Mutex { data: () }, condvar: Condvar { .. } } }, clock: Clock } } }), handle: Handle { spawner: Basic(Spawner { scheduler: Scheduler { queues: MpscQueues { owned_tasks: UnsafeCell, remote_queue: Mutex { data: RemoteQueue { queue: [], open: true } }, local_queue: UnsafeCell } } }), io_handle: None, time_handle: Some(Handle), clock: Clock, blocking_spawner: blocking::Spawner }, blocking_pool: BlockingPool }
thread 'main' panicked at 'no current timer', src/libcore/option.rs:1188:5
stack backtrace:
0: 0x76a404 - backtrace::backtrace::libunwind::trace::h208b77d6980cdba3
at /cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.40/src/backtrace/libunwind.rs:88
1: 0x76a404 - backtrace::backtrace::trace_unsynchronized::h279c5063b29b89e0
at /cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.40/src/backtrace/mod.rs:66
2: 0x76a404 - std::sys_common::backtrace::_print_fmt::h0dd5ce3b2c0c1196
at src/libstd/sys_common/backtrace.rs:77
3: 0x76a404 - <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::h64e0000db2a39f17
at src/libstd/sys_common/backtrace.rs:59
4: 0x7932fc - core::fmt::write::h97bceedab1a3faef
at src/libcore/fmt/mod.rs:1057
5: 0x7659b7 - std::io::Write::write_fmt::hdf931701cccb1c38
at src/libstd/io/mod.rs:1426
6: 0x76c51e - std::sys_common::backtrace::_print::h206f5e67efbe6095
at src/libstd/sys_common/backtrace.rs:62
7: 0x76c51e - std::sys_common::backtrace::print::h2e216a34252f7d84
at src/libstd/sys_common/backtrace.rs:49
8: 0x76c51e - std::panicking::default_hook::{{closure}}::hdf8b9b1ee9f4dcfe
at src/libstd/panicking.rs:195
9: 0x76c211 - std::panicking::default_hook::h98bc5701a731ade9
at src/libstd/panicking.rs:215
10: 0x76cb63 - std::panicking::rust_panic_with_hook::h7277f8b6e4c099fd
at src/libstd/panicking.rs:463
11: 0x76c750 - rust_begin_unwind
at src/libstd/panicking.rs:371
12: 0x7925be - core::panicking::panic_fmt::hb0b8ce9be9696e8c
at src/libcore/panicking.rs:85
13: 0x7923e7 - core::option::expect_failed::ha7a3f28c7d603476
at src/libcore/option.rs:1188
14: 0x70d56a - core::option::Option<T>::expect::h4aa5ffca5feefc9b
at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libcore/option.rs:348
15: 0x6cae48 - tokio::time::driver::handle::Handle::current::h03bc15e42dd067e5
at /home/zerosign/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.9/src/time/driver/handle.rs:24
16: 0x6caf0c - tokio::time::driver::registration::Registration::new::h68ccc6b23e4202d7
at /home/zerosign/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.9/src/time/driver/registration.rs:18
17: 0x70002f - tokio::time::delay::Delay::new_timeout::h9d4fa95c287d304d
at /home/zerosign/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.9/src/time/delay.rs:53
18: 0x4a667a - tokio::time::timeout::timeout::h20eecc3f01ff06ed
at /home/zerosign/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.9/src/time/timeout.rs:51
19: 0x42d3ec - client::main::h65fc9f05fe2ee2bf
at hello-grpc-service/src/client.rs:53
20: 0x477920 - std::rt::lang_start::{{closure}}::h225f7edcf2658f66
at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libstd/rt.rs:67
21: 0x76c643 - std::rt::lang_start_internal::{{closure}}::h0dce4b31106beb66
at src/libstd/rt.rs:52
22: 0x76c643 - std::panicking::try::do_call::ha51d211429b63650
at src/libstd/panicking.rs:296
23: 0x76f4ea - __rust_maybe_catch_panic
at src/libpanic_unwind/lib.rs:79
24: 0x76cedb - std::panicking::try::h8ceb6fd656ca2b0f
at src/libstd/panicking.rs:272
25: 0x76cedb - std::panic::catch_unwind::hdfc01d8c92033d66
at src/libstd/panic.rs:394
26: 0x76cedb - std::rt::lang_start_internal::h1d7cfac94d9ab673
at src/libstd/rt.rs:51
27: 0x4778f9 - std::rt::lang_start::hb7464ad8df608123
at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libstd/rt.rs:67
28: 0x42d79a - main
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment