Skip to content

Instantly share code, notes, and snippets.

@jakobrs
Last active January 24, 2023 07:09
Show Gist options
  • Save jakobrs/c2fa57ed0ae428a985a27d97b59554c6 to your computer and use it in GitHub Desktop.
Save jakobrs/c2fa57ed0ae428a985a27d97b59554c6 to your computer and use it in GitHub Desktop.
mod with_executor {
use std::{future::Future, sync::Arc, thread::JoinHandle, time::Duration};
use smol::{future, Executor, Task};
struct Handle {
executor: Arc<Executor<'static>>,
}
struct BackgroundExecutor {
handle: Handle,
signal: smol::channel::Sender<()>,
executor_thread: JoinHandle<()>,
}
impl Handle {
fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) -> Task<()> {
self.executor.spawn(task)
}
}
impl BackgroundExecutor {
fn start() -> Self {
let (signal, shutdown) = smol::channel::unbounded::<()>();
let executor = Arc::new(Executor::new());
let executor_thread = std::thread::spawn({
let ex = executor.clone();
move || future::block_on(ex.run(shutdown.recv())).unwrap()
});
Self {
handle: Handle { executor },
signal,
executor_thread,
}
}
fn handle(&self) -> &Handle {
&self.handle
}
fn spawn(&self, fut: impl Future<Output = ()> + Send + 'static) -> Task<()> {
self.handle.spawn(fut)
}
fn spawn_detach(&self, fut: impl Future<Output = ()> + Send + 'static) {
self.handle.spawn(fut).detach()
}
fn shutdown(self) {
self.signal.send_blocking(()).unwrap();
self.executor_thread.join().unwrap();
}
}
pub fn main() {
let ex = BackgroundExecutor::start();
ex.spawn(async {
println!("Hello world");
})
.detach();
std::thread::sleep(Duration::from_secs(1));
ex.shutdown();
}
}
mod with_local_executor {
use std::{
cell::RefCell,
future::Future,
pin::Pin,
rc::{Rc, Weak},
thread::JoinHandle,
time::Duration,
};
use smol::{channel::Sender, future, LocalExecutor, Task};
enum Request {
Shutdown,
Spawn {
fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
response: Option<Sender<Task<()>>>,
},
}
#[derive(Clone)]
struct Handle {
tx: smol::channel::Sender<Request>,
}
struct BackgroundLocalExecutor {
handle: Handle,
executor_thread: JoinHandle<()>,
}
thread_local! {
static EXECUTOR: RefCell<Weak<LocalExecutor<'static>>> = RefCell::new(Weak::new());
}
fn spawn_local(fut: impl Future<Output = ()> + 'static) -> Option<Task<()>> {
EXECUTOR.with(|executor| {
let ex = executor.borrow().upgrade()?;
Some(ex.spawn(fut))
})
}
impl Handle {
async fn spawn(&self, fut: impl Future<Output = ()> + Send + 'static) -> Task<()> {
let (tx, rx) = smol::channel::unbounded();
self.tx
.send(Request::Spawn {
fut: Box::pin(fut),
response: Some(tx),
})
.await
.unwrap();
rx.recv().await.unwrap()
}
fn spawn_blocking(&self, fut: impl Future<Output = ()> + Send + 'static) -> Task<()> {
let (tx, rx) = smol::channel::unbounded();
self.tx
.send_blocking(Request::Spawn {
fut: Box::pin(fut),
response: Some(tx),
})
.unwrap();
rx.recv_blocking().unwrap()
}
async fn spawn_detach(&self, fut: impl Future<Output = ()> + Send + 'static) {
self.tx
.send(Request::Spawn {
fut: Box::pin(fut),
response: None,
})
.await
.unwrap();
}
fn spawn_detach_blocking(&self, fut: impl Future<Output = ()> + Send + 'static) {
self.tx
.send_blocking(Request::Spawn {
fut: Box::pin(fut),
response: None,
})
.unwrap();
}
}
impl BackgroundLocalExecutor {
fn start() -> Self {
let (tx, rx) = smol::channel::unbounded::<Request>();
let executor_thread = std::thread::spawn(move || {
let ex = Rc::new(LocalExecutor::new());
EXECUTOR.with(|executor| {
*executor.borrow_mut() = Rc::downgrade(&ex);
});
future::block_on(ex.run(async {
while let Ok(msg) = rx.recv().await {
match msg {
Request::Shutdown => break,
Request::Spawn { fut, response } => {
let task = ex.spawn(fut);
if let Some(tx) = response {
tx.send(task).await.unwrap();
} else {
task.detach();
}
}
}
}
}))
});
Self {
handle: Handle { tx },
executor_thread,
}
}
fn handle(&self) -> &Handle {
&self.handle
}
async fn spawn(&self, task: impl Future<Output = ()> + Send + 'static) -> Task<()> {
self.handle.spawn(task).await
}
fn spawn_blocking(&self, task: impl Future<Output = ()> + Send + 'static) -> Task<()> {
self.handle.spawn_blocking(task)
}
fn shutdown(self) {
self.handle.tx.send_blocking(Request::Shutdown).unwrap();
self.executor_thread.join().unwrap();
}
}
pub fn main() {
let ex = BackgroundLocalExecutor::start();
ex.spawn_blocking(async {
println!("Hello world");
let a = Rc::new(());
spawn_local(async {
let _b = a; // <- a is !Send
println!("... or not?");
})
.unwrap()
.detach();
})
.detach();
smol::future::block_on(ex.spawn(async {
println!("no");
}))
.detach();
std::thread::sleep(Duration::from_secs(1));
ex.shutdown();
}
}
fn main() {
with_executor::main();
with_local_executor::main();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment