Last active
June 19, 2023 20:03
-
-
Save vi/28117c2583ea74d35babfcd6abbef9e6 to your computer and use it in GitHub Desktop.
Rust async executor abuse to avoid both blocking and threads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package] | |
name = "hackyws" | |
version = "0.1.0" | |
edition = "2021" | |
[dependencies] | |
anyhow = "1.0.66" | |
async-executor = "1.5.0" | |
async-tungstenite = "0.18.0" | |
futures = "0.3.25" | |
futures-io = "0.3.25" | |
pin-project = "1.0.12" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Imagine this is a third-party app where our code is loaded as a plugin and we want to use async. | |
/// But we should not block (so `Runtime::block_on` is inaccessible) and cannot create any threads. | |
/// We prefer the code use only simple OS features (without epoll or other events) and instead just rely on | |
/// periodic polling by timer (which is itself inside the third-party app). | |
/// `fn main` of this demo simulates such "third-party app", everything else is the plugin code. | |
/// Running the code should connect to Websocket mirror, periodically send messages to it and also print each incoming message to stdout. | |
/// `time strace -f hackyws` shows that polling is indeed moderated by timer (not just busy loop) and that there are no threads getting created. | |
use std::{time::Duration}; | |
use async_tungstenite::tungstenite::Message; | |
use futures::{SinkExt, StreamExt, pin_mut}; | |
pub struct HackyTimerPollDemo { | |
exe: async_executor::LocalExecutor<'static>, | |
} | |
pub mod timer_polled_executor_tools { | |
use std::io::{Read,Write}; | |
use std::task::Poll; | |
use std::pin::Pin; | |
use std; | |
use std::time::{Instant, Duration}; | |
#[pin_project::pin_project] | |
pub struct MySocketWrapper(#[pin] std::net::TcpStream); | |
impl MySocketWrapper { | |
pub fn new(s: std::net::TcpStream) -> std::io::Result<Self> { | |
s.set_nonblocking(true)?; | |
Ok(MySocketWrapper(s)) | |
} | |
} | |
impl futures_io::AsyncRead for MySocketWrapper { | |
fn poll_read( | |
self: Pin<&mut Self>, | |
cx: &mut std::task::Context<'_>, | |
buf: &mut [u8], | |
) -> Poll<futures_io::Result<usize>> { | |
match self.project().0.read(buf) { | |
Ok(x) => Poll::Ready(Ok(x)), | |
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { | |
// Immediately register for retrying, relying that timer would drive entire process instead of any actual events. | |
cx.waker().wake_by_ref(); | |
Poll::Pending | |
} | |
Err(e) => Poll::Ready(Err(e)) | |
} | |
} | |
} | |
impl futures_io::AsyncWrite for MySocketWrapper { | |
fn poll_write( | |
self: Pin<&mut Self>, | |
cx: &mut std::task::Context<'_>, | |
buf: &[u8], | |
) -> Poll<futures_io::Result<usize>> { | |
match self.project().0.write(buf) { | |
Ok(x) => Poll::Ready(Ok(x)), | |
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { | |
// Immediately register for retrying, relying that timer would drive entire process instead of any actual events. | |
cx.waker().wake_by_ref(); | |
Poll::Pending | |
} | |
Err(e) => Poll::Ready(Err(e)) | |
} | |
} | |
fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<futures_io::Result<()>> { | |
match self.project().0.flush() { | |
Ok(x) => Poll::Ready(Ok(x)), | |
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { | |
// Immediately register for retrying, relying that timer would drive entire process instead of any actual events. | |
cx.waker().wake_by_ref(); | |
Poll::Pending | |
} | |
Err(e) => Poll::Ready(Err(e)) | |
} | |
} | |
fn poll_close(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<futures_io::Result<()>> { | |
self.poll_flush(cx) | |
} | |
} | |
pub struct HackySleep(Instant); | |
impl std::future::Future for HackySleep { | |
type Output = (); | |
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { | |
let now = Instant::now(); | |
if now > self.0 { | |
Poll::Ready(()) | |
} else { | |
// Immediately register for retrying, relying that timer would drive entire process instead of any actual events. | |
cx.waker().wake_by_ref(); | |
Poll::Pending | |
} | |
} | |
} | |
pub fn sleep(t: Duration) -> HackySleep { | |
HackySleep(Instant::now() + t) | |
} | |
} | |
impl HackyTimerPollDemo { | |
async fn start_my_task() -> anyhow::Result<()> { | |
let s = std::net::TcpStream::connect("192.236.209.31:80")?; | |
let s = timer_polled_executor_tools::MySocketWrapper::new(s)?; | |
let (c, _) = async_tungstenite::client_async("ws://ws.vi-server.org/mirror", s).await?; | |
let (mut c_tx, c_rx) = c.split(); | |
let subtask1 = async move { | |
let mut x = 0; | |
loop { | |
let _ = c_tx.send(Message::Text(format!("Hello, {}", x))).await; | |
x += 1; | |
timer_polled_executor_tools::sleep(Duration::from_secs(1)).await; | |
} | |
}; | |
let subtask2 = c_rx.for_each(|x| { | |
match x { | |
Ok(x) => match x { | |
Message::Text(s) => println!("{}", s), | |
_ => (), | |
} | |
Err(_) => (), | |
} | |
futures::future::ready(()) | |
}); | |
pin_mut!(subtask1); | |
pin_mut!(subtask2); | |
futures::future::select(subtask1, subtask2).await; | |
Ok(()) | |
} | |
pub fn new() -> anyhow::Result<HackyTimerPollDemo> { | |
// maybe `futures_executor::LocalPool` is better for this? | |
let exe = async_executor::LocalExecutor::new(); | |
exe.spawn(HackyTimerPollDemo::start_my_task()).detach(); | |
Ok(HackyTimerPollDemo { | |
exe, | |
}) | |
} | |
pub fn step(&mut self) -> anyhow::Result<()> { | |
self.exe.try_tick(); | |
Ok(()) | |
} | |
} | |
fn main() -> anyhow::Result<()> { | |
let mut demo = HackyTimerPollDemo::new()?; | |
loop { | |
demo.step()?; | |
std::thread::sleep(Duration::from_millis(50)); | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Another version: https://gist.github.com/vi/39607d1963b069a5167099f3fbffebf4