Created
August 11, 2023 15:17
-
-
Save bitristan/f0a7505b0ec8bec411897074799bb8cc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{
    future::Future,
    pin::Pin,
    sync::{
        mpsc::{sync_channel, Receiver, SyncSender},
        Arc, Mutex,
    },
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};

use futures::{
    future::BoxFuture,
    task::{waker_ref, ArcWake},
    FutureExt,
};
/// State shared between a [`TimeFuture`] and the timer thread that completes it.
struct SharedState {
    /// Set to `true` by the timer thread once its sleep has elapsed.
    completed: bool,
    /// Waker handed in by the most recent `poll` that returned `Pending`, if any.
    waker: Option<Waker>,
}

/// A future that resolves to `()` after a background thread has slept for the
/// duration passed to [`TimeFuture::new`].
struct TimeFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

impl Future for TimeFuture {
    type Output = ();

    /// Returns `Ready` once the timer thread has flagged completion; otherwise
    /// records the caller's waker so the timer thread can wake the task later.
    ///
    /// # Panics
    ///
    /// Panics if the shared-state mutex is poisoned.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.shared_state.lock().unwrap();
        if state.completed {
            return Poll::Ready(());
        }

        // Only clone the waker when the stored one would not wake the same task.
        let stale = state
            .waker
            .as_ref()
            .map_or(true, |stored| !stored.will_wake(cx.waker()));
        if stale {
            state.waker = Some(cx.waker().clone());
        }
        Poll::Pending
    }
}

impl TimeFuture {
    /// Creates the future and spawns the thread that sleeps for `duration`,
    /// marks the shared state as completed, and wakes the last stored waker.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        let thread_shared_state = Arc::clone(&shared_state);
        thread::spawn(move || {
            thread::sleep(duration);

            // Flip the flag and take the waker under the lock, but release the
            // lock before waking: the waker runs arbitrary code, and a panic
            // inside it must not poison the state the future still has to read.
            let waker = {
                let mut state = thread_shared_state.lock().unwrap();
                state.completed = true;
                state.waker.take()
            };
            if let Some(waker) = waker {
                waker.wake();
            }
        });

        TimeFuture { shared_state }
    }
}
struct Executor { | |
ready_queue: Receiver<Arc<Task>>, | |
} | |
#[derive(Clone)] | |
struct Spawner { | |
task_sender: SyncSender<Arc<Task>>, | |
} | |
struct Task { | |
future: Mutex<Option<BoxFuture<'static, ()>>>, | |
/// Handle to place the task itself back onto the task queue. | |
task_sender: SyncSender<Arc<Task>>, | |
} | |
impl Spawner { | |
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) { | |
let future = future.boxed(); | |
let task = Arc::new(Task { | |
future: Mutex::new(Some(future)), | |
task_sender: self.task_sender.clone(), | |
}); | |
println!("send task \n"); | |
self.task_sender.send(task).expect("too many tasks queued"); | |
} | |
} | |
impl ArcWake for Task { | |
fn wake_by_ref(arc_self: &Arc<Self>) { | |
// Implement `wake` by sending this task back onto the task channel | |
// so that it will be polled again by the executor. | |
let cloned = arc_self.clone(); | |
println!("wake by ref\n"); | |
arc_self | |
.task_sender | |
.send(cloned) | |
.expect("too many tasks queued"); | |
} | |
} | |
impl Executor { | |
fn run(&self) { | |
while let Ok(task) = self.ready_queue.recv() { | |
// Take the future, and if it has not yet completed (is still Some), | |
// poll it in an attempt to complete it. | |
let mut future_slot = task.future.lock().unwrap(); | |
if let Some(mut future) = future_slot.take() { | |
// Create a `LocalWaker` from the task itself | |
let waker = waker_ref(&task); | |
let context = &mut Context::from_waker(&waker); | |
// `BoxFuture<T>` is a type alias for | |
// `Pin<Box<dyn Future<Output = T> + Send + 'static>>`. | |
// We can get a `Pin<&mut dyn Future + Send + 'static>` | |
// from it by calling the `Pin::as_mut` method. | |
println!("before check!\n"); | |
if future.as_mut().poll(context).is_pending() { | |
// We're not done processing the future, so put it | |
// back in its task to be run again in the future. | |
*future_slot = Some(future); | |
} | |
} | |
} | |
} | |
} | |
fn new_executor_and_spawner() -> (Executor, Spawner) { | |
// Maximum number of tasks to allow queueing in the channel at once. | |
// This is just to make `sync_channel` happy, and wouldn't be present in | |
// a real executor. | |
const MAX_QUEUED_TASKS: usize = 10_000; | |
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS); | |
(Executor { ready_queue }, Spawner { task_sender }) | |
} | |
fn main() { | |
let (executor, spawner) = new_executor_and_spawner(); | |
// Spawn a task to print before and after waiting on a timer. | |
spawner.spawn(async { | |
println!("howdy!"); | |
// Wait for our timer future to complete after two seconds. | |
TimeFuture::new(Duration::new(2, 0)).await; | |
println!("done!"); | |
}); | |
// Drop the spawner so that our executor knows it is finished and won't | |
// receive more incoming tasks to run. | |
drop(spawner); | |
// Run the executor until the task queue is empty. | |
// This will print "howdy!", pause, and then print "done!". | |
executor.run(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment