Skip to content

Instantly share code, notes, and snippets.

@zesterer
Created March 25, 2020 10:23
Show Gist options
  • Save zesterer/59b2b1949430cc74ec26827b24ba6f5b to your computer and use it in GitHub Desktop.
Save zesterer/59b2b1949430cc74ec26827b24ba6f5b to your computer and use it in GitHub Desktop.
diff --git a/src/lib.rs b/src/lib.rs
index 24534a8..8121dd9 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -170,6 +170,11 @@ fn wait_lock<'a, T>(lock: &'a InnerMutex<T>) -> MutexGuard<'a, T> {
}
}
+#[inline]
+fn poll_lock<'a, T>(lock: &'a InnerMutex<T>) -> Option<MutexGuard<'a, T>> {
+ lock.try_lock()
+}
+
/// Wrapper around a queue. This wrapper exists to permit a maximum length.
struct Queue<T>(VecDeque<T>, Option<usize>);
@@ -621,7 +626,25 @@ impl<T> Sender<T> {
pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
match &self.shared.chan {
Channel::Bounded { cap, pending, queue } => {
- let mut queue = wait_lock(&queue);
+ let mut queue = {
+ let mut i = 0;
+ loop {
+ if i > *cap {
+ break wait_lock(&queue);
+ }
+
+ if let Some(queue) = poll_lock(&queue) {
+ if queue.len() < *cap {
+ break queue;
+ }
+ }
+
+ i += 1;
+
+ drop(queue);
+ thread::yield_now();
+ }
+ };
if self.shared.disconnected.load(Ordering::Relaxed) {
Err(SendError(msg))
} else if queue.len() < *cap {
@@ -633,10 +656,12 @@ impl<T> Sender<T> {
*unblock_signal.lock() = Some(msg);
wait_lock(&pending).push_back(unblock_signal.clone());
+
self.shared.send_signal.notify_one(());
- unblock_signal.wait_while(queue, |msg| msg.is_some() &&
- !self.shared.disconnected.load(Ordering::Relaxed));
+ unblock_signal.wait_while(queue, |msg| {
+ msg.is_some() && !self.shared.disconnected.load(Ordering::Relaxed)
+ });
if let Some(msg) = unblock_signal.lock().take() {
Err(SendError(msg))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment