Last active
January 26, 2023 23:14
-
-
Save gamemachine/de6aa7b236ee0eb17c6748930a85f07f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pub struct Broadcast<T: Clone> { | |
senders: ArrayQueue<Vec<crossbeam::channel::Sender<T>>>, | |
pending_senders: ArrayQueue<crossbeam::channel::Sender<T>>, | |
pending_signal: Arc<AtomicBool> | |
} | |
impl<T: Clone> Broadcast<T> { | |
pub fn new() -> Arc<Self> { | |
let senders: ArrayQueue<Vec<crossbeam::channel::Sender<T>>> = ArrayQueue::new(1); | |
senders.push(Vec::new()).unwrap(); | |
let topic = Broadcast { | |
senders, | |
pending_senders: ArrayQueue::new(100), | |
pending_signal: Arc::new(AtomicBool::new(false)) | |
}; | |
Arc::new(topic) | |
} | |
pub fn send(&self, value: T) { | |
let mut senders = self.senders.pop().unwrap(); | |
if self.pending_signal.load(Ordering::Relaxed) { | |
self.pending_signal.store(false, Ordering::Relaxed); | |
while let Some(pending) = self.pending_senders.pop() { | |
senders.push(pending); | |
} | |
} | |
let mut disconnected_index = usize::MAX; | |
for (index, sender) in senders.iter().enumerate() { | |
match sender.try_send(value.clone()) { | |
Err(err) => { | |
match err { | |
TrySendError::Disconnected(_) => { | |
disconnected_index = index; | |
}, | |
_ => {} | |
} | |
tracing::warn!("Topic try_send {:?}", err); | |
}, | |
_ => {} | |
} | |
} | |
if disconnected_index != usize::MAX { | |
senders.swap_remove(disconnected_index); | |
} | |
self.senders.push(senders).unwrap(); | |
} | |
pub fn subscribe(&self, capacity: usize) -> Option<Arc<BroadcastReceiver<T>>> { | |
let (tx, rx):(crossbeam::channel::Sender<T>, crossbeam::channel::Receiver<T>) = crossbeam::channel::bounded(capacity); | |
if self.pending_senders.push(tx).is_ok() { | |
self.pending_signal.store(true, Ordering::Relaxed); | |
Some(BroadcastReceiver::new(rx)) | |
} else { | |
None | |
} | |
} | |
} | |
pub struct BroadcastReceiver<T: Clone> { | |
rx: crossbeam::channel::Receiver<T> | |
} | |
impl<T: Clone> BroadcastReceiver<T> { | |
pub fn new(rx: crossbeam::channel::Receiver<T>) -> Arc<Self> { | |
let sub = BroadcastReceiver { | |
rx, | |
}; | |
Arc::new(sub) | |
} | |
pub fn try_recv(&self) -> Result<T, TryRecvError> { | |
self.rx.try_recv() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment