Created
December 18, 2022 20:14
-
-
Save vi/39607d1963b069a5167099f3fbffebf4 to your computer and use it in GitHub Desktop.
Rust async executor abuse to avoid both blocking and threads (v2)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package] | |
name = "hackyws" | |
version = "0.1.1" | |
edition = "2021" | |
[dependencies] | |
anyhow = "1.0.66" | |
async-tungstenite = "0.18.0" | |
flume = "0.10.14" | |
futures = "0.3.25" | |
futures-io = "0.3.25" | |
pin-project = "1.0.12" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Imagine this is a third-party app where our code is loaded as a plugin and we want to use async. | |
/// But we should not block (so `Runtime::block_on` is inaccessible) and cannot create any threads. | |
/// We prefer the code use only simple OS features (without epoll or other events) and instead just rely on | |
/// periodic polling by timer (which is itself inside the third-party app). | |
/// `fn main` of this demo simulates such "third-party app", everything else is the plugin code. | |
/// Running the code should connect to Websocket mirror, periodically send messages to it and also print each incoming message to stdout. | |
/// `time strace -f hackyws` shows that polling is indeed moderated by timer (not just busy loop) | |
/// and that there are no threads getting created (apart from "sync" threads we do create explicitly). | |
/// | |
/// This is the second, more complicated version of the demo. See the first one at https://gist.github.com/vi/28117c2583ea74d35babfcd6abbef9e6 | |
use std::{pin::Pin, time::Duration}; | |
use async_tungstenite::tungstenite::Message; | |
use futures::{ | |
task::{LocalSpawnExt, SpawnExt}, | |
Future, SinkExt, StreamExt, | |
}; | |
pub struct HackyTimerPollDemo { | |
exe: futures::executor::LocalPool, | |
wakers: timer_polled_executor_tools::TimerBasedWakers, | |
} | |
pub mod timer_polled_executor_tools { | |
use std::io::{Read, Write}; | |
use std::sync::{Arc, Mutex}; | |
use std::task::Poll; | |
use std::pin::Pin; | |
use std; | |
use std::time::{Duration, Instant}; | |
#[pin_project::pin_project] | |
pub struct MySocketWrapper(#[pin] std::net::TcpStream, TimerBasedWakers); | |
#[derive(Clone)] | |
pub struct TimerBasedWakers { | |
wakers: Arc<Mutex<Vec<std::task::Waker>>>, | |
} | |
impl TimerBasedWakers { | |
fn add<'a>(&self, ctx: &mut std::task::Context<'a>) { | |
self.wakers.lock().unwrap().push(ctx.waker().clone()); | |
} | |
/// Wake up all accumulated wakers. | |
/// This method is supposed to be called periodically, prior to stepping some ticks of the executor | |
pub fn wake_all(&self) { | |
self.wakers.lock().unwrap().drain(..).for_each(|x| x.wake()); | |
} | |
pub fn new() -> TimerBasedWakers { | |
TimerBasedWakers { | |
wakers: Arc::new(Mutex::new(Vec::with_capacity(2))), | |
} | |
} | |
} | |
impl MySocketWrapper { | |
pub fn new(s: std::net::TcpStream, wakers: TimerBasedWakers) -> std::io::Result<Self> { | |
s.set_nonblocking(true)?; | |
Ok(MySocketWrapper(s, wakers)) | |
} | |
} | |
impl futures_io::AsyncRead for MySocketWrapper { | |
fn poll_read( | |
self: Pin<&mut Self>, | |
cx: &mut std::task::Context<'_>, | |
buf: &mut [u8], | |
) -> Poll<futures_io::Result<usize>> { | |
let mut this = self.project(); | |
match this.0.read(buf) { | |
Ok(x) => Poll::Ready(Ok(x)), | |
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { | |
this.1.add(cx); | |
Poll::Pending | |
} | |
Err(e) => Poll::Ready(Err(e)), | |
} | |
} | |
} | |
impl futures_io::AsyncWrite for MySocketWrapper { | |
fn poll_write( | |
self: Pin<&mut Self>, | |
cx: &mut std::task::Context<'_>, | |
buf: &[u8], | |
) -> Poll<futures_io::Result<usize>> { | |
let mut this = self.project(); | |
match this.0.write(buf) { | |
Ok(x) => Poll::Ready(Ok(x)), | |
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { | |
this.1.add(cx); | |
Poll::Pending | |
} | |
Err(e) => Poll::Ready(Err(e)), | |
} | |
} | |
fn poll_flush( | |
self: Pin<&mut Self>, | |
cx: &mut std::task::Context<'_>, | |
) -> Poll<futures_io::Result<()>> { | |
let mut this = self.project(); | |
match this.0.flush() { | |
Ok(x) => Poll::Ready(Ok(x)), | |
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { | |
this.1.add(cx); | |
Poll::Pending | |
} | |
Err(e) => Poll::Ready(Err(e)), | |
} | |
} | |
fn poll_close( | |
self: Pin<&mut Self>, | |
cx: &mut std::task::Context<'_>, | |
) -> Poll<futures_io::Result<()>> { | |
self.poll_flush(cx) | |
} | |
} | |
pub struct HackySleep(Instant, TimerBasedWakers); | |
impl std::future::Future for HackySleep { | |
type Output = (); | |
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { | |
let now = Instant::now(); | |
if now > self.0 { | |
Poll::Ready(()) | |
} else { | |
self.1.add(cx); | |
Poll::Pending | |
} | |
} | |
} | |
pub fn sleep(t: Duration, wakers: TimerBasedWakers) -> HackySleep { | |
HackySleep(Instant::now() + t, wakers) | |
} | |
} | |
impl HackyTimerPollDemo { | |
async fn start_my_task( | |
tx: flume::Sender<String>, | |
rx: flume::Receiver<String>, | |
mut spawner: impl FnMut(Pin<Box<dyn Future<Output = ()> + Send>>), | |
wakers: timer_polled_executor_tools::TimerBasedWakers, | |
) -> anyhow::Result<()> { | |
let s = std::net::TcpStream::connect("192.236.209.31:80")?; | |
let s = timer_polled_executor_tools::MySocketWrapper::new(s, wakers)?; | |
let (c, _) = async_tungstenite::client_async("ws://ws.vi-server.org/mirror", s).await?; | |
let (mut c_tx, mut c_rx) = c.split(); | |
let subtask1 = async move { | |
while let Ok(msg) = rx.recv_async().await { | |
let _ = c_tx.send(Message::Text(msg)).await; | |
} | |
}; | |
let subtask2 = async move { | |
while let Some(Ok(msg)) = c_rx.next().await { | |
match msg { | |
Message::Text(s) => { | |
let _ = tx.send_async(s).await; | |
} | |
_ => (), | |
} | |
} | |
}; | |
spawner(Box::pin(subtask1)); | |
spawner(Box::pin(subtask2)); | |
Ok(()) | |
} | |
pub fn new() -> anyhow::Result<( | |
HackyTimerPollDemo, | |
flume::Sender<String>, | |
flume::Receiver<String>, | |
)> { | |
let exe = futures::executor::LocalPool::new(); | |
let wakers = timer_polled_executor_tools::TimerBasedWakers::new(); | |
let (tx1, rx1) = flume::bounded(1); | |
let (tx2, rx2) = flume::bounded(1); | |
let spawner1 = exe.spawner(); | |
let spawner2 = exe.spawner(); | |
let spawner = move |boxed_future: Pin<Box<dyn Future<Output = ()> + Send>>| { | |
let _ = spawner1.spawn(boxed_future); | |
}; | |
let wakers2 = wakers.clone(); | |
let _ = spawner2.spawn_local(async move { | |
if let Err(e) = HackyTimerPollDemo::start_my_task(tx2, rx1, spawner, wakers2).await { | |
eprintln!("Error: {}", e); | |
} | |
}); | |
Ok((HackyTimerPollDemo { exe, wakers }, tx1, rx2)) | |
} | |
pub fn step(&mut self) -> anyhow::Result<()> { | |
self.wakers.wake_all(); | |
self.exe.run_until_stalled(); | |
Ok(()) | |
} | |
} | |
/// Example of a function that passes data from sync world to async world. | |
fn sync_to_async(tx: flume::Sender<String>) { | |
let _ = tx.send("Hello".to_owned()); | |
let mut i = 0; | |
loop { | |
std::thread::sleep(Duration::from_secs(1)); | |
let _ = tx.send(format!("Msg {}", i)); | |
i += 1; | |
} | |
} | |
/// Example of a sync function that receives data from async world | |
fn async_to_sync(rx: flume::Receiver<String>) { | |
for x in rx { | |
println!("{}", x); | |
} | |
} | |
fn main() -> anyhow::Result<()> { | |
let (mut demo, tx, rx) = HackyTimerPollDemo::new()?; | |
std::thread::spawn(move || sync_to_async(tx)); | |
std::thread::spawn(move || async_to_sync(rx)); | |
loop { | |
demo.step()?; | |
std::thread::sleep(Duration::from_millis(100)); | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment