Last active
August 20, 2018 18:34
-
-
Save tbelaire/cbe73a2f21757e73438dc9e169d0e921 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![feature(async_await, await_macro, futures_api, pin, arbitrary_self_types)] | |
use std::future::{Future, FutureObj}; | |
use std::mem::PinMut; | |
use std::sync::{Arc, Mutex}; | |
use std::sync::mpsc::{sync_channel, SyncSender, SendError, Receiver}; | |
use std::task::{ | |
self, | |
local_waker_from_nonlocal, | |
Poll, | |
Spawn, | |
SpawnErrorKind, | |
SpawnObjError, | |
Wake, | |
}; | |
mod secret; | |
use self::secret::almost_ready; | |
/// Task executor that receives tasks off of a channel and runs them. | |
struct Executor { | |
task_receiver: Receiver<Arc<Task>>, | |
} | |
impl Executor { | |
fn run(&self) { | |
while true { | |
println!("executor"); | |
let arc_task: Arc<Task> = self.task_receiver.recv().unwrap(); | |
let local_waker = local_waker_from_nonlocal(arc_task.clone()); | |
let spawner: &mut dyn Spawn = &mut &arc_task.spawner; | |
let mut cx: task::Context = task::Context::new( | |
&local_waker, spawner); | |
let mut future = arc_task.future.lock().unwrap(); | |
let result = PinMut::new((&mut future).as_mut().unwrap()).poll(&mut cx); | |
match result { | |
Poll::Pending => println!("Future pending!"), | |
Poll::Ready(_) => println!("Ready future!"), | |
} | |
} | |
// FIXME: implement the running of the executor. | |
// | |
// This method should pull tasks off of the existing task | |
// queue and run them to completion. | |
// | |
// In order to poll futures, you'll need to construct a | |
// `task::Context` from a `LocalWaker` and a `Spawn`. | |
// You can get a value of type `LocalWaker` by calling | |
// `local_waker_from_nonlocal` on an `Arc<W>` where `W: Wake`. | |
// | |
// To poll the future you'll need to do: | |
// `PinMut::new(future).poll(cx)` where cx is `&mut Context` | |
} | |
} | |
/// Task executor that spawns tasks onto a channel. | |
#[derive(Clone)] | |
struct Spawner { | |
task_sender: SyncSender<Arc<Task>>, | |
} | |
impl Spawner { | |
// Spawn a future as a new top-level task. | |
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) { | |
println!("spawn"); | |
let future_obj = FutureObj::new(Box::new(future)); | |
(&mut &*self).spawn_obj(future_obj) | |
.expect("unable to spawn"); | |
} | |
} | |
// Implement the `Spawn` trait for `&Spawner` rather than `Spawner` since | |
// we don't require a mutable reference to `Spawner`. | |
impl<'a> Spawn for &'a Spawner { | |
fn spawn_obj(&mut self, future: FutureObj<'static, ()>) | |
-> Result<(), SpawnObjError> | |
{ | |
println!("spawn_obj"); | |
let task = Arc::new(Task { | |
future: Mutex::new(Some(future)), | |
spawner: self.clone(), | |
}); | |
self.task_sender.send(task).map_err(|SendError(task)| { | |
SpawnObjError { | |
kind: SpawnErrorKind::shutdown(), | |
future: task.future.lock().unwrap().take().unwrap(), | |
} | |
}) | |
} | |
} | |
struct Task { | |
// In-progress future that should be pushed to completion | |
future: Mutex<Option<FutureObj<'static, ()>>>, | |
// Handle to spawn tasks onto the task queue | |
spawner: Spawner, | |
} | |
impl Wake for Task { | |
fn wake(arc_self: &Arc<Self>) { | |
let mut future = arc_self.future.lock().unwrap(); | |
(& arc_self.spawner).spawn_obj(future); | |
// FIXME: implement `Wake` by putting the task back onto the task queue | |
} | |
} | |
fn new_executor_and_spawner() -> (Executor, Spawner) { | |
// Maximum number of tasks to allow queueing in the channel at once. | |
// This is just to make `sync_channel` happy, and wouldn't be present in | |
// a real executor. | |
const MAX_QUEUED_TASKS: usize = 10000; | |
let (task_sender, task_receiver) = sync_channel(MAX_QUEUED_TASKS); | |
(Executor { task_receiver }, Spawner { task_sender }) | |
} | |
fn main() { | |
let (executor, spawner) = new_executor_and_spawner(); | |
spawner.spawn(async { | |
println!("howdy!"); | |
let x = await!(almost_ready(5)); | |
println!("done: {:?}", x); | |
}); | |
// spawner.spawn(async { | |
// println!("starting timer"); | |
// }); | |
executor.run(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment