Skip to content

Instantly share code, notes, and snippets.

@folex
Created April 23, 2020 15:03
Show Gist options
  • Save folex/18576fb63ca310d79e51effddff9a1b3 to your computer and use it in GitHub Desktop.
Save folex/18576fb63ca310d79e51effddff9a1b3 to your computer and use it in GitHub Desktop.
ctrlc_hang.rs
#[cfg(test)]
mod tests {
use async_std::task;
use futures::{
channel::{mpsc, mpsc::UnboundedReceiver},
prelude::*,
select,
stream::StreamExt,
};
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Serialize, Deserialize, Debug)]
struct ClientCommand {
cmd: String,
}
fn block_until_ctrlc() {
let (ctrlc_outlet, ctrlc_inlet) = futures::channel::oneshot::channel();
let ctrlc_outlet = std::cell::RefCell::new(Some(ctrlc_outlet));
ctrlc::set_handler(move || {
if let Some(outlet) = ctrlc_outlet.borrow_mut().take() {
outlet.send(()).expect("sending shutdown signal failed");
}
})
.expect("Error while setting ctrlc handler");
async_std::task::block_on(ctrlc_inlet).expect("exit oneshot failed");
}
fn read_cmds_from_stdin() -> UnboundedReceiver<serde_json::error::Result<ClientCommand>> {
let (cmd_sender, cmd_recv) = mpsc::unbounded();
task::spawn(async move {
use serde_json::Deserializer;
use std::io; // NOTE: this is synchronous IO
loop {
let stdin = io::BufReader::new(io::stdin());
let stream = Deserializer::from_reader(stdin).into_iter::<ClientCommand>();
// vvvvvvvvvvvvvvvv we're blocking here
for cmd in stream {
cmd_sender.unbounded_send(cmd).expect("send cmd");
// task::sleep(Duration::from_nanos(10)).await; // return Poll::Pending from future's fn poll
}
}
});
cmd_recv
}
fn run_timer() -> UnboundedReceiver<serde_json::error::Result<()>> {
let (sender, recv) = mpsc::unbounded();
task::spawn(async move {
loop {
task::sleep(Duration::from_secs(1)).await;
sender.unbounded_send(Ok(())).expect("send unit");
}
});
recv
}
#[test]
fn test() {
let mut cmds = read_cmds_from_stdin().into_stream().fuse();
let mut units = run_timer().into_stream().fuse();
let future = task::spawn(async move {
loop {
select!(
cmd = cmds.select_next_some() => {
println!("got cmd {:?}", cmd);
},
unit = units.select_next_some() => {
println!("got tick");
}
)
}
});
block_until_ctrlc();
task::block_on(future);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment