Skip to content

Instantly share code, notes, and snippets.

@gamemachine
Last active January 26, 2023 23:14
Show Gist options
  • Save gamemachine/de6aa7b236ee0eb17c6748930a85f07f to your computer and use it in GitHub Desktop.
Save gamemachine/de6aa7b236ee0eb17c6748930a85f07f to your computer and use it in GitHub Desktop.
pub struct Broadcast<T: Clone> {
senders: ArrayQueue<Vec<crossbeam::channel::Sender<T>>>,
pending_senders: ArrayQueue<crossbeam::channel::Sender<T>>,
pending_signal: Arc<AtomicBool>
}
impl<T: Clone> Broadcast<T> {
pub fn new() -> Arc<Self> {
let senders: ArrayQueue<Vec<crossbeam::channel::Sender<T>>> = ArrayQueue::new(1);
senders.push(Vec::new()).unwrap();
let topic = Broadcast {
senders,
pending_senders: ArrayQueue::new(100),
pending_signal: Arc::new(AtomicBool::new(false))
};
Arc::new(topic)
}
pub fn send(&self, value: T) {
let mut senders = self.senders.pop().unwrap();
if self.pending_signal.load(Ordering::Relaxed) {
self.pending_signal.store(false, Ordering::Relaxed);
while let Some(pending) = self.pending_senders.pop() {
senders.push(pending);
}
}
let mut disconnected_index = usize::MAX;
for (index, sender) in senders.iter().enumerate() {
match sender.try_send(value.clone()) {
Err(err) => {
match err {
TrySendError::Disconnected(_) => {
disconnected_index = index;
},
_ => {}
}
tracing::warn!("Topic try_send {:?}", err);
},
_ => {}
}
}
if disconnected_index != usize::MAX {
senders.swap_remove(disconnected_index);
}
self.senders.push(senders).unwrap();
}
pub fn subscribe(&self, capacity: usize) -> Option<Arc<BroadcastReceiver<T>>> {
let (tx, rx):(crossbeam::channel::Sender<T>, crossbeam::channel::Receiver<T>) = crossbeam::channel::bounded(capacity);
if self.pending_senders.push(tx).is_ok() {
self.pending_signal.store(true, Ordering::Relaxed);
Some(BroadcastReceiver::new(rx))
} else {
None
}
}
}
pub struct BroadcastReceiver<T: Clone> {
rx: crossbeam::channel::Receiver<T>
}
impl<T: Clone> BroadcastReceiver<T> {
pub fn new(rx: crossbeam::channel::Receiver<T>) -> Arc<Self> {
let sub = BroadcastReceiver {
rx,
};
Arc::new(sub)
}
pub fn try_recv(&self) -> Result<T, TryRecvError> {
self.rx.try_recv()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment