Last active
February 28, 2025 03:54
-
-
Save ssrlive/e88a6f4c8cc4d2434d2868dc4b329795 to your computer and use it in GitHub Desktop.
async learning
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//
// A simple implementation of a futures executor and spawner (async learning exercise).
// ref: https://www.bilibili.com/video/BV1Ki4y1C7gj/?vd_source=1658ea36cae9cc99e7417f13c4b86b2f&p=4&spm_id_from=333.788.videopod.episodes
//
// [dependencies]
// futures = "0.3"
//
use futures::{ | |
future::{BoxFuture, FutureExt}, | |
task::{ArcWake, waker_ref}, | |
}; | |
use std::{ | |
future::Future, | |
pin::Pin, | |
sync::{ | |
Arc, Mutex, | |
mpsc::{Receiver, SyncSender, sync_channel}, | |
}, | |
task::{Context, Poll, Waker}, | |
thread, | |
time::Duration, | |
}; | |
pub fn new_executor_and_spawner<T>() -> (Executor<T>, Spawner<T>) { | |
const MAX_QUEUED_TASKS: usize = 10_000; | |
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS); | |
(Executor { ready_queue }, Spawner { task_sender }) | |
} | |
#[derive(Clone)] | |
pub struct Spawner<T> { | |
task_sender: SyncSender<Arc<Task<T>>>, | |
} | |
impl<T> Spawner<T> { | |
pub fn spawn(&self, future: impl Future<Output = T> + 'static + Send) { | |
let future = future.boxed(); | |
let task = Arc::new(Task { | |
future: Mutex::new(Some(future)), | |
task_sender: self.task_sender.clone(), | |
}); | |
self.task_sender.try_send(task).unwrap(); | |
} | |
} | |
struct Task<T> { | |
future: Mutex<Option<BoxFuture<'static, T>>>, | |
task_sender: SyncSender<Arc<Task<T>>>, | |
} | |
impl<T: Default> Task<T> { | |
pub fn poll_task(self: &Arc<Self>, context: &mut Context<'_>) -> Poll<T> { | |
let mut future_slot = self.future.lock().unwrap(); | |
if let Some(mut future) = future_slot.take() { | |
let poll_result = future.as_mut().poll(context); | |
if poll_result.is_pending() { | |
*future_slot = Some(future); | |
} | |
poll_result | |
} else { | |
Poll::Ready(T::default()) | |
} | |
} | |
} | |
impl<T> ArcWake for Task<T> { | |
fn wake_by_ref(arc_self: &Arc<Self>) { | |
let cloned = arc_self.clone(); | |
arc_self.task_sender.try_send(cloned).unwrap(); | |
} | |
} | |
pub struct Executor<T> { | |
ready_queue: Receiver<Arc<Task<T>>>, | |
} | |
impl<T: Default + 'static> Executor<T> { | |
pub fn run(&self) -> T { | |
let mut res = T::default(); | |
while let Ok(task) = self.ready_queue.recv() { | |
let waker = waker_ref(&task); | |
let context = &mut Context::from_waker(&waker); | |
if let Poll::Ready(v) = task.poll_task(context) { | |
res = v; | |
} | |
} | |
res | |
} | |
} | |
pub struct TimerFuture { | |
shared_state: Arc<Mutex<SharedState>>, | |
} | |
/// State shared between a `TimerFuture` and the thread that times it.
#[derive(Default)]
struct SharedState {
    /// Set to `true` by the timer thread once the sleep has elapsed.
    completed: bool,

    /// Waker stored by the most recent pending poll; taken and woken by the
    /// timer thread when the sleep is over.
    waker: Option<Waker>,
}
impl Future for TimerFuture { | |
type Output = (); | |
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
let mut shared_state = self.shared_state.lock().unwrap(); | |
if shared_state.completed { | |
Poll::Ready(()) | |
} else { | |
shared_state.waker = Some(cx.waker().clone()); | |
Poll::Pending | |
} | |
} | |
} | |
impl TimerFuture { | |
pub fn new(duration: Duration) -> Self { | |
let shared_state = Arc::new(Mutex::new(SharedState::default())); | |
let thread_shared_state = shared_state.clone(); | |
thread::spawn(move || { | |
thread::sleep(duration); | |
let mut shared_state = thread_shared_state.lock().unwrap(); | |
shared_state.completed = true; | |
if let Some(waker) = shared_state.waker.take() { | |
waker.wake() | |
} | |
}); | |
TimerFuture { shared_state } | |
} | |
} | |
#[must_use] | |
pub fn block_on<T: Default + 'static>(future: impl Future<Output = T> + 'static + Send) -> T { | |
let (executor, spawner) = new_executor_and_spawner::<T>(); | |
spawner.spawn(future); | |
drop(spawner); | |
executor.run() | |
} | |
#[allow(dead_code)] | |
fn main() { | |
let _v = block_on(async { | |
println!("howdy!"); | |
TimerFuture::new(Duration::new(2, 0)).await; | |
println!("done!"); | |
}); | |
} | |
/// Smoke test: runs the demo `main` to completion (takes about two seconds
/// because of the timer).
#[test]
fn run_main() {
    main();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment