Created
May 13, 2023 10:26
-
-
Save lithdew/4a6a47ca22eb4468fb8adb4453d87aa4 to your computer and use it in GitHub Desktop.
rust: single-threaded futures-aware one-shot broadcast channel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pub mod oneshot_broadcast { | |
use std::{ | |
cell::UnsafeCell, | |
pin::Pin, | |
task::{Context, Poll, Waker}, | |
}; | |
use futures_lite::Future; | |
use pin_list::PinList; | |
use pin_project_lite::pin_project; | |
type PinListTypes = dyn pin_list::Types< | |
Id = pin_list::id::Checked, | |
Protected = Waker, | |
Removed = (), | |
Unprotected = (), | |
>; | |
pub struct Channel<T: Clone> { | |
data: UnsafeCell<Option<T>>, | |
waiters: UnsafeCell<PinList<PinListTypes>>, | |
} | |
impl<T: Clone> Channel<T> { | |
pub fn new() -> Self { | |
Self { | |
data: UnsafeCell::new(None), | |
waiters: UnsafeCell::new(PinList::new(pin_list::id::Checked::new())), | |
} | |
} | |
pub fn send(&self, data: T) { | |
unsafe { *self.data.get() = Some(data) }; | |
let mut cursor = unsafe { (*self.waiters.get()).cursor_front_mut() }; | |
while let Ok(waker) = cursor.remove_current(()) { | |
waker.wake(); | |
} | |
} | |
pub fn recv(&self) -> Recv<'_, T> { | |
Recv { | |
chan: self, | |
node: pin_list::Node::new(), | |
} | |
} | |
} | |
pin_project! { | |
pub struct Recv<'chan, T: Clone> { | |
chan: &'chan Channel<T>, | |
#[pin] | |
node: pin_list::Node<PinListTypes>, | |
} | |
} | |
impl<'chan, T: Clone> Future for Recv<'chan, T> { | |
type Output = T; | |
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
let mut this = self.project(); | |
let waiters = unsafe { &mut *this.chan.waiters.get() }; | |
let data = unsafe { &*this.chan.data.get() }; | |
if let Some(node) = this.node.as_mut().initialized_mut() { | |
if let Err(node) = node.take_removed(waiters) { | |
*node.protected_mut(waiters).unwrap() = cx.waker().clone(); | |
return Poll::Pending; | |
} | |
} | |
if let Some(data) = data { | |
return Poll::Ready(data.clone()); | |
} | |
waiters.push_back(this.node, cx.waker().clone(), ()); | |
Poll::Pending | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment