Last active
December 18, 2019 06:31
-
-
Save tawashichan/d4a43268c647e46208b2ebf153a60b0c to your computer and use it in GitHub Desktop.
future_sample
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// 参考 | |
// https://rust-lang.github.io/async-book/02_execution/04_executor.html | |
// https://keens.github.io/blog/2019/07/07/rustnofuturetosonorunnerwotsukuttemita/ | |
use std::{ | |
future::{Future}, | |
pin::Pin, | |
sync::mpsc::{sync_channel, Receiver, SyncSender}, | |
sync::{Arc, Mutex}, | |
task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, | |
thread, | |
time::Duration, | |
}; | |
struct TimerFuture { | |
shared_state: Arc<Mutex<SharedState>>, | |
} | |
struct SharedState { | |
completed: bool, | |
waker: Option<Waker>, | |
} | |
impl Future for TimerFuture { | |
type Output = i32; | |
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
let mut shared_state = self.shared_state.lock().unwrap(); | |
if shared_state.completed { | |
println!("ready"); | |
Poll::Ready(1000) | |
} else { | |
println!("pending..."); | |
shared_state.waker = Some(cx.waker().clone()); | |
Poll::Pending | |
} | |
} | |
} | |
impl TimerFuture { | |
fn new(duration: Duration) -> Self { | |
let shared_state = Arc::new(Mutex::new(SharedState { | |
completed: false, | |
waker: None, | |
})); | |
let thread_shared_state = shared_state.clone(); | |
thread::spawn(move || { | |
thread::sleep(duration); | |
let mut shared_state = thread_shared_state.lock().unwrap(); | |
shared_state.completed = true; | |
if let Some(waker) = shared_state.waker.take() { | |
waker.wake() | |
} | |
}); | |
TimerFuture { shared_state } | |
} | |
} | |
struct Executor { | |
ready_queue: Receiver<Arc<Task>>, | |
} | |
#[derive(Clone)] | |
struct Spawner { | |
task_sender: SyncSender<Arc<Task>>, | |
} | |
struct Task { | |
future: Mutex<Option<Pin<Box<dyn Future<Output = ()>>>>>, | |
task_sender: SyncSender<Arc<Task>>, | |
} | |
fn new_executor_and_spanner() -> (Executor, Spawner) { | |
const MAX_QUEUED_TASKS: usize = 10_000; | |
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS); | |
(Executor { ready_queue }, Spawner { task_sender }) | |
} | |
impl Spawner { | |
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) { | |
let future = Box::pin(future); | |
let task = Arc::new(Task { | |
future: Mutex::new(Some(future)), | |
task_sender: self.task_sender.clone(), | |
}); | |
self.task_sender.send(task).expect("too many tasks queued"); | |
} | |
} | |
static CUSTOM_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( | |
CustomWaker::unsafe_clone, | |
CustomWaker::unsafe_wake, | |
CustomWaker::unsafe_wake_by_ref, | |
CustomWaker::unsafe_drop, | |
); | |
#[derive(Clone)] | |
struct CustomWaker{ | |
arc_task: Arc<Task> | |
} | |
impl CustomWaker { | |
fn waker(arc_task: Arc<Task>) -> Waker { | |
unsafe { Waker::from_raw(Self::new(arc_task).into_raw_waker()) } | |
} | |
fn new(arc_task: Arc<Task>) -> Self { | |
Self{ | |
arc_task, | |
} | |
} | |
unsafe fn into_raw_waker(self) -> RawWaker { | |
let prt = Box::into_raw(Box::new(self)) as *const (); | |
RawWaker::new(prt, &CUSTOM_WAKER_VTABLE) | |
} | |
unsafe fn unsafe_clone(this: *const ()) -> RawWaker { | |
let ptr = this as *const Self; | |
Box::new(ptr.as_ref().unwrap().clone()).into_raw_waker() | |
} | |
fn wake(self: Self) { | |
let cloned = self.arc_task.clone(); | |
self.arc_task.task_sender.send(cloned).expect("too many tasks queued"); | |
} | |
unsafe fn unsafe_wake(this: *const ()) { | |
let ptr = this as *mut Self; | |
Box::from_raw(ptr).wake() | |
} | |
fn wake_by_ref(&self) { | |
Box::new(self.clone()).wake() | |
} | |
unsafe fn unsafe_wake_by_ref(this: *const ()) { | |
let ptr = this as *const Self; | |
ptr.as_ref().unwrap().wake_by_ref() | |
} | |
unsafe fn unsafe_drop(this: *const ()) { | |
let ptr = this as *mut Self; | |
Box::from_raw(ptr); | |
} | |
} | |
impl Executor { | |
fn run(&self) { | |
while let Ok(task) = self.ready_queue.recv() { | |
let mut future_slot = task.future.lock().unwrap(); | |
if let Some(mut future) = future_slot.take() { | |
let waker = CustomWaker::waker(task.clone()); | |
let mut context = Context::from_waker(&waker); | |
if let Poll::Pending = future.as_mut().poll(&mut context) { | |
*future_slot = Some(future) | |
} | |
} | |
} | |
} | |
} | |
fn main() { | |
let (executor, spawner) = new_executor_and_spanner(); | |
spawner.spawn(async { | |
println!("start"); | |
TimerFuture::new(Duration::from_millis(2000)).await; | |
println!("finished: {:?}",2000); | |
}); | |
spawner.spawn(async { | |
TimerFuture::new(Duration::from_millis(1000)).await; | |
println!("finished: {:?}",1000); | |
}); | |
drop(spawner); | |
executor.run(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment