Skip to content

Instantly share code, notes, and snippets.

@ssrlive
Last active February 28, 2025 03:54
Show Gist options
  • Save ssrlive/e88a6f4c8cc4d2434d2868dc4b329795 to your computer and use it in GitHub Desktop.
Save ssrlive/e88a6f4c8cc4d2434d2868dc4b329795 to your computer and use it in GitHub Desktop.
async learning
//
// This is a simple implementation of a futures executor and spawner.
// ref: https://www.bilibili.com/video/BV1Ki4y1C7gj/?vd_source=1658ea36cae9cc99e7417f13c4b86b2f&p=4&spm_id_from=333.788.videopod.episodes
//
// [dependencies]
// futures = "0.3"
//
use futures::{
future::{BoxFuture, FutureExt},
task::{ArcWake, waker_ref},
};
use std::{
future::Future,
pin::Pin,
sync::{
Arc, Mutex,
mpsc::{Receiver, SyncSender, sync_channel},
},
task::{Context, Poll, Waker},
thread,
time::Duration,
};
pub fn new_executor_and_spawner<T>() -> (Executor<T>, Spawner<T>) {
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender })
}
#[derive(Clone)]
pub struct Spawner<T> {
task_sender: SyncSender<Arc<Task<T>>>,
}
impl<T> Spawner<T> {
pub fn spawn(&self, future: impl Future<Output = T> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.try_send(task).unwrap();
}
}
struct Task<T> {
future: Mutex<Option<BoxFuture<'static, T>>>,
task_sender: SyncSender<Arc<Task<T>>>,
}
impl<T: Default> Task<T> {
pub fn poll_task(self: &Arc<Self>, context: &mut Context<'_>) -> Poll<T> {
let mut future_slot = self.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
let poll_result = future.as_mut().poll(context);
if poll_result.is_pending() {
*future_slot = Some(future);
}
poll_result
} else {
Poll::Ready(T::default())
}
}
}
impl<T> ArcWake for Task<T> {
fn wake_by_ref(arc_self: &Arc<Self>) {
let cloned = arc_self.clone();
arc_self.task_sender.try_send(cloned).unwrap();
}
}
pub struct Executor<T> {
ready_queue: Receiver<Arc<Task<T>>>,
}
impl<T: Default + 'static> Executor<T> {
pub fn run(&self) -> T {
let mut res = T::default();
while let Ok(task) = self.ready_queue.recv() {
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&waker);
if let Poll::Ready(v) = task.poll_task(context) {
res = v;
}
}
res
}
}
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
#[derive(Default)]
struct SharedState {
completed: bool,
waker: Option<Waker>,
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
impl TimerFuture {
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState::default()));
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
#[must_use]
pub fn block_on<T: Default + 'static>(future: impl Future<Output = T> + 'static + Send) -> T {
let (executor, spawner) = new_executor_and_spawner::<T>();
spawner.spawn(future);
drop(spawner);
executor.run()
}
#[allow(dead_code)]
fn main() {
let _v = block_on(async {
println!("howdy!");
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});
}
#[test]
fn run_main() {
main()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment