Created
March 25, 2024 02:40
-
-
Save a10y/7d40f3511a7ea37b10c31b7b2200a089 to your computer and use it in GitHub Desktop.
Simple current-thread async executor in Rust
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![feature(noop_waker)] | |
#![feature(local_waker)] | |
use std::collections::{HashMap, VecDeque}; | |
use std::fmt; | |
use std::fmt::Formatter; | |
use std::future::Future; | |
use std::marker::PhantomData; | |
use std::pin::Pin; | |
use std::ptr::null_mut; | |
use std::sync::{Arc, Mutex}; | |
use std::sync::atomic::{AtomicU64, Ordering}; | |
use std::task::{ContextBuilder, Poll, Waker}; | |
fn main() { | |
// Create our own executor that can execute futures. | |
let executor = SimpleExecutor { | |
finished: Arc::new(Mutex::new(HashMap::new())), | |
runnable: Arc::new(Mutex::new(VecDeque::new())), | |
}; | |
println!("SUBMIT"); | |
let handle = executor.spawn(async move { | |
// What to do in here | |
println!("this is my future"); | |
"success".to_string() | |
}); | |
println!("{executor:?}"); | |
println!("NEXT_TASK"); | |
executor.next_task(); | |
println!("{executor:?}"); | |
println!("SHUTDOWN"); | |
} | |
#[derive(Debug, Clone, Copy, Default, Hash, PartialEq, Eq)] | |
struct TaskId(u64); | |
struct Slot<T, F: Future<Output=T>> { | |
task_id: TaskId, | |
future: Pin<Box<F>>, | |
data: *mut T, | |
} | |
// How can we keep track of a simple executor that knows how to store the futures. | |
// But what if we don't know the name of the future ahead of time? | |
// There are some types we can't specifically name, is that part of the typing? | |
struct SimpleExecutor<T, F: Future<Output=T>> { | |
runnable: Arc<Mutex<VecDeque<Slot<T, F>>>>, | |
finished: Arc<Mutex<HashMap<TaskId, Slot<T, F>>>>, | |
} | |
impl<T, F: Future<Output=T>> fmt::Debug for SimpleExecutor<T, F> { | |
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { | |
f.debug_struct("SimpleExecutor") | |
.field("runnable_count", &self.runnable.lock().unwrap().len()) | |
.field("finished_count", &self.finished.lock().unwrap().len()) | |
.finish() | |
} | |
} | |
static TASK_IDS: AtomicU64 = AtomicU64::new(1); | |
impl<T, F: Future<Output=T>> SimpleExecutor<T, F> { | |
pub fn new() -> Self { | |
Self { | |
runnable: Arc::new(Mutex::new(VecDeque::new())), | |
finished: Arc::new(Mutex::new(HashMap::new())), | |
} | |
} | |
pub fn next_task(&self) { | |
// loop { | |
println!("task loop"); | |
let mut runnable = self.runnable.lock().unwrap(); | |
if let Some(mut task) = runnable.pop_front() { | |
println!("next task found"); | |
let fut = task.future.as_mut(); | |
// let ctx: Context::from_waker(&mut Waker::noop()); | |
let mut cx = ContextBuilder::from_waker(&Waker::noop()).build(); | |
match fut.poll(&mut cx) { | |
Poll::Ready(result) => { | |
unsafe { | |
if !task.data.is_null() { | |
panic!("something went terribly wrong: future is completing twice"); | |
} | |
// Box the result and save into a pointer | |
let mut result = Box::new(result); | |
task.data = result.as_mut(); | |
} | |
self.finished.lock().unwrap().insert( | |
task.task_id, | |
task, | |
); | |
} | |
Poll::Pending => { | |
return; | |
// continue | |
} | |
} | |
} | |
// } | |
} | |
pub fn spawn(&self, fut: F) -> Handle<T> | |
{ | |
let fut = Box::pin(fut); | |
let task_id = TaskId(TASK_IDS.fetch_add(1, Ordering::AcqRel)); | |
let slot = Slot { | |
task_id, | |
data: null_mut(), | |
future: fut, | |
}; | |
// Store the slot inside ourselves | |
let mut locked = self.runnable.lock().unwrap(); | |
locked.push_back( | |
slot, | |
); | |
Handle { | |
task_id, | |
__phantom: PhantomData, | |
} | |
} | |
} | |
// Add in a join handle. | |
// We can have a single-threaded executor that just gives you access to the single executor. | |
struct Handle<T> { | |
// keep a pointer to the executor which spawned everything here instead. | |
task_id: TaskId, | |
__phantom: PhantomData<T>, | |
} | |
impl<T> Handle<T> { | |
pub fn join(self) -> T { | |
println!("not implemented yet"); | |
todo!() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Execution result: