Created
August 21, 2020 00:52
-
-
Save djg/2050d09d406084c088b3a586aad2f94d to your computer and use it in GitHub Desktop.
An Abomination Unto Nuggan
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* This Source Code Form is subject to the terms of the Mozilla Public | |
* License, v. 2.0. If a copy of the MPL was not distributed with this | |
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */ | |
#![allow(non_snake_case)] | |
use std::{ | |
cell::UnsafeCell, | |
sync::atomic::{AtomicUsize, Ordering::SeqCst}, | |
}; | |
use { | |
cstr::*, | |
futures::{ | |
future::FutureObj, | |
task::SpawnExt, | |
task::{Spawn, SpawnError}, | |
}, | |
moz_task, | |
nserror::{nsresult, NS_ERROR_UNEXPECTED, NS_OK}, | |
std::{ | |
ffi::CStr, | |
future::Future, | |
mem, | |
pin::Pin, | |
sync::{Arc, Condvar, Mutex}, | |
task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, | |
}, | |
xpcom::{ | |
interfaces::{nsIEventTarget, nsIRunnable, nsISerialEventTarget}, | |
xpcom, xpcom_method, RefPtr, | |
}, | |
}; | |
impl<T: ?Sized> FutureExt for T where T: Future {} | |
trait FutureExt: Future { | |
fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output> | |
where | |
Self: Unpin, | |
{ | |
Pin::new(self).poll(cx) | |
} | |
} | |
/// Demo `Future` to demonstrate executing futures to completion via `Task`. | |
struct MyFuture { | |
poll_count: u32, | |
ready: bool, | |
waker: Option<Waker>, | |
} | |
impl Default for MyFuture { | |
fn default() -> Self { | |
Self { | |
poll_count: 0, | |
ready: false, | |
waker: None, | |
} | |
} | |
} | |
impl Future for MyFuture { | |
type Output = (); | |
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { | |
self.poll_count += 1; | |
if let Some(waker) = &mut self.waker { | |
if !waker.will_wake(cx.waker()) { | |
*waker = cx.waker().clone(); | |
} | |
} else { | |
let waker = cx.waker().clone(); | |
self.waker = Some(waker); | |
} | |
println!("Poll count = {}", self.poll_count); | |
if self.ready { | |
Poll::Ready(()) | |
} else { | |
self.ready = true; | |
// Just notify the task that we need to re-polled. | |
if let Some(waker) = &self.waker { | |
waker.wake_by_ref(); | |
} | |
Poll::Pending | |
} | |
} | |
} | |
// helper for dispatching an `nsIRunnable` to `nsIEventTarget`, converting the | |
// result into `Result<(), SpawnError>`. | |
fn dispatch_with_options( | |
event_target: &nsIEventTarget, | |
runnable: &nsIRunnable, | |
options: u32, | |
) -> Result<(), SpawnError> { | |
// DispatchFromScript does an AddRef on runnable for us. | |
match unsafe { event_target.DispatchFromScript(runnable, options) } { | |
NS_OK => Ok(()), | |
// From nsIEventTarget.idl: | |
// NS_ERROR_UNEXPECTED - Indicates that the thread is shutting down and | |
// has finished processing events, so this event would never run and | |
// has not been dispatched. | |
NS_ERROR_UNEXPECTED => Err(SpawnError::shutdown()), | |
_ => unreachable!(), | |
} | |
} | |
// helper for dispatching an `nsIRunnable` to `nsIEventTarget`, converting the | |
// result into `Result<(), SpawnError>`. | |
fn dispatch(event_target: &nsIEventTarget, runnable: &nsIRunnable) -> Result<(), SpawnError> { | |
dispatch_with_options( | |
event_target, | |
runnable, | |
nsIEventTarget::DISPATCH_NORMAL as u32, | |
) | |
} | |
// Define a type which implements nsIRunnable in rust. | |
#[derive(xpcom)] | |
#[xpimplements(nsIRunnable, nsISupports)] | |
#[refcnt = "atomic"] | |
struct InitTask { | |
future: UnsafeCell<FutureObj<'static, ()>>, | |
event_target: RefPtr<nsIEventTarget>, | |
state: AtomicUsize, | |
} | |
impl Task { | |
pub fn new(future: FutureObj<'static, ()>, event_target: &nsIEventTarget) -> RefPtr<Task> { | |
Task::allocate(InitTask { | |
future: UnsafeCell::new(future), | |
event_target: RefPtr::new(event_target), | |
state: AtomicUsize::new(IDLE), | |
}) | |
} | |
/// Runs a closure from the context of the task. | |
/// | |
/// Any wake notifications resulting from the execution of the closure are | |
/// tracked. | |
fn enter<F, R>(&self, f: F) -> R | |
where | |
F: FnOnce(&mut Context<'_>) -> R, | |
{ | |
let waker = self.waker(); | |
let mut cx = Context::from_waker(&waker); | |
f(&mut cx) | |
} | |
/// Obtain a `Waker` that can be used to wake the `Task`. | |
fn waker(&self) -> Waker { | |
unsafe { | |
let raw = to_raw(RefPtr::new(self)); | |
Waker::from_raw(raw) | |
} | |
} | |
} | |
// nsIRunnable implementation | |
impl Task { | |
xpcom_method!(run => Run()); | |
fn run(&self) -> Result<(), nsresult> { | |
self.enter(|cx| { | |
// Safety: The ownership of this `Task` object is evidence that | |
// we are in the `POLL`/`REPOLL` state. | |
unsafe { | |
self.start_poll(); | |
loop { | |
let fut = &mut (*self.future.get()); | |
let res = fut.poll_unpin(cx); | |
match res { | |
Poll::Pending => {} | |
Poll::Ready(()) => return self.complete(), | |
} | |
if !self.wait() { | |
break; // we've waited | |
} | |
} | |
} | |
}); | |
Ok(()) | |
} | |
} | |
// Task State Machine - This was heavily cribbed from futures-executor::ThreadPool | |
// There are four possible task states, listed below with their possible | |
// transitions: | |
// The task is blocked, waiting on an event | |
const IDLE: usize = 0; // --> POLL | |
// The task is actively being polled by a thread; arrival of additional events | |
// of interest should move it to the REPOLL state | |
const POLL: usize = 1; // --> IDLE, REPOLL, or COMPLETE | |
// The task is actively being polled, but will need to be re-polled upon | |
// completion to ensure that all events were observed. | |
const REPOLL: usize = 2; // --> POLL | |
// The task has finished executing (either successfully or with an error/panic) | |
const COMPLETE: usize = 3; // No transitions out | |
impl Task { | |
/// Attempt to "wake up" the task and poll the future. | |
/// | |
/// A `true` result indicates that the `POLL` state has been entered, and | |
/// the caller can proceed to poll the future. An `false` result indicates | |
/// that polling is not necessary (because the task is finished or the | |
/// polling has been delegated). | |
pub(crate) fn wake_up(&self) -> bool { | |
let mut state = self.state.load(SeqCst); | |
loop { | |
match state { | |
// The task is idle, so try to run it immediately. | |
IDLE => match self.state.compare_exchange(IDLE, POLL, SeqCst, SeqCst) { | |
Ok(_) => { | |
return true; | |
} | |
Err(cur) => state = cur, | |
}, | |
// The task is being polled, so we need to record that it should | |
// be *repolled* when complete. | |
POLL => match self.state.compare_exchange(POLL, REPOLL, SeqCst, SeqCst) { | |
Ok(_) => return false, | |
Err(cur) => state = cur, | |
}, | |
// The task is already scheduled for polling, or is complete, so | |
// we've got nothing to do. | |
_ => return false, | |
} | |
} | |
} | |
/// Alert the Task that polling is about to begin, clearing any accumulated | |
/// re-poll requests. | |
/// | |
/// # Safety | |
/// | |
/// Callable only from the `POLL`/`REPOLL` states, i.e. between | |
/// successful calls to `wakeup` and `wait`/`complete`. | |
pub(crate) unsafe fn start_poll(&self) { | |
self.state.store(POLL, SeqCst); | |
} | |
/// Alert the Task that polling completed with `Pending`. | |
/// | |
/// Returns true if a `REPOLL` is pending. | |
/// | |
/// # Safety | |
/// | |
/// Callable only from the `POLL`/`REPOLL` states, i.e. between | |
/// successful calls to `notify` and `wait`/`complete`. | |
pub(crate) unsafe fn wait(&self) -> bool { | |
match self.state.compare_exchange(POLL, IDLE, SeqCst, SeqCst) { | |
// no wakeups came in while we were running | |
Ok(_) => false, | |
// guaranteed to be in REPOLL state; just clobber the | |
// state and run again. | |
Err(state) => { | |
assert_eq!(state, REPOLL); | |
self.state.store(POLL, SeqCst); | |
true | |
} | |
} | |
} | |
/// Alert the mutex that the task has completed execution and should not be | |
/// notified again. | |
/// | |
/// # Safety | |
/// | |
/// Callable only from the `POLL`/`REPOLL` states, i.e. between | |
/// successful calls to `wakeup` and `wait`/`complete`. | |
pub(crate) unsafe fn complete(&self) { | |
self.state.store(COMPLETE, SeqCst); | |
} | |
} | |
// Waker interface - Implementation for RawWaker so Task nsIRunnable can be | |
// used to wake itself. | |
impl Task { | |
fn wake(&self) { | |
if self.wake_up() { | |
// TODO: how should we handle error? | |
dispatch(self.event_target.coerce(), self.coerce()).unwrap() | |
} | |
} | |
} | |
static TASK_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( | |
task_waker_clone, | |
task_waker_wake, | |
task_waker_wake_by_ref, | |
task_waker_drop, | |
); | |
unsafe fn to_raw(waker: RefPtr<Task>) -> RawWaker { | |
let mut raw = std::ptr::null(); | |
waker.forget(&mut raw); | |
RawWaker::new(raw as *const (), &TASK_WAKER_VTABLE) | |
} | |
unsafe fn from_raw(raw: *const ()) -> RefPtr<Task> { | |
RefPtr::from_raw_dont_addref(raw as *const Task).expect("Received null ptr") | |
} | |
unsafe fn from_raw_clone(raw: *const ()) -> RefPtr<Task> { | |
RefPtr::from_raw(raw as *const Task).expect("Received null ptr") | |
} | |
unsafe fn task_waker_clone(raw: *const ()) -> RawWaker { | |
to_raw(from_raw_clone(raw)) | |
} | |
unsafe fn task_waker_wake(raw: *const ()) { | |
let task = from_raw(raw); | |
task.wake(); | |
} | |
unsafe fn task_waker_wake_by_ref(raw: *const ()) { | |
let task = from_raw(raw); | |
task.wake(); | |
// We don't actually own a reference to the waker | |
mem::forget(task); | |
} | |
unsafe fn task_waker_drop(raw: *const ()) { | |
let _ = from_raw(raw); | |
} | |
/// A general purpose interface to for scheduling tasks that poll futures to | |
/// completion on a BackgroundTaskQueue. | |
/// | |
/// ``` | |
/// let tq = BackgroundTaskQueue::new(cstr!("TQ")).unwrap(); | |
/// let future = async { /* ... */ }; | |
/// task_queue.spawn(future); | |
/// ``` | |
struct BackgroundTaskQueue { | |
task_queue: RefPtr<nsISerialEventTarget>, | |
} | |
impl BackgroundTaskQueue { | |
pub fn new(name: &'static CStr) -> Result<Self, nsresult> { | |
let task_queue = moz_task::create_background_task_queue(name)?; | |
Ok(BackgroundTaskQueue { task_queue }) | |
} | |
/// Dispatches a task to the task queue that will be run the future to completion. | |
pub fn dispatch_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { | |
let task = Task::new(future, self.task_queue.coerce()); | |
dispatch(self.task_queue.coerce(), task.coerce()) | |
} | |
} | |
impl Spawn for BackgroundTaskQueue { | |
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { | |
self.dispatch_obj(future) | |
} | |
} | |
#[derive(Clone)] | |
struct WaitBool { | |
inner: Arc<(Mutex<bool>, Condvar)>, | |
} | |
impl Default for WaitBool { | |
fn default() -> Self { | |
Self { | |
inner: Arc::new((Mutex::new(false), Condvar::new())), | |
} | |
} | |
} | |
impl WaitBool { | |
pub fn notify(&self) { | |
let (lock, cvar) = &*self.inner; | |
let mut signal = lock.lock().unwrap(); | |
*signal = true; | |
// We notify the condvar that the value has changed. | |
cvar.notify_one(); | |
} | |
pub fn wait(&self) { | |
let (lock, cvar) = &*self.inner; | |
let mut signal = lock.lock().unwrap(); | |
while !*signal { | |
signal = cvar.wait(signal).unwrap(); | |
} | |
} | |
} | |
#[no_mangle] | |
pub unsafe extern "C" fn Rust_Future(it_worked: *mut bool) { | |
let wb1 = WaitBool::default(); | |
let wb2 = wb1.clone(); | |
let task_queue = | |
BackgroundTaskQueue::new(cstr!("Rust_Future")).expect("Failed to create task queue"); | |
if let Ok(..) = task_queue.spawn(async move { | |
MyFuture::default().await; | |
wb2.notify() | |
}) { | |
// Wait for the future to complete | |
wb1.wait(); | |
*it_worked = true; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment