Last active
January 24, 2023 07:09
-
-
Save jakobrs/c2fa57ed0ae428a985a27d97b59554c6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mod with_executor { | |
use std::{future::Future, sync::Arc, thread::JoinHandle, time::Duration}; | |
use smol::{future, Executor, Task}; | |
struct Handle { | |
executor: Arc<Executor<'static>>, | |
} | |
struct BackgroundExecutor { | |
handle: Handle, | |
signal: smol::channel::Sender<()>, | |
executor_thread: JoinHandle<()>, | |
} | |
impl Handle { | |
fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) -> Task<()> { | |
self.executor.spawn(task) | |
} | |
} | |
impl BackgroundExecutor { | |
fn start() -> Self { | |
let (signal, shutdown) = smol::channel::unbounded::<()>(); | |
let executor = Arc::new(Executor::new()); | |
let executor_thread = std::thread::spawn({ | |
let ex = executor.clone(); | |
move || future::block_on(ex.run(shutdown.recv())).unwrap() | |
}); | |
Self { | |
handle: Handle { executor }, | |
signal, | |
executor_thread, | |
} | |
} | |
fn handle(&self) -> &Handle { | |
&self.handle | |
} | |
fn spawn(&self, fut: impl Future<Output = ()> + Send + 'static) -> Task<()> { | |
self.handle.spawn(fut) | |
} | |
fn spawn_detach(&self, fut: impl Future<Output = ()> + Send + 'static) { | |
self.handle.spawn(fut).detach() | |
} | |
fn shutdown(self) { | |
self.signal.send_blocking(()).unwrap(); | |
self.executor_thread.join().unwrap(); | |
} | |
} | |
pub fn main() { | |
let ex = BackgroundExecutor::start(); | |
ex.spawn(async { | |
println!("Hello world"); | |
}) | |
.detach(); | |
std::thread::sleep(Duration::from_secs(1)); | |
ex.shutdown(); | |
} | |
} | |
mod with_local_executor { | |
use std::{ | |
cell::RefCell, | |
future::Future, | |
pin::Pin, | |
rc::{Rc, Weak}, | |
thread::JoinHandle, | |
time::Duration, | |
}; | |
use smol::{channel::Sender, future, LocalExecutor, Task}; | |
enum Request { | |
Shutdown, | |
Spawn { | |
fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>, | |
response: Option<Sender<Task<()>>>, | |
}, | |
} | |
#[derive(Clone)] | |
struct Handle { | |
tx: smol::channel::Sender<Request>, | |
} | |
struct BackgroundLocalExecutor { | |
handle: Handle, | |
executor_thread: JoinHandle<()>, | |
} | |
thread_local! { | |
static EXECUTOR: RefCell<Weak<LocalExecutor<'static>>> = RefCell::new(Weak::new()); | |
} | |
fn spawn_local(fut: impl Future<Output = ()> + 'static) -> Option<Task<()>> { | |
EXECUTOR.with(|executor| { | |
let ex = executor.borrow().upgrade()?; | |
Some(ex.spawn(fut)) | |
}) | |
} | |
impl Handle { | |
async fn spawn(&self, fut: impl Future<Output = ()> + Send + 'static) -> Task<()> { | |
let (tx, rx) = smol::channel::unbounded(); | |
self.tx | |
.send(Request::Spawn { | |
fut: Box::pin(fut), | |
response: Some(tx), | |
}) | |
.await | |
.unwrap(); | |
rx.recv().await.unwrap() | |
} | |
fn spawn_blocking(&self, fut: impl Future<Output = ()> + Send + 'static) -> Task<()> { | |
let (tx, rx) = smol::channel::unbounded(); | |
self.tx | |
.send_blocking(Request::Spawn { | |
fut: Box::pin(fut), | |
response: Some(tx), | |
}) | |
.unwrap(); | |
rx.recv_blocking().unwrap() | |
} | |
async fn spawn_detach(&self, fut: impl Future<Output = ()> + Send + 'static) { | |
self.tx | |
.send(Request::Spawn { | |
fut: Box::pin(fut), | |
response: None, | |
}) | |
.await | |
.unwrap(); | |
} | |
fn spawn_detach_blocking(&self, fut: impl Future<Output = ()> + Send + 'static) { | |
self.tx | |
.send_blocking(Request::Spawn { | |
fut: Box::pin(fut), | |
response: None, | |
}) | |
.unwrap(); | |
} | |
} | |
impl BackgroundLocalExecutor { | |
fn start() -> Self { | |
let (tx, rx) = smol::channel::unbounded::<Request>(); | |
let executor_thread = std::thread::spawn(move || { | |
let ex = Rc::new(LocalExecutor::new()); | |
EXECUTOR.with(|executor| { | |
*executor.borrow_mut() = Rc::downgrade(&ex); | |
}); | |
future::block_on(ex.run(async { | |
while let Ok(msg) = rx.recv().await { | |
match msg { | |
Request::Shutdown => break, | |
Request::Spawn { fut, response } => { | |
let task = ex.spawn(fut); | |
if let Some(tx) = response { | |
tx.send(task).await.unwrap(); | |
} else { | |
task.detach(); | |
} | |
} | |
} | |
} | |
})) | |
}); | |
Self { | |
handle: Handle { tx }, | |
executor_thread, | |
} | |
} | |
fn handle(&self) -> &Handle { | |
&self.handle | |
} | |
async fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) -> Task<()> { | |
self.handle.spawn(task).await | |
} | |
fn spawn_blocking(&self, task: impl Future<Output = ()> + Send + 'static) -> Task<()> { | |
self.handle.spawn_blocking(task) | |
} | |
fn shutdown(self) { | |
self.handle.tx.send_blocking(Request::Shutdown).unwrap(); | |
self.executor_thread.join().unwrap(); | |
} | |
} | |
pub fn main() { | |
let ex = BackgroundLocalExecutor::start(); | |
ex.spawn_blocking(async { | |
println!("Hello world"); | |
let a = Rc::new(()); | |
spawn_local(async { | |
let _b = a; // <- a is !Send | |
println!("... or not?"); | |
}) | |
.unwrap() | |
.detach(); | |
}) | |
.detach(); | |
smol::future::block_on(ex.spawn(async { | |
println!("no"); | |
})) | |
.detach(); | |
std::thread::sleep(Duration::from_secs(1)); | |
ex.shutdown(); | |
} | |
} | |
fn main() { | |
with_executor::main(); | |
with_local_executor::main(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment